1use crate::acme::CertManager;
7use crate::config::ProxyConfig;
8use crate::error::{ProxyError, Result};
9use crate::lb::LoadBalancer;
10use crate::network_policy::NetworkPolicyChecker;
11use crate::routes::ServiceRegistry;
12use crate::service::ReverseProxyService;
13use crate::sni_resolver::SniCertResolver;
14use hyper::body::Incoming;
15use hyper::server::conn::http1;
16use hyper::service::service_fn;
17use hyper::Request;
18use hyper_util::rt::TokioIo;
19use std::net::SocketAddr;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::net::TcpListener;
23use tokio::sync::watch;
24use tokio_rustls::TlsAcceptor;
25use tracing::{debug, error, info, warn};
26
27const INGRESS_BIND_BACKOFF_INITIAL: Duration = Duration::from_secs(2);
29
30const INGRESS_BIND_BACKOFF_MAX: Duration = Duration::from_secs(30);
33
34const INGRESS_BIND_WARN_EVERY: u64 = 30;
39
40#[must_use]
46fn next_ingress_backoff(current: Duration) -> Duration {
47 (current * 2).min(INGRESS_BIND_BACKOFF_MAX)
48}
49
50#[must_use]
56fn should_warn_on_attempt(attempt: u64) -> bool {
57 attempt == 0 || attempt % INGRESS_BIND_WARN_EVERY == 0
58}
59
60async fn bind_with_retry(
71 addr: SocketAddr,
72 label: &str,
73 shutdown_rx: &mut watch::Receiver<bool>,
74) -> Option<TcpListener> {
75 let mut attempt: u64 = 0;
76 let mut backoff = INGRESS_BIND_BACKOFF_INITIAL;
77 let mut warned_eacces = false;
78
79 loop {
80 if *shutdown_rx.borrow() {
81 return None;
82 }
83
84 match TcpListener::bind(addr).await {
85 Ok(listener) => {
86 if attempt > 0 {
87 info!(addr = %addr, label, attempt, "Ingress bound after retrying");
88 }
89 return Some(listener);
90 }
91 Err(e) => {
92 let is_eacces = e.kind() == std::io::ErrorKind::PermissionDenied;
93 if is_eacces && !warned_eacces {
94 warned_eacces = true;
95 warn!(
96 addr = %addr,
97 label,
98 error = %e,
99 "Ingress bind denied: binding 80/443 needs root or CAP_NET_BIND_SERVICE; \
100 will keep retrying without aborting startup"
101 );
102 } else if should_warn_on_attempt(attempt) {
103 warn!(
104 addr = %addr,
105 label,
106 attempt,
107 error = %e,
108 "Ingress bind failed; will keep retrying (port may be held by another process)"
109 );
110 } else {
111 debug!(addr = %addr, label, attempt, error = %e, "Ingress bind retry");
112 }
113
114 tokio::select! {
115 () = tokio::time::sleep(backoff) => {}
116 _ = shutdown_rx.changed() => {
117 if *shutdown_rx.borrow() {
118 return None;
119 }
120 }
121 }
122
123 attempt = attempt.saturating_add(1);
124 backoff = next_ingress_backoff(backoff);
125 }
126 }
127 }
128}
129
130pub struct ProxyServer {
132 config: Arc<ProxyConfig>,
134 registry: Arc<ServiceRegistry>,
136 load_balancer: Arc<LoadBalancer>,
138 shutdown_tx: watch::Sender<bool>,
140 shutdown_rx: watch::Receiver<bool>,
142 tls_acceptor: Option<TlsAcceptor>,
144 cert_manager: Option<Arc<CertManager>>,
146 network_policy_checker: Option<NetworkPolicyChecker>,
148}
149
150impl ProxyServer {
151 pub fn new(
153 config: ProxyConfig,
154 registry: Arc<ServiceRegistry>,
155 load_balancer: Arc<LoadBalancer>,
156 ) -> Self {
157 let (shutdown_tx, shutdown_rx) = watch::channel(false);
158
159 Self {
160 config: Arc::new(config),
161 registry,
162 load_balancer,
163 shutdown_tx,
164 shutdown_rx,
165 tls_acceptor: None,
166 cert_manager: None,
167 network_policy_checker: None,
168 }
169 }
170
171 pub fn with_registry(
173 config: ProxyConfig,
174 registry: Arc<ServiceRegistry>,
175 load_balancer: Arc<LoadBalancer>,
176 ) -> Self {
177 Self::new(config, registry, load_balancer)
178 }
179
180 pub fn with_tls_resolver(
182 config: ProxyConfig,
183 registry: Arc<ServiceRegistry>,
184 load_balancer: Arc<LoadBalancer>,
185 resolver: Arc<SniCertResolver>,
186 ) -> Self {
187 let tls_config = rustls::ServerConfig::builder()
188 .with_no_client_auth()
189 .with_cert_resolver(resolver);
190 let acceptor = TlsAcceptor::from(Arc::new(tls_config));
191 let (shutdown_tx, shutdown_rx) = watch::channel(false);
192
193 Self {
194 config: Arc::new(config),
195 registry,
196 load_balancer,
197 shutdown_tx,
198 shutdown_rx,
199 tls_acceptor: Some(acceptor),
200 cert_manager: None,
201 network_policy_checker: None,
202 }
203 }
204
205 #[must_use]
207 pub fn with_cert_manager(mut self, cm: Arc<CertManager>) -> Self {
208 self.cert_manager = Some(cm);
209 self
210 }
211
212 #[must_use]
214 pub fn with_network_policy_checker(mut self, checker: NetworkPolicyChecker) -> Self {
215 self.network_policy_checker = Some(checker);
216 self
217 }
218
219 #[must_use]
221 pub fn has_tls(&self) -> bool {
222 self.tls_acceptor.is_some()
223 }
224
225 #[must_use]
227 pub fn tls_acceptor(&self) -> Option<&TlsAcceptor> {
228 self.tls_acceptor.as_ref()
229 }
230
231 #[must_use]
233 pub fn registry(&self) -> Arc<ServiceRegistry> {
234 self.registry.clone()
235 }
236
237 #[must_use]
239 pub fn config(&self) -> Arc<ProxyConfig> {
240 self.config.clone()
241 }
242
243 pub fn shutdown(&self) {
245 let _ = self.shutdown_tx.send(true);
246 }
247
248 pub async fn run(&self) -> Result<()> {
255 let addr = self.config.server.http_addr;
256 let listener = TcpListener::bind(addr)
257 .await
258 .map_err(|e| ProxyError::BindFailed {
259 addr,
260 reason: e.to_string(),
261 })?;
262
263 info!(addr = %addr, "HTTP proxy server listening");
264
265 self.accept_loop(listener).await
266 }
267
268 pub async fn run_on(&self, addr: SocketAddr) -> Result<()> {
275 let listener = TcpListener::bind(addr)
276 .await
277 .map_err(|e| ProxyError::BindFailed {
278 addr,
279 reason: e.to_string(),
280 })?;
281
282 info!(addr = %addr, "HTTP proxy server listening");
283
284 self.accept_loop(listener).await
285 }
286
287 pub async fn run_with_retry(&self, addr: SocketAddr) -> Result<()> {
300 let mut shutdown_rx = self.shutdown_rx.clone();
301 let Some(listener) = bind_with_retry(addr, "http", &mut shutdown_rx).await else {
302 return Ok(());
304 };
305
306 info!(addr = %addr, "HTTP ingress server listening");
307
308 self.accept_loop(listener).await
309 }
310
311 async fn accept_loop(&self, listener: TcpListener) -> Result<()> {
312 let mut shutdown_rx = self.shutdown_rx.clone();
313
314 loop {
315 tokio::select! {
316 _ = shutdown_rx.changed() => {
318 if *shutdown_rx.borrow() {
319 info!("Shutting down proxy server");
320 break;
321 }
322 }
323
324 result = listener.accept() => {
326 match result {
327 Ok((stream, remote_addr)) => {
328 let registry = self.registry.clone();
329 let load_balancer = self.load_balancer.clone();
330 let config = self.config.clone();
331 let cert_manager = self.cert_manager.clone();
332 let npc = self.network_policy_checker.clone();
333
334 tokio::spawn(async move {
335 if let Err(e) = Self::handle_connection(
336 stream,
337 remote_addr,
338 registry,
339 load_balancer,
340 config,
341 cert_manager,
342 npc,
343 ).await {
344 debug!(
345 error = %e,
346 remote_addr = %remote_addr,
347 "Connection error"
348 );
349 }
350 });
351 }
352 Err(e) => {
353 warn!(error = %e, "Failed to accept connection");
354 }
355 }
356 }
357 }
358 }
359
360 Ok(())
361 }
362
363 #[allow(clippy::too_many_arguments)]
364 async fn handle_connection(
365 stream: tokio::net::TcpStream,
366 remote_addr: SocketAddr,
367 registry: Arc<ServiceRegistry>,
368 load_balancer: Arc<LoadBalancer>,
369 config: Arc<ProxyConfig>,
370 cert_manager: Option<Arc<CertManager>>,
371 network_policy_checker: Option<NetworkPolicyChecker>,
372 ) -> Result<()> {
373 let io = TokioIo::new(stream);
374
375 let mut service =
376 ReverseProxyService::new(registry, load_balancer, config).with_remote_addr(remote_addr);
377 if let Some(cm) = cert_manager {
378 service = service.with_cert_manager(cm);
379 }
380 if let Some(checker) = network_policy_checker {
381 service = service.with_network_policy_checker(checker);
382 }
383
384 let service = service_fn(move |req: Request<Incoming>| {
385 let svc = service.clone();
386 async move {
387 match svc.proxy_request(req).await {
388 Ok(response) => Ok::<_, hyper::Error>(response),
389 Err(e) => {
390 error!(error = %e, "Proxy error");
391 Ok(ReverseProxyService::error_response(&e))
392 }
393 }
394 }
395 });
396
397 http1::Builder::new()
398 .preserve_header_case(true)
399 .title_case_headers(false)
400 .serve_connection(io, service)
401 .with_upgrades()
402 .await
403 .map_err(ProxyError::Hyper)?;
404
405 Ok(())
406 }
407
408 pub async fn run_https(&self) -> Result<()> {
417 let acceptor = self
418 .tls_acceptor
419 .as_ref()
420 .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
421
422 let addr = self.config.server.https_addr;
423 let listener = TcpListener::bind(addr)
424 .await
425 .map_err(|e| ProxyError::BindFailed {
426 addr,
427 reason: e.to_string(),
428 })?;
429
430 info!(addr = %addr, "HTTPS proxy server listening");
431
432 self.accept_loop_tls(listener, acceptor.clone()).await
433 }
434
435 pub async fn run_https_on(&self, addr: SocketAddr) -> Result<()> {
442 let acceptor = self
443 .tls_acceptor
444 .as_ref()
445 .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
446
447 let listener = TcpListener::bind(addr)
448 .await
449 .map_err(|e| ProxyError::BindFailed {
450 addr,
451 reason: e.to_string(),
452 })?;
453
454 info!(addr = %addr, "HTTPS proxy server listening");
455
456 self.accept_loop_tls(listener, acceptor.clone()).await
457 }
458
459 pub async fn run_https_with_retry(&self, addr: SocketAddr) -> Result<()> {
474 let acceptor = self
475 .tls_acceptor
476 .as_ref()
477 .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
478
479 let mut shutdown_rx = self.shutdown_rx.clone();
480 let Some(listener) = bind_with_retry(addr, "https", &mut shutdown_rx).await else {
481 return Ok(());
483 };
484
485 info!(addr = %addr, "HTTPS ingress server listening");
486
487 self.accept_loop_tls(listener, acceptor.clone()).await
488 }
489
490 #[allow(clippy::similar_names)]
500 pub async fn run_both(&self) -> Result<()> {
501 let http_addr = self.config.server.http_addr;
502 let https_addr = self.config.server.https_addr;
503
504 let acceptor = self
505 .tls_acceptor
506 .as_ref()
507 .ok_or_else(|| ProxyError::Config("TLS not configured".to_string()))?;
508
509 let http_listener =
510 TcpListener::bind(http_addr)
511 .await
512 .map_err(|e| ProxyError::BindFailed {
513 addr: http_addr,
514 reason: e.to_string(),
515 })?;
516
517 let https_listener =
518 TcpListener::bind(https_addr)
519 .await
520 .map_err(|e| ProxyError::BindFailed {
521 addr: https_addr,
522 reason: e.to_string(),
523 })?;
524
525 info!(http = %http_addr, https = %https_addr, "Proxy server listening");
526
527 let http_future = self.accept_loop(http_listener);
529 let https_future = self.accept_loop_tls(https_listener, acceptor.clone());
530
531 tokio::select! {
532 result = http_future => result,
533 result = https_future => result,
534 }
535 }
536
537 async fn accept_loop_tls(&self, listener: TcpListener, acceptor: TlsAcceptor) -> Result<()> {
538 let mut shutdown_rx = self.shutdown_rx.clone();
539
540 loop {
541 tokio::select! {
542 _ = shutdown_rx.changed() => {
544 if *shutdown_rx.borrow() {
545 info!("Shutting down HTTPS proxy server");
546 break;
547 }
548 }
549
550 result = listener.accept() => {
552 match result {
553 Ok((stream, remote_addr)) => {
554 let registry = self.registry.clone();
555 let load_balancer = self.load_balancer.clone();
556 let config = self.config.clone();
557 let acceptor = acceptor.clone();
558 let cert_manager = self.cert_manager.clone();
559 let npc = self.network_policy_checker.clone();
560
561 tokio::spawn(async move {
562 if let Err(e) = Self::handle_tls_connection(
563 stream,
564 remote_addr,
565 registry,
566 load_balancer,
567 config,
568 acceptor,
569 cert_manager,
570 npc,
571 ).await {
572 debug!(
573 error = %e,
574 remote_addr = %remote_addr,
575 "TLS connection error"
576 );
577 }
578 });
579 }
580 Err(e) => {
581 warn!(error = %e, "Failed to accept TLS connection");
582 }
583 }
584 }
585 }
586 }
587
588 Ok(())
589 }
590
591 #[allow(clippy::too_many_arguments)]
592 async fn handle_tls_connection(
593 stream: tokio::net::TcpStream,
594 remote_addr: SocketAddr,
595 registry: Arc<ServiceRegistry>,
596 load_balancer: Arc<LoadBalancer>,
597 config: Arc<ProxyConfig>,
598 acceptor: TlsAcceptor,
599 cert_manager: Option<Arc<CertManager>>,
600 network_policy_checker: Option<NetworkPolicyChecker>,
601 ) -> Result<()> {
602 let tls_stream = acceptor
604 .accept(stream)
605 .await
606 .map_err(|e| ProxyError::Tls(format!("TLS handshake failed: {e}")))?;
607
608 let io = TokioIo::new(tls_stream);
609
610 let mut service = ReverseProxyService::new(registry, load_balancer, config)
611 .with_remote_addr(remote_addr)
612 .with_tls(true);
613 if let Some(cm) = cert_manager {
614 service = service.with_cert_manager(cm);
615 }
616 if let Some(checker) = network_policy_checker {
617 service = service.with_network_policy_checker(checker);
618 }
619
620 let service = service_fn(move |req: Request<Incoming>| {
621 let svc = service.clone();
622 async move {
623 match svc.proxy_request(req).await {
624 Ok(response) => Ok::<_, hyper::Error>(response),
625 Err(e) => {
626 error!(error = %e, "Proxy error");
627 Ok(ReverseProxyService::error_response(&e))
628 }
629 }
630 }
631 });
632
633 http1::Builder::new()
634 .preserve_header_case(true)
635 .title_case_headers(false)
636 .serve_connection(io, service)
637 .with_upgrades()
638 .await
639 .map_err(ProxyError::Hyper)?;
640
641 Ok(())
642 }
643}
644
645#[cfg(test)]
646mod tests {
647 use super::*;
648 use crate::lb::LoadBalancer;
649 use crate::routes::{ResolvedService, RouteEntry};
650 use zlayer_spec::{ExposeType, Protocol};
651
652 fn make_entry(
654 service: &str,
655 host: Option<&str>,
656 path: &str,
657 backends: Vec<SocketAddr>,
658 ) -> RouteEntry {
659 RouteEntry {
660 service_name: service.to_string(),
661 endpoint_name: "http".to_string(),
662 host: host.map(std::string::ToString::to_string),
663 path_prefix: path.to_string(),
664 resolved: ResolvedService {
665 name: service.to_string(),
666 backends,
667 use_tls: false,
668 sni_hostname: String::new(),
669 expose: ExposeType::Public,
670 protocol: Protocol::Http,
671 strip_prefix: false,
672 path_prefix: path.to_string(),
673 target_port: 8080,
674 },
675 }
676 }
677
678 use tokio::io::{AsyncReadExt, AsyncWriteExt};
679
680 async fn roundtrip(
688 registry: Arc<ServiceRegistry>,
689 load_balancer: Arc<LoadBalancer>,
690 cert_manager: Option<Arc<CertManager>>,
691 raw_request: &str,
692 ) -> String {
693 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
694 let addr = listener.local_addr().unwrap();
695
696 let server = tokio::spawn(async move {
697 let (stream, remote_addr) = listener.accept().await.unwrap();
698 let _ = ProxyServer::handle_connection(
699 stream,
700 remote_addr,
701 registry,
702 load_balancer,
703 Arc::new(ProxyConfig::default()),
704 cert_manager,
705 None,
706 )
707 .await;
708 });
709
710 let mut client = tokio::net::TcpStream::connect(addr).await.unwrap();
711 client.write_all(raw_request.as_bytes()).await.unwrap();
712 client.flush().await.unwrap();
713
714 let mut buf = Vec::new();
715 let _ = client.read_to_end(&mut buf).await;
717 server.abort();
718
719 String::from_utf8_lossy(&buf).into_owned()
720 }
721
722 #[tokio::test]
726 async fn test_unmatched_host_denied_404_generic_body() {
727 let registry = Arc::new(ServiceRegistry::new());
728 registry
731 .register(make_entry(
732 "known",
733 Some("known.example.com"),
734 "/",
735 vec!["127.0.0.1:9".parse().unwrap()],
736 ))
737 .await;
738 let lb = Arc::new(LoadBalancer::new());
739
740 let resp = roundtrip(
741 registry,
742 lb,
743 None,
744 "GET /secret/path HTTP/1.1\r\nHost: attacker.unregistered.test\r\nConnection: close\r\n\r\n",
745 )
746 .await;
747
748 assert!(
749 resp.starts_with("HTTP/1.1 404"),
750 "unmatched host must be denied with 404, got: {resp}"
751 );
752 assert!(
754 !resp.contains("attacker.unregistered.test"),
755 "response must not echo the requested host: {resp}"
756 );
757 assert!(
758 !resp.contains("/secret/path"),
759 "response must not echo the requested path: {resp}"
760 );
761 assert!(
762 resp.contains("404 Not Found"),
763 "response should carry the generic 404 body: {resp}"
764 );
765 }
766
767 #[tokio::test]
770 async fn test_acme_challenge_served_not_denied() {
771 let tmp = tempfile::tempdir().unwrap();
772 let cm = Arc::new(
773 CertManager::new(tmp.path().to_string_lossy().into_owned(), None)
774 .await
775 .unwrap(),
776 );
777 let token = "test-token-abc";
778 cm.store_challenge(token, "example.com", "key-auth-payload-123");
779
780 let registry = Arc::new(ServiceRegistry::new());
783 let lb = Arc::new(LoadBalancer::new());
784
785 let resp = roundtrip(
786 registry,
787 lb,
788 Some(cm),
789 &format!(
790 "GET /.well-known/acme-challenge/{token} HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n"
791 ),
792 )
793 .await;
794
795 assert!(
796 resp.starts_with("HTTP/1.1 200"),
797 "ACME challenge must be served with 200, got: {resp}"
798 );
799 assert!(
800 resp.contains("key-auth-payload-123"),
801 "ACME challenge must return the key authorization: {resp}"
802 );
803 }
804
805 #[tokio::test]
809 async fn test_matched_no_backends_503_generic_body() {
810 let registry = Arc::new(ServiceRegistry::new());
811 let lb_group = "prod/api#http-secret-group";
814 let mut entry = make_entry("api", Some("api.example.com"), "/", vec![]);
815 entry.resolved.name = lb_group.to_string();
816 registry.register(entry).await;
817
818 let lb = Arc::new(LoadBalancer::new());
821
822 let resp = roundtrip(
823 registry,
824 lb,
825 None,
826 "GET / HTTP/1.1\r\nHost: api.example.com\r\nConnection: close\r\n\r\n",
827 )
828 .await;
829
830 assert!(
831 resp.starts_with("HTTP/1.1 503"),
832 "matched route with no healthy backends must return 503, got: {resp}"
833 );
834 assert!(
835 !resp.contains(lb_group),
836 "503 body must not leak the internal LB group name: {resp}"
837 );
838 assert!(
839 resp.contains("503 Service Unavailable"),
840 "response should carry the generic 503 body: {resp}"
841 );
842 }
843
844 #[tokio::test]
845 async fn test_server_shutdown() {
846 let registry = Arc::new(ServiceRegistry::new());
847 let lb = Arc::new(LoadBalancer::new());
848 let server = ProxyServer::new(ProxyConfig::default(), registry, lb);
849
850 let shutdown_tx = server.shutdown_tx.clone();
852
853 let _ = shutdown_tx.send(true);
855
856 }
859
860 #[tokio::test]
861 async fn test_registry_integration() {
862 let registry = Arc::new(ServiceRegistry::new());
863
864 registry
866 .register(make_entry(
867 "test-service",
868 None,
869 "/api",
870 vec!["127.0.0.1:8081".parse().unwrap()],
871 ))
872 .await;
873
874 let lb = Arc::new(LoadBalancer::new());
875 let server = ProxyServer::new(ProxyConfig::default(), registry, lb);
876
877 let reg = server.registry();
879 assert_eq!(reg.route_count().await, 1);
880 }
881
882 #[test]
883 fn test_next_ingress_backoff_doubles_then_caps() {
884 assert_eq!(
886 next_ingress_backoff(INGRESS_BIND_BACKOFF_INITIAL),
887 INGRESS_BIND_BACKOFF_INITIAL * 2
888 );
889 let mut d = INGRESS_BIND_BACKOFF_INITIAL;
891 for _ in 0..20 {
892 d = next_ingress_backoff(d);
893 }
894 assert_eq!(d, INGRESS_BIND_BACKOFF_MAX);
895 assert_eq!(
897 next_ingress_backoff(INGRESS_BIND_BACKOFF_MAX),
898 INGRESS_BIND_BACKOFF_MAX
899 );
900 }
901
902 #[test]
903 fn test_should_warn_cadence() {
904 assert!(should_warn_on_attempt(0));
906 assert!(!should_warn_on_attempt(1));
908 assert!(!should_warn_on_attempt(INGRESS_BIND_WARN_EVERY - 1));
909 assert!(should_warn_on_attempt(INGRESS_BIND_WARN_EVERY));
911 assert!(should_warn_on_attempt(INGRESS_BIND_WARN_EVERY * 2));
912 }
913
914 #[tokio::test]
915 async fn test_bind_with_retry_succeeds_after_initial_conflict() {
916 let held = TcpListener::bind("127.0.0.1:0").await.unwrap();
919 let addr = held.local_addr().unwrap();
920
921 let (_tx, mut rx) = watch::channel(false);
922 let handle = tokio::spawn(async move { bind_with_retry(addr, "test", &mut rx).await });
923
924 tokio::time::sleep(Duration::from_millis(50)).await;
926 drop(held);
928
929 let bound = tokio::time::timeout(Duration::from_secs(10), handle)
930 .await
931 .expect("bind_with_retry did not finish")
932 .expect("task panicked");
933 let listener = bound.expect("expected a bound listener, got None");
934 assert_eq!(listener.local_addr().unwrap().port(), addr.port());
935 }
936
937 #[tokio::test]
938 async fn test_bind_with_retry_returns_none_on_shutdown() {
939 let held = TcpListener::bind("127.0.0.1:0").await.unwrap();
942 let addr = held.local_addr().unwrap();
943
944 let (tx, mut rx) = watch::channel(false);
945 let handle = tokio::spawn(async move { bind_with_retry(addr, "test", &mut rx).await });
946
947 tokio::time::sleep(Duration::from_millis(50)).await;
948 tx.send(true).unwrap();
949
950 let result = tokio::time::timeout(Duration::from_secs(10), handle)
951 .await
952 .expect("bind_with_retry did not respond to shutdown")
953 .expect("task panicked");
954 assert!(result.is_none(), "shutdown should yield None");
955 }
956}