1use crate::acme::CertManager;
7use crate::config::ProxyConfig;
8use crate::error::{ProxyError, Result};
9use crate::lb::LoadBalancer;
10use crate::network_policy::NetworkPolicyChecker;
11use crate::routes::ServiceRegistry;
12use crate::service::{Activator, ReverseProxyService, RpsRegistry};
13use crate::sni_resolver::SniCertResolver;
14use hyper::body::Incoming;
15use hyper::server::conn::http1;
16use hyper::service::service_fn;
17use hyper::Request;
18use hyper_util::rt::TokioIo;
19use std::net::{IpAddr, SocketAddr};
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::net::{lookup_host, TcpListener, TcpStream};
23use tokio::sync::watch;
24use tokio_rustls::TlsAcceptor;
25use tracing::{debug, error, info, warn};
26
/// Delay used after the first failed ingress bind attempt; later failures
/// grow it via [`next_ingress_backoff`].
const INGRESS_BIND_BACKOFF_INITIAL: Duration = Duration::from_secs(2);

/// Upper bound on the delay between two ingress bind attempts.
const INGRESS_BIND_BACKOFF_MAX: Duration = Duration::from_secs(30);

/// A failed bind is reported at `warn` level on attempt 0 and on every
/// attempt that is a multiple of this value.
const INGRESS_BIND_WARN_EVERY: u64 = 30;

/// Returns the delay to use after `current`: double it, capped at
/// [`INGRESS_BIND_BACKOFF_MAX`].
///
/// The doubling saturates, so an arbitrarily large `current` yields the cap
/// instead of panicking on `Duration` overflow.
#[must_use]
fn next_ingress_backoff(current: Duration) -> Duration {
    current.saturating_mul(2).min(INGRESS_BIND_BACKOFF_MAX)
}

/// Returns `true` when a bind failure on the given zero-based `attempt`
/// should be logged at `warn` level.
#[must_use]
fn should_warn_on_attempt(attempt: u64) -> bool {
    // Attempt 0 is covered as well, because 0 is a multiple of every cadence.
    attempt % INGRESS_BIND_WARN_EVERY == 0
}
59
60async fn bind_with_retry(
71 addr: SocketAddr,
72 label: &str,
73 shutdown_rx: &mut watch::Receiver<bool>,
74) -> Option<TcpListener> {
75 let mut attempt: u64 = 0;
76 let mut backoff = INGRESS_BIND_BACKOFF_INITIAL;
77 let mut warned_eacces = false;
78
79 loop {
80 if *shutdown_rx.borrow() {
81 return None;
82 }
83
84 match TcpListener::bind(addr).await {
85 Ok(listener) => {
86 if attempt > 0 {
87 info!(addr = %addr, label, attempt, "Ingress bound after retrying");
88 }
89 return Some(listener);
90 }
91 Err(e) => {
92 let is_eacces = e.kind() == std::io::ErrorKind::PermissionDenied;
93 if is_eacces && !warned_eacces {
94 warned_eacces = true;
95 warn!(
96 addr = %addr,
97 label,
98 error = %e,
99 "Ingress bind denied: binding 80/443 needs root or CAP_NET_BIND_SERVICE; \
100 will keep retrying without aborting startup"
101 );
102 } else if should_warn_on_attempt(attempt) {
103 warn!(
104 addr = %addr,
105 label,
106 attempt,
107 error = %e,
108 "Ingress bind failed; will keep retrying (port may be held by another process)"
109 );
110 } else {
111 debug!(addr = %addr, label, attempt, error = %e, "Ingress bind retry");
112 }
113
114 tokio::select! {
115 () = tokio::time::sleep(backoff) => {}
116 _ = shutdown_rx.changed() => {
117 if *shutdown_rx.borrow() {
118 return None;
119 }
120 }
121 }
122
123 attempt = attempt.saturating_add(1);
124 backoff = next_ingress_backoff(backoff);
125 }
126 }
127 }
128}
129
/// HTTP/HTTPS reverse-proxy front end.
///
/// Owns the listener accept loops and hands every accepted connection to a
/// freshly built `ReverseProxyService` configured from the fields below.
pub struct ProxyServer {
    /// Shared proxy configuration; `server.http_addr` / `server.https_addr`
    /// are the default bind addresses used by the `run*` methods.
    config: Arc<ProxyConfig>,
    /// Route registry passed to each connection's service; the TLS path also
    /// queries it to decide whether an SNI host is handled locally.
    registry: Arc<ServiceRegistry>,
    /// Load balancer passed to each connection's service.
    load_balancer: Arc<LoadBalancer>,
    /// Sending half of the shutdown flag; `shutdown()` sends `true` on it.
    shutdown_tx: watch::Sender<bool>,
    /// Receiving half of the shutdown flag; cloned by the accept loops and
    /// the bind-retry helpers.
    shutdown_rx: watch::Receiver<bool>,
    /// TLS acceptor; `None` unless built via `with_tls_resolver`. The HTTPS
    /// entry points return a configuration error when it is absent.
    tls_acceptor: Option<TlsAcceptor>,
    /// Optional certificate manager forwarded to each connection's service.
    cert_manager: Option<Arc<CertManager>>,
    /// Optional network-policy checker forwarded to each connection's service.
    network_policy_checker: Option<NetworkPolicyChecker>,
    /// Optional activator forwarded to each connection's service.
    activator: Option<Arc<dyn Activator>>,
    /// Optional RPS registry forwarded to each connection's service.
    rps_registry: Option<Arc<RpsRegistry>>,
}
155
156impl ProxyServer {
157 pub fn new(
159 config: ProxyConfig,
160 registry: Arc<ServiceRegistry>,
161 load_balancer: Arc<LoadBalancer>,
162 ) -> Self {
163 let (shutdown_tx, shutdown_rx) = watch::channel(false);
164
165 Self {
166 config: Arc::new(config),
167 registry,
168 load_balancer,
169 shutdown_tx,
170 shutdown_rx,
171 tls_acceptor: None,
172 cert_manager: None,
173 network_policy_checker: None,
174 activator: None,
175 rps_registry: None,
176 }
177 }
178
179 pub fn with_registry(
181 config: ProxyConfig,
182 registry: Arc<ServiceRegistry>,
183 load_balancer: Arc<LoadBalancer>,
184 ) -> Self {
185 Self::new(config, registry, load_balancer)
186 }
187
188 pub fn with_tls_resolver(
190 config: ProxyConfig,
191 registry: Arc<ServiceRegistry>,
192 load_balancer: Arc<LoadBalancer>,
193 resolver: Arc<SniCertResolver>,
194 ) -> Self {
195 let tls_config = rustls::ServerConfig::builder()
196 .with_no_client_auth()
197 .with_cert_resolver(resolver);
198 let acceptor = TlsAcceptor::from(Arc::new(tls_config));
199 let (shutdown_tx, shutdown_rx) = watch::channel(false);
200
201 Self {
202 config: Arc::new(config),
203 registry,
204 load_balancer,
205 shutdown_tx,
206 shutdown_rx,
207 tls_acceptor: Some(acceptor),
208 cert_manager: None,
209 network_policy_checker: None,
210 activator: None,
211 rps_registry: None,
212 }
213 }
214
215 #[must_use]
217 pub fn with_cert_manager(mut self, cm: Arc<CertManager>) -> Self {
218 self.cert_manager = Some(cm);
219 self
220 }
221
222 #[must_use]
224 pub fn with_network_policy_checker(mut self, checker: NetworkPolicyChecker) -> Self {
225 self.network_policy_checker = Some(checker);
226 self
227 }
228
229 #[must_use]
236 pub fn with_activator(mut self, activator: Arc<dyn Activator>) -> Self {
237 self.activator = Some(activator);
238 self
239 }
240
241 #[must_use]
247 pub fn with_rps_registry(mut self, rps_registry: Arc<RpsRegistry>) -> Self {
248 self.rps_registry = Some(rps_registry);
249 self
250 }
251
252 #[must_use]
254 pub fn has_tls(&self) -> bool {
255 self.tls_acceptor.is_some()
256 }
257
258 #[must_use]
260 pub fn tls_acceptor(&self) -> Option<&TlsAcceptor> {
261 self.tls_acceptor.as_ref()
262 }
263
264 #[must_use]
266 pub fn registry(&self) -> Arc<ServiceRegistry> {
267 self.registry.clone()
268 }
269
270 #[must_use]
277 pub fn cert_manager(&self) -> Option<&Arc<CertManager>> {
278 self.cert_manager.as_ref()
279 }
280
281 #[must_use]
283 pub fn config(&self) -> Arc<ProxyConfig> {
284 self.config.clone()
285 }
286
287 pub fn shutdown(&self) {
289 let _ = self.shutdown_tx.send(true);
290 }
291
292 pub async fn run(&self) -> Result<()> {
299 let addr = self.config.server.http_addr;
300 let listener = TcpListener::bind(addr)
301 .await
302 .map_err(|e| ProxyError::BindFailed {
303 addr,
304 reason: e.to_string(),
305 })?;
306
307 info!(addr = %addr, "HTTP proxy server listening");
308
309 self.accept_loop(listener).await
310 }
311
312 pub async fn run_on(&self, addr: SocketAddr) -> Result<()> {
319 let listener = TcpListener::bind(addr)
320 .await
321 .map_err(|e| ProxyError::BindFailed {
322 addr,
323 reason: e.to_string(),
324 })?;
325
326 info!(addr = %addr, "HTTP proxy server listening");
327
328 self.accept_loop(listener).await
329 }
330
331 pub async fn run_with_retry(&self, addr: SocketAddr) -> Result<()> {
344 let mut shutdown_rx = self.shutdown_rx.clone();
345 let Some(listener) = bind_with_retry(addr, "http", &mut shutdown_rx).await else {
346 return Ok(());
348 };
349
350 info!(addr = %addr, "HTTP ingress server listening");
351
352 self.accept_loop(listener).await
353 }
354
355 pub async fn run_on_listener(&self, listener: TcpListener) -> Result<()> {
365 if let Ok(addr) = listener.local_addr() {
366 info!(addr = %addr, "HTTP proxy server listening (pre-bound)");
367 }
368 self.accept_loop(listener).await
369 }
370
371 async fn accept_loop(&self, listener: TcpListener) -> Result<()> {
372 let mut shutdown_rx = self.shutdown_rx.clone();
373
374 loop {
375 tokio::select! {
376 _ = shutdown_rx.changed() => {
378 if *shutdown_rx.borrow() {
379 info!("Shutting down proxy server");
380 break;
381 }
382 }
383
384 result = listener.accept() => {
386 match result {
387 Ok((stream, remote_addr)) => {
388 let registry = self.registry.clone();
389 let load_balancer = self.load_balancer.clone();
390 let config = self.config.clone();
391 let cert_manager = self.cert_manager.clone();
392 let npc = self.network_policy_checker.clone();
393 let activator = self.activator.clone();
394 let rps_registry = self.rps_registry.clone();
395
396 tokio::spawn(async move {
397 if let Err(e) = Self::handle_connection(
398 stream,
399 remote_addr,
400 registry,
401 load_balancer,
402 config,
403 cert_manager,
404 npc,
405 activator,
406 rps_registry,
407 ).await {
408 debug!(
409 error = %e,
410 remote_addr = %remote_addr,
411 "Connection error"
412 );
413 }
414 });
415 }
416 Err(e) => {
417 warn!(error = %e, "Failed to accept connection");
418 }
419 }
420 }
421 }
422 }
423
424 Ok(())
425 }
426
427 #[allow(clippy::too_many_arguments)]
428 async fn handle_connection(
429 stream: tokio::net::TcpStream,
430 remote_addr: SocketAddr,
431 registry: Arc<ServiceRegistry>,
432 load_balancer: Arc<LoadBalancer>,
433 config: Arc<ProxyConfig>,
434 cert_manager: Option<Arc<CertManager>>,
435 network_policy_checker: Option<NetworkPolicyChecker>,
436 activator: Option<Arc<dyn Activator>>,
437 rps_registry: Option<Arc<RpsRegistry>>,
438 ) -> Result<()> {
439 let io = TokioIo::new(stream);
440
441 let mut service =
442 ReverseProxyService::new(registry, load_balancer, config).with_remote_addr(remote_addr);
443 if let Some(cm) = cert_manager {
444 service = service.with_cert_manager(cm);
445 }
446 if let Some(checker) = network_policy_checker {
447 service = service.with_network_policy_checker(checker);
448 }
449 if let Some(activator) = activator {
450 service = service.with_activator(activator);
451 }
452 if let Some(rps_registry) = rps_registry {
453 service = service.with_rps_registry(rps_registry);
454 }
455
456 let service = service_fn(move |req: Request<Incoming>| {
457 let svc = service.clone();
458 async move {
459 match svc.proxy_request(req).await {
460 Ok(response) => Ok::<_, hyper::Error>(response),
461 Err(e) => {
462 error!(error = %e, "Proxy error");
463 Ok(ReverseProxyService::error_response(&e))
464 }
465 }
466 }
467 });
468
469 http1::Builder::new()
470 .preserve_header_case(true)
471 .title_case_headers(false)
472 .serve_connection(io, service)
473 .with_upgrades()
474 .await
475 .map_err(ProxyError::Hyper)?;
476
477 Ok(())
478 }
479
480 pub async fn run_https(&self) -> Result<()> {
489 let acceptor = self
490 .tls_acceptor
491 .as_ref()
492 .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
493
494 let addr = self.config.server.https_addr;
495 let listener = TcpListener::bind(addr)
496 .await
497 .map_err(|e| ProxyError::BindFailed {
498 addr,
499 reason: e.to_string(),
500 })?;
501
502 info!(addr = %addr, "HTTPS proxy server listening");
503
504 self.accept_loop_tls(listener, acceptor.clone()).await
505 }
506
507 pub async fn run_https_on(&self, addr: SocketAddr) -> Result<()> {
514 let acceptor = self
515 .tls_acceptor
516 .as_ref()
517 .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
518
519 let listener = TcpListener::bind(addr)
520 .await
521 .map_err(|e| ProxyError::BindFailed {
522 addr,
523 reason: e.to_string(),
524 })?;
525
526 info!(addr = %addr, "HTTPS proxy server listening");
527
528 self.accept_loop_tls(listener, acceptor.clone()).await
529 }
530
531 pub async fn run_https_with_retry(&self, addr: SocketAddr) -> Result<()> {
546 let acceptor = self
547 .tls_acceptor
548 .as_ref()
549 .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
550
551 let mut shutdown_rx = self.shutdown_rx.clone();
552 let Some(listener) = bind_with_retry(addr, "https", &mut shutdown_rx).await else {
553 return Ok(());
555 };
556
557 info!(addr = %addr, "HTTPS ingress server listening");
558
559 self.accept_loop_tls(listener, acceptor.clone()).await
560 }
561
562 pub async fn run_https_on_listener(&self, listener: TcpListener) -> Result<()> {
573 let acceptor = self
574 .tls_acceptor
575 .as_ref()
576 .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
577
578 if let Ok(addr) = listener.local_addr() {
579 info!(addr = %addr, "HTTPS proxy server listening (pre-bound)");
580 }
581 self.accept_loop_tls(listener, acceptor.clone()).await
582 }
583
584 #[allow(clippy::similar_names)]
594 pub async fn run_both(&self) -> Result<()> {
595 let http_addr = self.config.server.http_addr;
596 let https_addr = self.config.server.https_addr;
597
598 let acceptor = self
599 .tls_acceptor
600 .as_ref()
601 .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
602
603 let http_listener =
604 TcpListener::bind(http_addr)
605 .await
606 .map_err(|e| ProxyError::BindFailed {
607 addr: http_addr,
608 reason: e.to_string(),
609 })?;
610
611 let https_listener =
612 TcpListener::bind(https_addr)
613 .await
614 .map_err(|e| ProxyError::BindFailed {
615 addr: https_addr,
616 reason: e.to_string(),
617 })?;
618
619 info!(http = %http_addr, https = %https_addr, "Proxy server listening");
620
621 let http_future = self.accept_loop(http_listener);
623 let https_future = self.accept_loop_tls(https_listener, acceptor.clone());
624
625 tokio::select! {
626 result = http_future => result,
627 result = https_future => result,
628 }
629 }
630
631 async fn accept_loop_tls(&self, listener: TcpListener, acceptor: TlsAcceptor) -> Result<()> {
632 let mut shutdown_rx = self.shutdown_rx.clone();
633
634 loop {
635 tokio::select! {
636 _ = shutdown_rx.changed() => {
638 if *shutdown_rx.borrow() {
639 info!("Shutting down HTTPS proxy server");
640 break;
641 }
642 }
643
644 result = listener.accept() => {
646 match result {
647 Ok((stream, remote_addr)) => {
648 let registry = self.registry.clone();
649 let load_balancer = self.load_balancer.clone();
650 let config = self.config.clone();
651 let acceptor = acceptor.clone();
652 let cert_manager = self.cert_manager.clone();
653 let npc = self.network_policy_checker.clone();
654 let activator = self.activator.clone();
655 let rps_registry = self.rps_registry.clone();
656
657 tokio::spawn(async move {
658 if let Err(e) = Self::handle_tls_connection(
659 stream,
660 remote_addr,
661 registry,
662 load_balancer,
663 config,
664 acceptor,
665 cert_manager,
666 npc,
667 activator,
668 rps_registry,
669 ).await {
670 debug!(
671 error = %e,
672 remote_addr = %remote_addr,
673 "TLS connection error"
674 );
675 }
676 });
677 }
678 Err(e) => {
679 warn!(error = %e, "Failed to accept TLS connection");
680 }
681 }
682 }
683 }
684 }
685
686 Ok(())
687 }
688
689 #[allow(clippy::too_many_arguments)]
690 async fn handle_tls_connection(
691 stream: tokio::net::TcpStream,
692 remote_addr: SocketAddr,
693 registry: Arc<ServiceRegistry>,
694 load_balancer: Arc<LoadBalancer>,
695 config: Arc<ProxyConfig>,
696 acceptor: TlsAcceptor,
697 cert_manager: Option<Arc<CertManager>>,
698 network_policy_checker: Option<NetworkPolicyChecker>,
699 activator: Option<Arc<dyn Activator>>,
700 rps_registry: Option<Arc<RpsRegistry>>,
701 ) -> Result<()> {
702 let sni = Self::peek_sni(&stream).await;
708
709 let managed = match sni.as_deref() {
714 Some(host) => registry.resolve(Some(host), "/").await.is_some(),
715 None => true,
716 };
717
718 if !managed {
719 if let Some(host) = sni {
721 return Self::passthrough_unmanaged(stream, remote_addr, host).await;
722 }
723 }
724
725 let tls_stream = acceptor
729 .accept(stream)
730 .await
731 .map_err(|e| ProxyError::Tls(format!("TLS handshake failed: {e}")))?;
732
733 let io = TokioIo::new(tls_stream);
734
735 let mut service = ReverseProxyService::new(registry, load_balancer, config)
736 .with_remote_addr(remote_addr)
737 .with_tls(true);
738 if let Some(cm) = cert_manager {
739 service = service.with_cert_manager(cm);
740 }
741 if let Some(checker) = network_policy_checker {
742 service = service.with_network_policy_checker(checker);
743 }
744 if let Some(activator) = activator {
745 service = service.with_activator(activator);
746 }
747 if let Some(rps_registry) = rps_registry {
748 service = service.with_rps_registry(rps_registry);
749 }
750
751 let service = service_fn(move |req: Request<Incoming>| {
752 let svc = service.clone();
753 async move {
754 match svc.proxy_request(req).await {
755 Ok(response) => Ok::<_, hyper::Error>(response),
756 Err(e) => {
757 error!(error = %e, "Proxy error");
758 Ok(ReverseProxyService::error_response(&e))
759 }
760 }
761 }
762 });
763
764 http1::Builder::new()
765 .preserve_header_case(true)
766 .title_case_headers(false)
767 .serve_connection(io, service)
768 .with_upgrades()
769 .await
770 .map_err(ProxyError::Hyper)?;
771
772 Ok(())
773 }
774
775 async fn peek_sni(stream: &TcpStream) -> Option<String> {
784 const MAX_PEEK: usize = 8192;
786 const PEEK_TIMEOUT: Duration = Duration::from_secs(5);
788
789 let mut buf = vec![0u8; MAX_PEEK];
790 let result = tokio::time::timeout(PEEK_TIMEOUT, async {
791 loop {
792 let n = match stream.peek(&mut buf).await {
793 Ok(n) if n > 0 => n,
795 _ => return None,
796 };
797
798 if n >= 5 {
801 let record_len = ((buf[3] as usize) << 8) | buf[4] as usize;
802 let needed = (5 + record_len).min(MAX_PEEK);
803 if n >= needed {
804 return crate::sni_peek::parse_sni(&buf[..n]);
805 }
806 }
807 if n >= MAX_PEEK {
808 return crate::sni_peek::parse_sni(&buf[..n]);
809 }
810
811 tokio::time::sleep(Duration::from_millis(5)).await;
814 }
815 })
816 .await;
817
818 result.ok().flatten()
819 }
820
821 async fn passthrough_unmanaged(
829 client: TcpStream,
830 remote_addr: SocketAddr,
831 host: String,
832 ) -> Result<()> {
833 let mut addrs = match lookup_host((host.as_str(), 443u16)).await {
834 Ok(it) => it,
835 Err(e) => {
836 debug!(host = %host, error = %e, "TLS passthrough DNS lookup failed; dropping");
837 return Ok(());
838 }
839 };
840
841 let Some(upstream_addr) = addrs.find(|a| !Self::is_self_addr(&a.ip())) else {
844 warn!(
845 host = %host,
846 "TLS passthrough refused: SNI resolves only to loopback/unspecified (loop guard)"
847 );
848 return Ok(());
849 };
850
851 let upstream = match TcpStream::connect(upstream_addr).await {
852 Ok(s) => s,
853 Err(e) => {
854 debug!(
855 host = %host,
856 upstream = %upstream_addr,
857 error = %e,
858 "TLS passthrough connect failed; dropping"
859 );
860 return Ok(());
861 }
862 };
863
864 debug!(
865 host = %host,
866 upstream = %upstream_addr,
867 client = %remote_addr,
868 "TLS passthrough (unmanaged SNI) -> upstream"
869 );
870 crate::stream::TcpStreamService::splice(client, upstream).await;
871 Ok(())
872 }
873
/// Returns `true` when `ip` is a loopback or unspecified address, i.e. one
/// that would make a passthrough connection point back at this machine.
///
/// IPv4-mapped IPv6 addresses (`::ffff:a.b.c.d`) are judged by their embedded
/// IPv4 address, so `::ffff:127.0.0.1` and `::ffff:0.0.0.0` are rejected as
/// well; `Ipv6Addr::is_loopback` alone only recognises `::1`.
fn is_self_addr(ip: &IpAddr) -> bool {
    let canonical = match ip {
        IpAddr::V6(v6) => v6.to_ipv4_mapped().map_or(*ip, IpAddr::V4),
        IpAddr::V4(_) => *ip,
    };
    canonical.is_loopback() || canonical.is_unspecified()
}
879}
880
881#[cfg(test)]
882mod tests {
883 use super::*;
884 use crate::lb::LoadBalancer;
885 use crate::routes::{ResolvedService, RouteEntry};
886 use zlayer_spec::{ExposeType, Protocol};
887
888 fn make_entry(
890 service: &str,
891 host: Option<&str>,
892 path: &str,
893 backends: Vec<SocketAddr>,
894 ) -> RouteEntry {
895 RouteEntry {
896 service_name: service.to_string(),
897 endpoint_name: "http".to_string(),
898 host: host.map(std::string::ToString::to_string),
899 path_prefix: path.to_string(),
900 resolved: ResolvedService {
901 name: service.to_string(),
902 backends,
903 use_tls: false,
904 sni_hostname: String::new(),
905 expose: ExposeType::Public,
906 protocol: Protocol::Http,
907 strip_prefix: false,
908 path_prefix: path.to_string(),
909 target_port: 8080,
910 },
911 }
912 }
913
914 use tokio::io::{AsyncReadExt, AsyncWriteExt};
915
916 async fn roundtrip(
924 registry: Arc<ServiceRegistry>,
925 load_balancer: Arc<LoadBalancer>,
926 cert_manager: Option<Arc<CertManager>>,
927 raw_request: &str,
928 ) -> String {
929 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
930 let addr = listener.local_addr().unwrap();
931
932 let server = tokio::spawn(async move {
933 let (stream, remote_addr) = listener.accept().await.unwrap();
934 let _ = ProxyServer::handle_connection(
935 stream,
936 remote_addr,
937 registry,
938 load_balancer,
939 Arc::new(ProxyConfig::default()),
940 cert_manager,
941 None,
942 None,
943 None,
944 )
945 .await;
946 });
947
948 let mut client = tokio::net::TcpStream::connect(addr).await.unwrap();
949 client.write_all(raw_request.as_bytes()).await.unwrap();
950 client.flush().await.unwrap();
951
952 let mut buf = Vec::new();
953 let _ = client.read_to_end(&mut buf).await;
955 server.abort();
956
957 String::from_utf8_lossy(&buf).into_owned()
958 }
959
960 #[tokio::test]
964 async fn test_unmatched_host_denied_404_generic_body() {
965 let registry = Arc::new(ServiceRegistry::new());
966 registry
969 .register(make_entry(
970 "known",
971 Some("known.example.com"),
972 "/",
973 vec!["127.0.0.1:9".parse().unwrap()],
974 ))
975 .await;
976 let lb = Arc::new(LoadBalancer::new());
977
978 let resp = roundtrip(
979 registry,
980 lb,
981 None,
982 "GET /secret/path HTTP/1.1\r\nHost: attacker.unregistered.test\r\nConnection: close\r\n\r\n",
983 )
984 .await;
985
986 assert!(
987 resp.starts_with("HTTP/1.1 404"),
988 "unmatched host must be denied with 404, got: {resp}"
989 );
990 assert!(
992 !resp.contains("attacker.unregistered.test"),
993 "response must not echo the requested host: {resp}"
994 );
995 assert!(
996 !resp.contains("/secret/path"),
997 "response must not echo the requested path: {resp}"
998 );
999 assert!(
1000 resp.contains("404 Not Found"),
1001 "response should carry the generic 404 body: {resp}"
1002 );
1003 }
1004
1005 #[tokio::test]
1008 async fn test_acme_challenge_served_not_denied() {
1009 let tmp = tempfile::tempdir().unwrap();
1010 let cm = Arc::new(
1011 CertManager::new(tmp.path().to_string_lossy().into_owned(), None)
1012 .await
1013 .unwrap(),
1014 );
1015 let token = "test-token-abc";
1016 cm.store_challenge(token, "example.com", "key-auth-payload-123");
1017
1018 let registry = Arc::new(ServiceRegistry::new());
1021 let lb = Arc::new(LoadBalancer::new());
1022
1023 let resp = roundtrip(
1024 registry,
1025 lb,
1026 Some(cm),
1027 &format!(
1028 "GET /.well-known/acme-challenge/{token} HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n"
1029 ),
1030 )
1031 .await;
1032
1033 assert!(
1034 resp.starts_with("HTTP/1.1 200"),
1035 "ACME challenge must be served with 200, got: {resp}"
1036 );
1037 assert!(
1038 resp.contains("key-auth-payload-123"),
1039 "ACME challenge must return the key authorization: {resp}"
1040 );
1041 }
1042
1043 #[tokio::test]
1048 async fn test_acme_challenge_unknown_token_404_not_forbidden() {
1049 let tmp = tempfile::tempdir().unwrap();
1050 let cm = Arc::new(
1051 CertManager::new(tmp.path().to_string_lossy().into_owned(), None)
1052 .await
1053 .unwrap(),
1054 );
1055 let registry = Arc::new(ServiceRegistry::new());
1062 registry
1063 .register(make_entry(
1064 "known",
1065 Some("known.example.com"),
1066 "/",
1067 vec!["127.0.0.1:9".parse().unwrap()],
1068 ))
1069 .await;
1070 let lb = Arc::new(LoadBalancer::new());
1071
1072 let resp = roundtrip(
1073 registry,
1074 lb,
1075 Some(cm),
1076 "GET /.well-known/acme-challenge/does-not-exist HTTP/1.1\r\nHost: console.zatabase.io\r\nConnection: close\r\n\r\n",
1077 )
1078 .await;
1079
1080 assert!(
1081 resp.starts_with("HTTP/1.1 404"),
1082 "unknown ACME token must return 404, got: {resp}"
1083 );
1084 assert!(
1085 !resp.starts_with("HTTP/1.1 403"),
1086 "unknown ACME token must NOT fall through to vhost 403: {resp}"
1087 );
1088 assert!(
1089 resp.contains("ACME challenge token not found"),
1090 "404 should carry the ACME-specific body: {resp}"
1091 );
1092 }
1093
1094 #[tokio::test]
1098 async fn test_matched_no_backends_503_generic_body() {
1099 let registry = Arc::new(ServiceRegistry::new());
1100 let lb_group = "prod/api#http-secret-group";
1103 let mut entry = make_entry("api", Some("api.example.com"), "/", vec![]);
1104 entry.resolved.name = lb_group.to_string();
1105 registry.register(entry).await;
1106
1107 let lb = Arc::new(LoadBalancer::new());
1110
1111 let resp = roundtrip(
1112 registry,
1113 lb,
1114 None,
1115 "GET / HTTP/1.1\r\nHost: api.example.com\r\nConnection: close\r\n\r\n",
1116 )
1117 .await;
1118
1119 assert!(
1120 resp.starts_with("HTTP/1.1 503"),
1121 "matched route with no healthy backends must return 503, got: {resp}"
1122 );
1123 assert!(
1124 !resp.contains(lb_group),
1125 "503 body must not leak the internal LB group name: {resp}"
1126 );
1127 assert!(
1128 resp.contains("503 Service Unavailable"),
1129 "response should carry the generic 503 body: {resp}"
1130 );
1131 }
1132
1133 #[tokio::test]
1134 async fn test_server_shutdown() {
1135 let registry = Arc::new(ServiceRegistry::new());
1136 let lb = Arc::new(LoadBalancer::new());
1137 let server = ProxyServer::new(ProxyConfig::default(), registry, lb);
1138
1139 let shutdown_tx = server.shutdown_tx.clone();
1141
1142 let _ = shutdown_tx.send(true);
1144
1145 }
1148
1149 #[tokio::test]
1150 async fn test_registry_integration() {
1151 let registry = Arc::new(ServiceRegistry::new());
1152
1153 registry
1155 .register(make_entry(
1156 "test-service",
1157 None,
1158 "/api",
1159 vec!["127.0.0.1:8081".parse().unwrap()],
1160 ))
1161 .await;
1162
1163 let lb = Arc::new(LoadBalancer::new());
1164 let server = ProxyServer::new(ProxyConfig::default(), registry, lb);
1165
1166 let reg = server.registry();
1168 assert_eq!(reg.route_count().await, 1);
1169 }
1170
/// The backoff doubles from its initial value, reaches the cap after enough
/// steps, and stays at the cap.
#[test]
fn test_next_ingress_backoff_doubles_then_caps() {
    let first_step = next_ingress_backoff(INGRESS_BIND_BACKOFF_INITIAL);
    assert_eq!(first_step, INGRESS_BIND_BACKOFF_INITIAL * 2);

    // Twenty doublings are far more than needed to hit the cap.
    let settled = (0..20).fold(INGRESS_BIND_BACKOFF_INITIAL, |delay, _| {
        next_ingress_backoff(delay)
    });
    assert_eq!(settled, INGRESS_BIND_BACKOFF_MAX);

    let at_cap = next_ingress_backoff(INGRESS_BIND_BACKOFF_MAX);
    assert_eq!(at_cap, INGRESS_BIND_BACKOFF_MAX);
}
1190
/// Warnings fire on attempt 0 and on every multiple of the cadence, and on
/// nothing in between.
#[test]
fn test_should_warn_cadence() {
    let warned = [0, INGRESS_BIND_WARN_EVERY, INGRESS_BIND_WARN_EVERY * 2];
    for attempt in warned {
        assert!(should_warn_on_attempt(attempt));
    }

    let quiet = [1, INGRESS_BIND_WARN_EVERY - 1];
    for attempt in quiet {
        assert!(!should_warn_on_attempt(attempt));
    }
}
1202
1203 #[tokio::test]
1204 async fn test_bind_with_retry_succeeds_after_initial_conflict() {
1205 let held = TcpListener::bind("127.0.0.1:0").await.unwrap();
1208 let addr = held.local_addr().unwrap();
1209
1210 let (_tx, mut rx) = watch::channel(false);
1211 let handle = tokio::spawn(async move { bind_with_retry(addr, "test", &mut rx).await });
1212
1213 tokio::time::sleep(Duration::from_millis(50)).await;
1215 drop(held);
1217
1218 let bound = tokio::time::timeout(Duration::from_secs(10), handle)
1219 .await
1220 .expect("bind_with_retry did not finish")
1221 .expect("task panicked");
1222 let listener = bound.expect("expected a bound listener, got None");
1223 assert_eq!(listener.local_addr().unwrap().port(), addr.port());
1224 }
1225
1226 #[tokio::test]
1227 async fn test_bind_with_retry_returns_none_on_shutdown() {
1228 let held = TcpListener::bind("127.0.0.1:0").await.unwrap();
1231 let addr = held.local_addr().unwrap();
1232
1233 let (tx, mut rx) = watch::channel(false);
1234 let handle = tokio::spawn(async move { bind_with_retry(addr, "test", &mut rx).await });
1235
1236 tokio::time::sleep(Duration::from_millis(50)).await;
1237 tx.send(true).unwrap();
1238
1239 let result = tokio::time::timeout(Duration::from_secs(10), handle)
1240 .await
1241 .expect("bind_with_retry did not respond to shutdown")
1242 .expect("task panicked");
1243 assert!(result.is_none(), "shutdown should yield None");
1244 }
1245}