1use crate::error::Result;
11use std::collections::{HashMap, HashSet};
12use std::net::{IpAddr, Ipv4Addr, SocketAddr};
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::RwLock;
17use tracing::{debug, info, warn};
18use zlayer_proxy::{
19 endpoint_lb_key, load_existing_certs_into_resolver, CertManager, LbStrategy, LoadBalancer,
20 NetworkPolicyChecker, ProxyConfig, ProxyServer, RouteEntry, ServiceRegistry, SniCertResolver,
21 StreamRegistry, StreamService, TcpStreamService, UdpStreamService,
22};
23use zlayer_spec::{ExposeType, Protocol, ServiceSpec};
24
25pub const DEFAULT_INGRESS_HTTP_PORT: u16 = 80;
29
30pub const DEFAULT_INGRESS_HTTPS_PORT: u16 = 443;
32
33#[derive(Debug, Clone)]
35pub struct ProxyManagerConfig {
36 pub http_addr: SocketAddr,
38 pub https_addr: Option<SocketAddr>,
40 pub http2_enabled: bool,
42}
43
44impl Default for ProxyManagerConfig {
45 fn default() -> Self {
46 Self {
47 http_addr: "0.0.0.0:80".parse().unwrap(),
48 https_addr: None,
49 http2_enabled: true,
50 }
51 }
52}
53
54impl ProxyManagerConfig {
55 #[must_use]
57 pub fn new(http_addr: SocketAddr) -> Self {
58 Self {
59 http_addr,
60 https_addr: None,
61 http2_enabled: true,
62 }
63 }
64
65 #[must_use]
67 pub fn with_https(mut self, addr: SocketAddr) -> Self {
68 self.https_addr = Some(addr);
69 self
70 }
71
72 #[must_use]
74 pub fn with_http2(mut self, enabled: bool) -> Self {
75 self.http2_enabled = enabled;
76 self
77 }
78}
79
80#[derive(Debug, Clone)]
82struct ServiceTracking {
83 deployment: Option<String>,
89 endpoint_names: Vec<String>,
92 tcp_ports: Vec<u16>,
94 udp_ports: Vec<u16>,
96 http_ports: Vec<u16>,
98}
99
100pub struct ProxyManager {
110 config: ProxyManagerConfig,
112 registry: Arc<ServiceRegistry>,
114 load_balancer: Arc<LoadBalancer>,
116 servers: RwLock<HashMap<u16, Arc<ProxyServer>>>,
118 services: RwLock<HashMap<String, ServiceTracking>>,
120 stream_registry: Option<Arc<StreamRegistry>>,
122 cert_manager: Option<Arc<CertManager>>,
124 tcp_listeners: RwLock<HashSet<u16>>,
126 udp_listeners: RwLock<HashSet<u16>>,
128 active_connections: Arc<AtomicU64>,
130 network_policy_checker: Option<NetworkPolicyChecker>,
132 loopback_registry: Arc<StreamRegistry>,
142 loopback_tcp: RwLock<HashMap<u16, tokio::task::JoinHandle<()>>>,
146 loopback_udp: RwLock<HashMap<u16, tokio::task::JoinHandle<()>>>,
149 published_ports: RwLock<HashMap<u16, (Option<String>, String)>>,
165 lb_health_checker: tokio::task::JoinHandle<()>,
174 ingress_started: AtomicBool,
179}
180
181impl Drop for ProxyManager {
182 fn drop(&mut self) {
183 self.lb_health_checker.abort();
184 }
185}
186
187impl ProxyManager {
188 pub fn new(
191 config: ProxyManagerConfig,
192 registry: Arc<ServiceRegistry>,
193 cert_manager: Option<Arc<CertManager>>,
194 ) -> Self {
195 let load_balancer = Arc::new(LoadBalancer::new());
196
197 let lb_health_checker =
203 load_balancer.spawn_health_checker(Duration::from_secs(5), Duration::from_secs(2));
204
205 Self {
206 config,
207 registry,
208 load_balancer,
209 servers: RwLock::new(HashMap::new()),
210 services: RwLock::new(HashMap::new()),
211 stream_registry: None,
212 cert_manager,
213 tcp_listeners: RwLock::new(HashSet::new()),
214 udp_listeners: RwLock::new(HashSet::new()),
215 active_connections: Arc::new(AtomicU64::new(0)),
216 network_policy_checker: None,
217 loopback_registry: Arc::new(StreamRegistry::new()),
218 loopback_tcp: RwLock::new(HashMap::new()),
219 loopback_udp: RwLock::new(HashMap::new()),
220 published_ports: RwLock::new(HashMap::new()),
221 lb_health_checker,
222 ingress_started: AtomicBool::new(false),
223 }
224 }
225
226 pub fn registry(&self) -> Arc<ServiceRegistry> {
228 self.registry.clone()
229 }
230
231 pub fn load_balancer(&self) -> Arc<LoadBalancer> {
233 self.load_balancer.clone()
234 }
235
236 pub fn active_connections(&self) -> u64 {
238 self.active_connections.load(Ordering::Relaxed)
239 }
240
241 pub fn cert_manager(&self) -> Option<&Arc<CertManager>> {
243 self.cert_manager.as_ref()
244 }
245
246 pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
248 self.stream_registry = Some(registry);
249 }
250
251 #[must_use]
253 pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
254 self.stream_registry = Some(registry);
255 self
256 }
257
258 pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
260 self.stream_registry.as_ref()
261 }
262
263 pub fn set_network_policy_checker(&mut self, checker: NetworkPolicyChecker) {
265 self.network_policy_checker = Some(checker);
266 }
267
268 #[must_use]
270 pub fn with_network_policy_checker(mut self, checker: NetworkPolicyChecker) -> Self {
271 self.network_policy_checker = Some(checker);
272 self
273 }
274
275 pub async fn listen_on(&self, port: u16, bind_ip: IpAddr) -> Result<()> {
283 let mut servers = self.servers.write().await;
284
285 if servers.contains_key(&port) {
286 debug!(port = port, "Already listening on port");
287 return Ok(());
288 }
289
290 let addr = SocketAddr::new(bind_ip, port);
291 let mut proxy_config = ProxyConfig::default();
292 proxy_config.server.http_addr = addr;
293 proxy_config.server.http2_enabled = self.config.http2_enabled;
294
295 let mut server = ProxyServer::with_registry(
296 proxy_config,
297 self.registry.clone(),
298 self.load_balancer.clone(),
299 );
300 if let Some(ref checker) = self.network_policy_checker {
301 server = server.with_network_policy_checker(checker.clone());
302 }
303 let server = Arc::new(server);
304
305 info!(port = port, bind = %addr, "Proxy listening on port");
306
307 let server_clone = server.clone();
308 tokio::spawn(async move {
309 if let Err(e) = server_clone.run().await {
310 tracing::error!(port = port, error = %e, "Proxy server error on port");
311 }
312 });
313
314 servers.insert(port, server);
315 Ok(())
316 }
317
318 pub async fn listen_on_tls(&self, port: u16, bind_ip: IpAddr) -> Result<()> {
326 let mut servers = self.servers.write().await;
327
328 if servers.contains_key(&port) {
329 debug!(port = port, "Already listening on port (TLS)");
330 return Ok(());
331 }
332
333 let Some(cert_manager) = &self.cert_manager else {
334 warn!(
335 port = port,
336 "Cannot start TLS listener: no CertManager configured"
337 );
338 return Ok(());
339 };
340
341 let sni_resolver = Arc::new(SniCertResolver::new());
343
344 let _ = load_existing_certs_into_resolver(cert_manager, &sni_resolver).await;
346
347 let addr = SocketAddr::new(bind_ip, port);
348 let mut proxy_config = ProxyConfig::default();
349 proxy_config.server.https_addr = addr;
350
351 let mut server = ProxyServer::with_tls_resolver(
352 proxy_config,
353 self.registry.clone(),
354 self.load_balancer.clone(),
355 sni_resolver,
356 )
357 .with_cert_manager(Arc::clone(cert_manager));
358 if let Some(ref checker) = self.network_policy_checker {
359 server = server.with_network_policy_checker(checker.clone());
360 }
361 let server = Arc::new(server);
362
363 info!(port = port, bind = %addr, "HTTPS proxy listening on port");
364
365 let server_clone = server.clone();
366 tokio::spawn(async move {
367 if let Err(e) = server_clone.run_https().await {
368 tracing::error!(port = port, error = %e, "HTTPS proxy server error");
369 }
370 });
371
372 servers.insert(port, server);
373 Ok(())
374 }
375
376 pub async fn start_ingress(&self) {
394 self.start_ingress_on(DEFAULT_INGRESS_HTTP_PORT, DEFAULT_INGRESS_HTTPS_PORT)
395 .await;
396 }
397
398 #[allow(clippy::similar_names)]
403 pub async fn start_ingress_on(&self, http_port: u16, https_port: u16) {
404 if self
406 .ingress_started
407 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
408 .is_err()
409 {
410 debug!("Ingress already started; skipping");
411 return;
412 }
413
414 let bind_ip = IpAddr::V4(Ipv4Addr::UNSPECIFIED); let http_addr = SocketAddr::new(bind_ip, http_port);
418 let mut http_proxy_config = ProxyConfig::default();
419 http_proxy_config.server.http_addr = http_addr;
420 http_proxy_config.server.http2_enabled = self.config.http2_enabled;
421
422 let mut http_server = ProxyServer::with_registry(
423 http_proxy_config,
424 self.registry.clone(),
425 self.load_balancer.clone(),
426 );
427 if let Some(ref checker) = self.network_policy_checker {
428 http_server = http_server.with_network_policy_checker(checker.clone());
429 }
430 let http_server = Arc::new(http_server);
431 info!(port = http_port, bind = %http_addr, "Starting HTTP ingress (retry-never-error)");
432 {
433 let server = http_server.clone();
434 tokio::spawn(async move {
435 if let Err(e) = server.run_with_retry(http_addr).await {
436 warn!(port = http_port, error = %e, "HTTP ingress accept loop exited");
439 }
440 });
441 }
442 self.servers.write().await.insert(http_port, http_server);
443
444 let Some(cert_manager) = &self.cert_manager else {
446 warn!(
447 port = https_port,
448 "Cannot start HTTPS ingress: no CertManager configured (HTTP ingress is up)"
449 );
450 return;
451 };
452
453 let sni_resolver = Arc::new(SniCertResolver::new());
454 let _ = load_existing_certs_into_resolver(cert_manager, &sni_resolver).await;
456
457 let https_addr = SocketAddr::new(bind_ip, https_port);
458 let mut https_proxy_config = ProxyConfig::default();
459 https_proxy_config.server.https_addr = https_addr;
460
461 let mut https_server = ProxyServer::with_tls_resolver(
462 https_proxy_config,
463 self.registry.clone(),
464 self.load_balancer.clone(),
465 sni_resolver,
466 )
467 .with_cert_manager(Arc::clone(cert_manager));
468 if let Some(ref checker) = self.network_policy_checker {
469 https_server = https_server.with_network_policy_checker(checker.clone());
470 }
471 let https_server = Arc::new(https_server);
472 info!(port = https_port, bind = %https_addr, "Starting HTTPS ingress (retry-never-error)");
473 {
474 let server = https_server.clone();
475 tokio::spawn(async move {
476 if let Err(e) = server.run_https_with_retry(https_addr).await {
477 warn!(port = https_port, error = %e, "HTTPS ingress accept loop exited");
478 }
479 });
480 }
481 self.servers.write().await.insert(https_port, https_server);
482 }
483
484 pub async fn stop(&self) {
489 let mut servers = self.servers.write().await;
490 for (port, server) in servers.drain() {
491 info!(port = port, "Stopping proxy on port");
492 server.shutdown();
493 }
494
495 let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
497 while self.active_connections.load(Ordering::Relaxed) > 0 {
498 if tokio::time::Instant::now() >= deadline {
499 let remaining = self.active_connections.load(Ordering::Relaxed);
500 warn!(
501 remaining = remaining,
502 "Drain timeout reached, forcing shutdown"
503 );
504 break;
505 }
506 tokio::time::sleep(Duration::from_millis(100)).await;
507 }
508
509 info!("All proxy servers stopped");
510 }
511
512 pub async fn unbind(&self, port: u16) {
514 let mut servers = self.servers.write().await;
515 if let Some(server) = servers.remove(&port) {
516 info!(port = port, "Unbinding proxy from port");
517 server.shutdown();
518 }
519 }
520
521 pub async fn ensure_ports_for_service(
537 &self,
538 spec: &ServiceSpec,
539 overlay_ip: Option<IpAddr>,
540 ) -> Result<()> {
541 for endpoint in &spec.endpoints {
542 let bind_ip = match endpoint.expose {
543 ExposeType::Public => IpAddr::V4(Ipv4Addr::UNSPECIFIED), ExposeType::Internal => {
545 let ip = overlay_ip.unwrap_or(IpAddr::V4(Ipv4Addr::LOCALHOST));
547 if overlay_ip.is_none() {
548 warn!(
549 endpoint = %endpoint.name,
550 port = endpoint.port,
551 "No overlay IP available for internal endpoint; binding to 127.0.0.1"
552 );
553 }
554 ip
555 }
556 };
557
558 match endpoint.protocol {
559 Protocol::Https => {
560 self.listen_on_tls(endpoint.port, bind_ip).await?;
562 }
563 Protocol::Http | Protocol::Websocket => {
564 self.listen_on(endpoint.port, bind_ip).await?;
566 }
567 Protocol::Tcp => {
568 self.ensure_tcp_listener(endpoint.port, bind_ip).await;
570 }
571 Protocol::Udp => {
572 self.ensure_udp_listener(endpoint.port, bind_ip).await;
574 }
575 }
576 }
577 Ok(())
578 }
579
580 async fn ensure_tcp_listener(&self, port: u16, bind_ip: IpAddr) {
585 {
587 let listeners = self.tcp_listeners.read().await;
588 if listeners.contains(&port) {
589 debug!(port = port, "TCP stream listener already active");
590 return;
591 }
592 }
593
594 let registry = if let Some(r) = &self.stream_registry {
595 Arc::clone(r)
596 } else {
597 warn!(
598 port = port,
599 "Cannot start TCP listener: StreamRegistry not configured"
600 );
601 return;
602 };
603
604 let addr = SocketAddr::new(bind_ip, port);
605 let listener = match tokio::net::TcpListener::bind(addr).await {
606 Ok(l) => l,
607 Err(e) => {
608 warn!(
609 port = port,
610 bind = %addr,
611 error = %e,
612 "Failed to bind TCP stream listener, continuing"
613 );
614 return;
615 }
616 };
617
618 {
620 let mut listeners = self.tcp_listeners.write().await;
621 listeners.insert(port);
622 }
623
624 let tcp_service = Arc::new(TcpStreamService::new(registry, port));
625 tokio::spawn(async move {
626 tcp_service.serve(listener).await;
627 });
628
629 info!(port = port, bind = %addr, "TCP stream proxy listening");
630 }
631
632 async fn ensure_udp_listener(&self, port: u16, bind_ip: IpAddr) {
637 {
639 let listeners = self.udp_listeners.read().await;
640 if listeners.contains(&port) {
641 debug!(port = port, "UDP stream listener already active");
642 return;
643 }
644 }
645
646 let registry = if let Some(r) = &self.stream_registry {
647 Arc::clone(r)
648 } else {
649 warn!(
650 port = port,
651 "Cannot start UDP listener: StreamRegistry not configured"
652 );
653 return;
654 };
655
656 let addr = SocketAddr::new(bind_ip, port);
657 let socket = match tokio::net::UdpSocket::bind(addr).await {
658 Ok(s) => s,
659 Err(e) => {
660 warn!(
661 port = port,
662 bind = %addr,
663 error = %e,
664 "Failed to bind UDP stream listener, continuing"
665 );
666 return;
667 }
668 };
669
670 {
672 let mut listeners = self.udp_listeners.write().await;
673 listeners.insert(port);
674 }
675
676 let udp_service = Arc::new(UdpStreamService::new(registry, port, None));
677 tokio::spawn(async move {
678 if let Err(e) = udp_service.serve(socket).await {
679 tracing::error!(
680 port = port,
681 error = %e,
682 "UDP stream proxy service failed"
683 );
684 }
685 });
686
687 info!(port = port, bind = %addr, "UDP stream proxy listening");
688 }
689
690 pub async fn publish_loopback_for_container(
738 &self,
739 deployment: Option<&str>,
740 service_name: &str,
741 spec: &ServiceSpec,
742 container_ip: IpAddr,
743 port_override: Option<u16>,
744 ) -> Result<()> {
745 for endpoint in &spec.endpoints {
746 if matches!(endpoint.expose, ExposeType::Public) {
748 continue;
749 }
750
751 let backend = SocketAddr::new(
752 container_ip,
753 port_override.unwrap_or_else(|| endpoint.target_port()),
754 );
755 let publish_port = endpoint.port;
756
757 self.claim_published_port(deployment, service_name, publish_port)
759 .await?;
760
761 match endpoint.protocol {
762 Protocol::Tcp | Protocol::Http | Protocol::Https | Protocol::Websocket => {
763 self.publish_loopback_tcp(service_name, publish_port, backend)
766 .await;
767 }
768 Protocol::Udp => {
769 self.publish_loopback_udp(service_name, publish_port, backend)
770 .await;
771 }
772 }
773 }
774 Ok(())
775 }
776
777 async fn claim_published_port(
784 &self,
785 deployment: Option<&str>,
786 service_name: &str,
787 publish_port: u16,
788 ) -> Result<()> {
789 let mut owners = self.published_ports.write().await;
790 if let Some((owner_dep, owner_svc)) = owners.get(&publish_port) {
791 if owner_dep.as_deref() == deployment && owner_svc == service_name {
792 return Ok(());
794 }
795 let owner = format!("{}/{}", owner_dep.as_deref().unwrap_or("_"), owner_svc);
796 let requester = format!("{}/{}", deployment.unwrap_or("_"), service_name);
797 warn!(
798 port = publish_port,
799 owner = %owner,
800 requester = %requester,
801 "Refusing to publish host port already owned by a different deployment/service (would cross-wire backends)"
802 );
803 return Err(crate::error::AgentError::PortConflict {
804 port: publish_port,
805 owner,
806 requester,
807 });
808 }
809 owners.insert(
810 publish_port,
811 (deployment.map(str::to_string), service_name.to_string()),
812 );
813 Ok(())
814 }
815
816 async fn publish_loopback_tcp(
819 &self,
820 service_name: &str,
821 publish_port: u16,
822 backend: SocketAddr,
823 ) {
824 if let Some(existing) = self.loopback_registry.resolve_tcp(publish_port) {
826 let mut backends = existing.backends;
827 if !backends.contains(&backend) {
828 backends.push(backend);
829 }
830 self.loopback_registry
831 .update_tcp_backends(publish_port, backends);
832 } else {
833 self.loopback_registry.register_tcp(
834 publish_port,
835 StreamService::new(service_name.to_string(), vec![backend]),
836 );
837 }
838
839 let mut listeners = self.loopback_tcp.write().await;
841 if listeners.contains_key(&publish_port) {
842 debug!(port = publish_port, "Loopback TCP listener already active");
843 return;
844 }
845
846 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), publish_port);
847 let listener = match tokio::net::TcpListener::bind(addr).await {
848 Ok(l) => l,
849 Err(e) => {
850 warn!(
851 port = publish_port,
852 bind = %addr,
853 error = %e,
854 "Failed to bind loopback TCP listener, continuing"
855 );
856 return;
857 }
858 };
859
860 let tcp_service = Arc::new(TcpStreamService::new(
861 Arc::clone(&self.loopback_registry),
862 publish_port,
863 ));
864 let handle = tokio::spawn(async move {
865 tcp_service.serve(listener).await;
866 });
867 listeners.insert(publish_port, handle);
868 drop(listeners);
869
870 info!(
871 service = service_name,
872 port = publish_port,
873 bind = %addr,
874 backend = %backend,
875 "Published service port on node loopback (TCP)"
876 );
877 }
878
879 async fn publish_loopback_udp(
882 &self,
883 service_name: &str,
884 publish_port: u16,
885 backend: SocketAddr,
886 ) {
887 if let Some(existing) = self.loopback_registry.resolve_udp(publish_port) {
888 let mut backends = existing.backends;
889 if !backends.contains(&backend) {
890 backends.push(backend);
891 }
892 self.loopback_registry
893 .update_udp_backends(publish_port, backends);
894 } else {
895 self.loopback_registry.register_udp(
896 publish_port,
897 StreamService::new(service_name.to_string(), vec![backend]),
898 );
899 }
900
901 let mut listeners = self.loopback_udp.write().await;
902 if listeners.contains_key(&publish_port) {
903 debug!(port = publish_port, "Loopback UDP listener already active");
904 return;
905 }
906
907 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), publish_port);
908 let socket = match tokio::net::UdpSocket::bind(addr).await {
909 Ok(s) => s,
910 Err(e) => {
911 warn!(
912 port = publish_port,
913 bind = %addr,
914 error = %e,
915 "Failed to bind loopback UDP listener, continuing"
916 );
917 return;
918 }
919 };
920
921 let udp_service = Arc::new(UdpStreamService::new(
922 Arc::clone(&self.loopback_registry),
923 publish_port,
924 None,
925 ));
926 let handle = tokio::spawn(async move {
927 if let Err(e) = udp_service.serve(socket).await {
928 tracing::error!(
929 port = publish_port,
930 error = %e,
931 "Loopback UDP stream proxy service failed"
932 );
933 }
934 });
935 listeners.insert(publish_port, handle);
936 drop(listeners);
937
938 info!(
939 service = service_name,
940 port = publish_port,
941 bind = %addr,
942 backend = %backend,
943 "Published service port on node loopback (UDP)"
944 );
945 }
946
947 pub async fn unpublish_loopback_for_container(
957 &self,
958 spec: &ServiceSpec,
959 container_ip: IpAddr,
960 port_override: Option<u16>,
961 ) {
962 for endpoint in &spec.endpoints {
963 if matches!(endpoint.expose, ExposeType::Public) {
964 continue;
965 }
966
967 let backend = SocketAddr::new(
968 container_ip,
969 port_override.unwrap_or_else(|| endpoint.target_port()),
970 );
971 let publish_port = endpoint.port;
972
973 match endpoint.protocol {
974 Protocol::Tcp | Protocol::Http | Protocol::Https | Protocol::Websocket => {
975 self.unpublish_loopback_tcp(publish_port, backend).await;
976 }
977 Protocol::Udp => {
978 self.unpublish_loopback_udp(publish_port, backend).await;
979 }
980 }
981 }
982 }
983
984 async fn unpublish_loopback_tcp(&self, publish_port: u16, backend: SocketAddr) {
987 let Some(existing) = self.loopback_registry.resolve_tcp(publish_port) else {
988 return;
989 };
990 let remaining: Vec<SocketAddr> = existing
991 .backends
992 .into_iter()
993 .filter(|b| *b != backend)
994 .collect();
995
996 if remaining.is_empty() {
997 let _ = self.loopback_registry.unregister_tcp(publish_port);
998 let mut listeners = self.loopback_tcp.write().await;
999 if let Some(handle) = listeners.remove(&publish_port) {
1000 handle.abort();
1001 }
1002 self.published_ports.write().await.remove(&publish_port);
1005 debug!(
1006 port = publish_port,
1007 "Freed loopback TCP listener (no backends remain)"
1008 );
1009 } else {
1010 self.loopback_registry
1011 .update_tcp_backends(publish_port, remaining);
1012 }
1013 }
1014
1015 async fn unpublish_loopback_udp(&self, publish_port: u16, backend: SocketAddr) {
1018 let Some(existing) = self.loopback_registry.resolve_udp(publish_port) else {
1019 return;
1020 };
1021 let remaining: Vec<SocketAddr> = existing
1022 .backends
1023 .into_iter()
1024 .filter(|b| *b != backend)
1025 .collect();
1026
1027 if remaining.is_empty() {
1028 let _ = self.loopback_registry.unregister_udp(publish_port);
1029 let mut listeners = self.loopback_udp.write().await;
1030 if let Some(handle) = listeners.remove(&publish_port) {
1031 handle.abort();
1032 }
1033 self.published_ports.write().await.remove(&publish_port);
1036 debug!(
1037 port = publish_port,
1038 "Freed loopback UDP listener (no backends remain)"
1039 );
1040 } else {
1041 self.loopback_registry
1042 .update_udp_backends(publish_port, remaining);
1043 }
1044 }
1045
1046 pub async fn add_service(&self, name: &str, spec: &ServiceSpec) {
1058 let deployment = spec.deployment.as_deref();
1059 let mut services = self.services.write().await;
1060
1061 let mut endpoint_names = Vec::new();
1063 let mut tcp_ports = Vec::new();
1064 let mut udp_ports = Vec::new();
1065 let mut http_ports = Vec::new();
1066
1067 for endpoint in &spec.endpoints {
1068 match endpoint.protocol {
1069 Protocol::Http | Protocol::Https | Protocol::Websocket => {
1070 let entry = RouteEntry::from_endpoint(deployment, name, endpoint);
1072 self.registry.register(entry).await;
1073 http_ports.push(endpoint.port);
1074
1075 let lb_key = endpoint_lb_key(deployment, name, &endpoint.name);
1085 self.load_balancer
1086 .register(&lb_key, vec![], LbStrategy::RoundRobin);
1087
1088 info!(
1089 service = name,
1090 endpoint = %endpoint.name,
1091 protocol = ?endpoint.protocol,
1092 path = ?endpoint.path,
1093 expose = ?endpoint.expose,
1094 "Added HTTP proxy route for service"
1095 );
1096 }
1097 Protocol::Tcp => {
1098 tcp_ports.push(endpoint.port);
1099 info!(
1100 service = name,
1101 endpoint = %endpoint.name,
1102 protocol = ?endpoint.protocol,
1103 port = endpoint.port,
1104 expose = ?endpoint.expose,
1105 "Tracking TCP stream endpoint for service"
1106 );
1107 }
1108 Protocol::Udp => {
1109 udp_ports.push(endpoint.port);
1110 info!(
1111 service = name,
1112 endpoint = %endpoint.name,
1113 protocol = ?endpoint.protocol,
1114 port = endpoint.port,
1115 expose = ?endpoint.expose,
1116 "Tracking UDP stream endpoint for service"
1117 );
1118 }
1119 }
1120
1121 endpoint_names.push(endpoint.name.clone());
1122 }
1123
1124 self.load_balancer
1131 .register(name, vec![], LbStrategy::RoundRobin);
1132
1133 services.insert(
1134 name.to_string(),
1135 ServiceTracking {
1136 deployment: deployment.map(str::to_string),
1137 endpoint_names,
1138 tcp_ports,
1139 udp_ports,
1140 http_ports,
1141 },
1142 );
1143 }
1144
1145 pub async fn remove_service(&self, name: &str) {
1155 let mut services = self.services.write().await;
1156
1157 if let Some(tracking) = services.remove(name) {
1158 self.registry.unregister_service(name).await;
1160
1161 self.load_balancer.unregister(name);
1166 let deployment = tracking.deployment.as_deref();
1167 for endpoint_name in &tracking.endpoint_names {
1168 let lb_key = endpoint_lb_key(deployment, name, endpoint_name);
1169 self.load_balancer.unregister(&lb_key);
1170 }
1171
1172 if !tracking.tcp_ports.is_empty() {
1174 let mut tcp_set = self.tcp_listeners.write().await;
1175 for port in &tracking.tcp_ports {
1176 if let Some(registry) = &self.stream_registry {
1177 let _ = registry.unregister_tcp(*port);
1178 }
1179 tcp_set.remove(port);
1180 debug!(service = name, port = port, "Removed TCP listener tracking");
1181 }
1182 }
1183
1184 if !tracking.udp_ports.is_empty() {
1186 let mut udp_set = self.udp_listeners.write().await;
1187 for port in &tracking.udp_ports {
1188 if let Some(registry) = &self.stream_registry {
1189 let _ = registry.unregister_udp(*port);
1190 }
1191 udp_set.remove(port);
1192 debug!(service = name, port = port, "Removed UDP listener tracking");
1193 }
1194 }
1195
1196 if !tracking.http_ports.is_empty() {
1199 let ports_still_in_use: HashSet<u16> = services
1200 .values()
1201 .flat_map(|t| t.http_ports.iter().copied())
1202 .collect();
1203
1204 let mut servers = self.servers.write().await;
1205 for port in &tracking.http_ports {
1206 if !ports_still_in_use.contains(port) {
1207 if let Some(server) = servers.remove(port) {
1208 server.shutdown();
1209 info!(
1210 service = name,
1211 port = port,
1212 "Shut down HTTP proxy server (no remaining services on port)"
1213 );
1214 }
1215 }
1216 }
1217 }
1218
1219 info!(service = name, "Removed all proxy resources for service");
1220 }
1221 }
1222
1223 pub async fn add_backend(&self, service: &str, addr: SocketAddr) {
1230 self.registry.add_backend(service, addr).await;
1231 self.load_balancer.add_backend(service, addr);
1232 let services = self.services.read().await;
1234 if let Some(tracking) = services.get(service) {
1235 let deployment = tracking.deployment.as_deref();
1236 for endpoint_name in &tracking.endpoint_names {
1237 let lb_key = endpoint_lb_key(deployment, service, endpoint_name);
1238 self.load_balancer.add_backend(&lb_key, addr);
1239 }
1240 }
1241 info!(service = service, backend = %addr, "Registered backend with proxy");
1242 }
1243
1244 pub async fn remove_backend(&self, service: &str, addr: SocketAddr) {
1249 self.registry.remove_backend(service, addr).await;
1250 self.load_balancer.remove_backend(service, &addr);
1251 let services = self.services.read().await;
1252 if let Some(tracking) = services.get(service) {
1253 let deployment = tracking.deployment.as_deref();
1254 for endpoint_name in &tracking.endpoint_names {
1255 let lb_key = endpoint_lb_key(deployment, service, endpoint_name);
1256 self.load_balancer.remove_backend(&lb_key, &addr);
1257 }
1258 }
1259 debug!(service = service, backend = %addr, "Removed backend from service");
1260 }
1261
1262 #[allow(clippy::unused_async)]
1269 pub async fn update_backend_health(&self, service: &str, addr: SocketAddr, healthy: bool) {
1270 self.load_balancer.mark_health(service, &addr, healthy);
1271 let services = self.services.read().await;
1272 if let Some(tracking) = services.get(service) {
1273 let deployment = tracking.deployment.as_deref();
1274 for endpoint_name in &tracking.endpoint_names {
1275 let lb_key = endpoint_lb_key(deployment, service, endpoint_name);
1276 self.load_balancer.mark_health(&lb_key, &addr, healthy);
1277 }
1278 }
1279 debug!(
1280 service = service,
1281 backend = %addr,
1282 healthy = healthy,
1283 "Updated backend health in load balancer"
1284 );
1285 }
1286
1287 pub async fn update_backends(&self, service: &str, addrs: Vec<SocketAddr>) {
1295 self.registry.update_backends(service, addrs.clone()).await;
1296 self.load_balancer.update_backends(service, addrs.clone());
1298 let services = self.services.read().await;
1299 if let Some(tracking) = services.get(service) {
1300 let deployment = tracking.deployment.as_deref();
1301 for endpoint_name in &tracking.endpoint_names {
1302 let lb_key = endpoint_lb_key(deployment, service, endpoint_name);
1303 self.load_balancer.update_backends(&lb_key, addrs.clone());
1304 }
1305 }
1306 debug!(service = service, "Updated backends for service");
1307 }
1308
1309 pub async fn update_endpoint_backends(
1316 &self,
1317 service: &str,
1318 endpoint_name: &str,
1319 addrs: Vec<SocketAddr>,
1320 ) {
1321 self.registry
1322 .update_backends_for_endpoint(service, endpoint_name, addrs.clone())
1323 .await;
1324 let deployment = {
1327 let services = self.services.read().await;
1328 services.get(service).and_then(|t| t.deployment.clone())
1329 };
1330 let lb_key = endpoint_lb_key(deployment.as_deref(), service, endpoint_name);
1331 self.load_balancer.update_backends(&lb_key, addrs);
1332 debug!(
1333 service = service,
1334 endpoint = endpoint_name,
1335 "Updated backends for service endpoint"
1336 );
1337 }
1338
1339 pub async fn route_count(&self) -> usize {
1341 self.registry.route_count().await
1342 }
1343
1344 pub async fn list_services(&self) -> Vec<String> {
1346 self.services.read().await.keys().cloned().collect()
1347 }
1348
1349 pub async fn has_service(&self, name: &str) -> bool {
1351 self.services.read().await.contains_key(name)
1352 }
1353}
1354
1355#[cfg(test)]
1356mod tests {
1357 use super::*;
1358
1359 fn mock_service_spec_with_endpoints() -> ServiceSpec {
1360 use zlayer_spec::*;
1361 serde_yaml::from_str::<DeploymentSpec>(
1362 r"
1363version: v1
1364deployment: test
1365services:
1366 test:
1367 rtype: service
1368 image:
1369 name: test:latest
1370 endpoints:
1371 - name: http
1372 protocol: http
1373 port: 8080
1374 path: /api
1375 expose: public
1376 - name: websocket
1377 protocol: websocket
1378 port: 8081
1379 path: /ws
1380 expose: internal
1381",
1382 )
1383 .unwrap()
1384 .services
1385 .remove("test")
1386 .unwrap()
1387 }
1388
1389 fn mock_service_spec_tcp_only() -> ServiceSpec {
1390 mock_service_spec_tcp_only_port(9000)
1391 }
1392
1393 fn mock_service_spec_tcp_only_port(port: u16) -> ServiceSpec {
1394 use zlayer_spec::*;
1395 let yaml = format!(
1396 "
1397version: v1
1398deployment: test
1399services:
1400 test:
1401 rtype: service
1402 image:
1403 name: test:latest
1404 endpoints:
1405 - name: grpc
1406 protocol: tcp
1407 port: {port}
1408"
1409 );
1410 serde_yaml::from_str::<DeploymentSpec>(&yaml)
1411 .unwrap()
1412 .services
1413 .remove("test")
1414 .unwrap()
1415 }
1416
1417 fn reserve_free_tcp_port() -> u16 {
1425 let listener =
1426 std::net::TcpListener::bind("127.0.0.1:0").expect("failed to bind ephemeral test port");
1427 listener.local_addr().unwrap().port()
1428 }
1429
1430 #[tokio::test]
1431 async fn test_proxy_manager_new() {
1432 let config = ProxyManagerConfig::default();
1433 let registry = Arc::new(ServiceRegistry::new());
1434 let manager = ProxyManager::new(config, registry, None);
1435
1436 assert_eq!(manager.route_count().await, 0);
1437 assert!(manager.list_services().await.is_empty());
1438 }
1439
1440 #[tokio::test]
1441 async fn test_add_service_with_http_endpoints() {
1442 let config = ProxyManagerConfig::default();
1443 let registry = Arc::new(ServiceRegistry::new());
1444 let manager = ProxyManager::new(config, registry, None);
1445
1446 let spec = mock_service_spec_with_endpoints();
1447 manager.add_service("api", &spec).await;
1448
1449 assert_eq!(manager.route_count().await, 2);
1451 assert!(manager.has_service("api").await);
1452 }
1453
1454 #[tokio::test]
1455 async fn test_tcp_endpoints_tracked_not_routed() {
1456 let config = ProxyManagerConfig::default();
1457 let registry = Arc::new(ServiceRegistry::new());
1458 let manager = ProxyManager::new(config, registry, None);
1459
1460 let spec = mock_service_spec_tcp_only();
1461 manager.add_service("grpc-service", &spec).await;
1462
1463 assert_eq!(manager.route_count().await, 0);
1465 assert!(manager.has_service("grpc-service").await);
1467 }
1468
1469 #[tokio::test]
1470 async fn test_remove_service() {
1471 let config = ProxyManagerConfig::default();
1472 let registry = Arc::new(ServiceRegistry::new());
1473 let manager = ProxyManager::new(config, registry, None);
1474
1475 let spec = mock_service_spec_with_endpoints();
1476 manager.add_service("api", &spec).await;
1477 assert_eq!(manager.route_count().await, 2);
1478
1479 manager.remove_service("api").await;
1480 assert_eq!(manager.route_count().await, 0);
1481 assert!(!manager.has_service("api").await);
1482 }
1483
1484 #[tokio::test]
1485 async fn test_backend_management() {
1486 let config = ProxyManagerConfig::default();
1487 let registry = Arc::new(ServiceRegistry::new());
1488 let manager = ProxyManager::new(config, registry.clone(), None);
1489
1490 let spec = mock_service_spec_with_endpoints();
1491 manager.add_service("api", &spec).await;
1492
1493 let addr1: SocketAddr = "127.0.0.1:8080".parse().unwrap();
1495 let addr2: SocketAddr = "127.0.0.1:8081".parse().unwrap();
1496
1497 manager.add_backend("api", addr1).await;
1498 manager.add_backend("api", addr2).await;
1499
1500 let resolved = registry.resolve(None, "/api").await.unwrap();
1502 assert_eq!(resolved.backends.len(), 2);
1503
1504 manager.remove_backend("api", addr1).await;
1506 let resolved = registry.resolve(None, "/api").await.unwrap();
1507 assert_eq!(resolved.backends.len(), 1);
1508 }
1509
1510 #[tokio::test]
1511 async fn test_update_backends_replaces_all() {
1512 let config = ProxyManagerConfig::default();
1513 let registry = Arc::new(ServiceRegistry::new());
1514 let manager = ProxyManager::new(config, registry.clone(), None);
1515
1516 let spec = mock_service_spec_with_endpoints();
1517 manager.add_service("api", &spec).await;
1518
1519 let addr1: SocketAddr = "127.0.0.1:8080".parse().unwrap();
1521 manager.add_backend("api", addr1).await;
1522
1523 let new_backends: Vec<SocketAddr> = vec![
1525 "127.0.0.1:9000".parse().unwrap(),
1526 "127.0.0.1:9001".parse().unwrap(),
1527 "127.0.0.1:9002".parse().unwrap(),
1528 ];
1529 manager.update_backends("api", new_backends).await;
1530
1531 let resolved = registry.resolve(None, "/api").await.unwrap();
1532 assert_eq!(resolved.backends.len(), 3);
1533 }
1534
1535 #[tokio::test]
1536 async fn test_config_builder() {
1537 let config = ProxyManagerConfig::new("0.0.0.0:8080".parse().unwrap())
1538 .with_https("0.0.0.0:8443".parse().unwrap())
1539 .with_http2(false);
1540
1541 assert_eq!(
1542 config.http_addr,
1543 "0.0.0.0:8080".parse::<SocketAddr>().unwrap()
1544 );
1545 assert_eq!(
1546 config.https_addr,
1547 Some("0.0.0.0:8443".parse::<SocketAddr>().unwrap())
1548 );
1549 assert!(!config.http2_enabled);
1550 }
1551
1552 #[tokio::test]
1557 async fn test_ensure_ports_differentiates_public_and_internal() {
1558 let config = ProxyManagerConfig::default();
1559 let registry = Arc::new(ServiceRegistry::new());
1560 let manager = ProxyManager::new(config, registry, None);
1561
1562 let spec = mock_service_spec_with_endpoints();
1563 let result = manager.ensure_ports_for_service(&spec, None).await;
1565 let _ = result;
1568 }
1569
1570 #[tokio::test]
1571 async fn test_ensure_ports_with_overlay_ip() {
1572 let config = ProxyManagerConfig::default();
1573 let registry = Arc::new(ServiceRegistry::new());
1574 let manager = ProxyManager::new(config, registry, None);
1575
1576 let spec = mock_service_spec_with_endpoints();
1577 let overlay_ip: IpAddr = "10.200.0.5".parse().unwrap();
1579 let result = manager
1580 .ensure_ports_for_service(&spec, Some(overlay_ip))
1581 .await;
1582 let _ = result;
1583 }
1584
1585 fn mock_mixed_service_spec() -> ServiceSpec {
1586 use zlayer_spec::*;
1587 serde_yaml::from_str::<DeploymentSpec>(
1588 r"
1589version: v1
1590deployment: test
1591services:
1592 mixed:
1593 rtype: service
1594 image:
1595 name: test:latest
1596 endpoints:
1597 - name: http
1598 protocol: http
1599 port: 8080
1600 path: /api
1601 expose: public
1602 - name: grpc
1603 protocol: tcp
1604 port: 9000
1605 expose: public
1606 - name: game
1607 protocol: udp
1608 port: 27015
1609 expose: public
1610",
1611 )
1612 .unwrap()
1613 .services
1614 .remove("mixed")
1615 .unwrap()
1616 }
1617
1618 #[tokio::test]
1619 async fn test_add_mixed_service_tracks_all_endpoints() {
1620 let config = ProxyManagerConfig::default();
1621 let registry = Arc::new(ServiceRegistry::new());
1622 let manager = ProxyManager::new(config, registry, None);
1623
1624 let spec = mock_mixed_service_spec();
1625 manager.add_service("mixed", &spec).await;
1626
1627 assert_eq!(manager.route_count().await, 1);
1629 assert!(manager.has_service("mixed").await);
1631 }
1632
1633 #[tokio::test]
1634 async fn test_ensure_ports_tcp_with_stream_registry() {
1635 use zlayer_proxy::StreamService;
1636
1637 let stream_registry = Arc::new(StreamRegistry::new());
1638 let config = ProxyManagerConfig::default();
1639 let registry = Arc::new(ServiceRegistry::new());
1640 let mut manager = ProxyManager::new(config, registry, None);
1641 manager.set_stream_registry(stream_registry.clone());
1642
1643 let port = reserve_free_tcp_port();
1647 let spec = mock_service_spec_tcp_only_port(port);
1648
1649 stream_registry.register_tcp(port, StreamService::new("grpc-service".to_string(), vec![]));
1651
1652 let result = manager.ensure_ports_for_service(&spec, None).await;
1654 assert!(result.is_ok());
1655
1656 let tcp_ports = manager.tcp_listeners.read().await;
1658 assert!(tcp_ports.contains(&port));
1659 }
1660
1661 #[tokio::test]
1662 async fn test_ensure_ports_tcp_without_stream_registry() {
1663 let config = ProxyManagerConfig::default();
1664 let registry = Arc::new(ServiceRegistry::new());
1665 let manager = ProxyManager::new(config, registry, None);
1666
1667 let spec = mock_service_spec_tcp_only();
1668
1669 let result = manager.ensure_ports_for_service(&spec, None).await;
1671 assert!(result.is_ok());
1672
1673 let tcp_ports = manager.tcp_listeners.read().await;
1675 assert!(tcp_ports.is_empty());
1676 }
1677
1678 #[tokio::test]
1679 async fn test_stream_registry_setter() {
1680 let stream_registry = Arc::new(StreamRegistry::new());
1681 let config = ProxyManagerConfig::default();
1682 let registry = Arc::new(ServiceRegistry::new());
1683 let mut manager = ProxyManager::new(config, registry, None);
1684
1685 assert!(manager.stream_registry().is_none());
1686 manager.set_stream_registry(stream_registry.clone());
1687 assert!(manager.stream_registry().is_some());
1688 }
1689
1690 fn mock_internal_tcp_spec(port: u16) -> ServiceSpec {
1693 use zlayer_spec::*;
1694 let yaml = format!(
1695 "
1696version: v1
1697deployment: test
1698services:
1699 test:
1700 rtype: service
1701 image:
1702 name: test:latest
1703 scale:
1704 mode: fixed
1705 replicas: 1
1706 endpoints:
1707 - name: tcp
1708 protocol: tcp
1709 port: {port}
1710 expose: internal
1711"
1712 );
1713 serde_yaml::from_str::<DeploymentSpec>(&yaml)
1714 .unwrap()
1715 .services
1716 .remove("test")
1717 .unwrap()
1718 }
1719
1720 #[tokio::test]
1725 async fn test_publish_loopback_round_trips_then_frees_port() {
1726 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1727
1728 let backend = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1730 let backend_addr = backend.local_addr().unwrap();
1731 let backend_ip = backend_addr.ip();
1732 let backend_port = backend_addr.port();
1733 tokio::spawn(async move {
1734 if let Ok((mut sock, _)) = backend.accept().await {
1735 let mut buf = [0u8; 16];
1736 let n = sock.read(&mut buf).await.unwrap_or(0);
1737 let _ = sock.write_all(b"pong:").await;
1739 let _ = sock.write_all(&buf[..n]).await;
1740 let _ = sock.flush().await;
1741 }
1742 });
1743
1744 let config = ProxyManagerConfig::default();
1745 let registry = Arc::new(ServiceRegistry::new());
1746 let manager = ProxyManager::new(config, registry, None);
1747
1748 let publish_port = reserve_free_tcp_port();
1750 let spec = mock_internal_tcp_spec(publish_port);
1751 assert!(
1752 spec.publish_to_node_loopback(),
1753 "single-member internal spec should publish to loopback"
1754 );
1755
1756 manager
1759 .publish_loopback_for_container(
1760 Some("dep-a"),
1761 "test",
1762 &spec,
1763 backend_ip,
1764 Some(backend_port),
1765 )
1766 .await
1767 .expect("publish should succeed on a free port");
1768
1769 let mut client = tokio::net::TcpStream::connect((Ipv4Addr::LOCALHOST, publish_port))
1771 .await
1772 .expect("connect to published loopback port");
1773 client.write_all(b"ping").await.unwrap();
1774 client.flush().await.unwrap();
1775 let mut reply = Vec::new();
1776 client.read_to_end(&mut reply).await.unwrap();
1777 assert_eq!(&reply, b"pong:ping");
1778 drop(client);
1779
1780 manager
1782 .unpublish_loopback_for_container(&spec, backend_ip, Some(backend_port))
1783 .await;
1784
1785 let mut bound = None;
1788 for _ in 0..50 {
1789 match std::net::TcpListener::bind((Ipv4Addr::LOCALHOST, publish_port)) {
1790 Ok(l) => {
1791 bound = Some(l);
1792 break;
1793 }
1794 Err(_) => tokio::time::sleep(Duration::from_millis(20)).await,
1795 }
1796 }
1797 assert!(
1798 bound.is_some(),
1799 "loopback port {publish_port} should be freed after unpublish"
1800 );
1801 }
1802
1803 #[tokio::test]
1804 async fn test_publish_loopback_skips_public_endpoints() {
1805 let config = ProxyManagerConfig::default();
1809 let registry = Arc::new(ServiceRegistry::new());
1810 let manager = ProxyManager::new(config, registry, None);
1811
1812 let spec = mock_mixed_service_spec();
1813 let backend_ip: IpAddr = "127.0.0.1".parse().unwrap();
1814 manager
1815 .publish_loopback_for_container(Some("dep-a"), "mixed", &spec, backend_ip, None)
1816 .await
1817 .expect("public-only spec publishes nothing and must not error");
1818
1819 assert!(manager.loopback_tcp.read().await.is_empty());
1821 assert!(manager.loopback_udp.read().await.is_empty());
1822 }
1823
1824 #[tokio::test]
1825 async fn test_registry_accessor() {
1826 let config = ProxyManagerConfig::default();
1827 let registry = Arc::new(ServiceRegistry::new());
1828 let manager = ProxyManager::new(config, registry.clone(), None);
1829
1830 assert_eq!(Arc::as_ptr(&manager.registry()), Arc::as_ptr(®istry));
1832 }
1833
1834 #[tokio::test]
1839 async fn test_published_port_ownership_rejects_cross_deployment() {
1840 let config = ProxyManagerConfig::default();
1841 let registry = Arc::new(ServiceRegistry::new());
1842 let manager = ProxyManager::new(config, registry, None);
1843
1844 let publish_port = reserve_free_tcp_port();
1846 let spec = mock_internal_tcp_spec(publish_port);
1847
1848 let backend_a: IpAddr = "10.0.0.1".parse().unwrap();
1850 let tgt_a = 5001u16;
1851 let backend_b: IpAddr = "10.0.0.2".parse().unwrap();
1852 let tgt_b = 5002u16;
1853
1854 manager
1856 .publish_loopback_for_container(Some("dep-a"), "svc", &spec, backend_a, Some(tgt_a))
1857 .await
1858 .expect("deployment A should claim the free port");
1859
1860 let err = manager
1862 .publish_loopback_for_container(Some("dep-b"), "svc", &spec, backend_b, Some(tgt_b))
1863 .await
1864 .expect_err("deployment B must be refused on an owned port");
1865 match err {
1866 crate::error::AgentError::PortConflict { port, .. } => {
1867 assert_eq!(port, publish_port);
1868 }
1869 other => panic!("expected PortConflict, got {other:?}"),
1870 }
1871
1872 let svc = manager
1875 .loopback_registry
1876 .resolve_tcp(publish_port)
1877 .expect("port should still be registered to deployment A");
1878 let expected_a = SocketAddr::new(backend_a, tgt_a);
1879 let foreign_b = SocketAddr::new(backend_b, tgt_b);
1880 assert_eq!(svc.backends, vec![expected_a]);
1881 assert!(
1882 !svc.backends.contains(&foreign_b),
1883 "deployment B's backend must NOT be cross-wired into the pool"
1884 );
1885 }
1886
1887 #[tokio::test]
1890 async fn test_published_port_same_owner_appends_replica() {
1891 let config = ProxyManagerConfig::default();
1892 let registry = Arc::new(ServiceRegistry::new());
1893 let manager = ProxyManager::new(config, registry, None);
1894
1895 let publish_port = reserve_free_tcp_port();
1896 let spec = mock_internal_tcp_spec(publish_port);
1897
1898 let replica1: IpAddr = "10.0.0.1".parse().unwrap();
1899 let replica2: IpAddr = "10.0.0.2".parse().unwrap();
1900 let target_port = 6000u16;
1901
1902 manager
1904 .publish_loopback_for_container(
1905 Some("dep-a"),
1906 "svc",
1907 &spec,
1908 replica1,
1909 Some(target_port),
1910 )
1911 .await
1912 .expect("first replica claims the port");
1913
1914 manager
1916 .publish_loopback_for_container(
1917 Some("dep-a"),
1918 "svc",
1919 &spec,
1920 replica2,
1921 Some(target_port),
1922 )
1923 .await
1924 .expect("same-owner second replica should be accepted");
1925
1926 let svc = manager
1927 .loopback_registry
1928 .resolve_tcp(publish_port)
1929 .expect("port should be registered");
1930 let b1 = SocketAddr::new(replica1, target_port);
1931 let b2 = SocketAddr::new(replica2, target_port);
1932 assert_eq!(svc.backends.len(), 2, "both replicas should be in the pool");
1933 assert!(svc.backends.contains(&b1));
1934 assert!(svc.backends.contains(&b2));
1935 }
1936
1937 #[tokio::test]
1941 async fn test_published_port_freed_on_unpublish() {
1942 let config = ProxyManagerConfig::default();
1943 let registry = Arc::new(ServiceRegistry::new());
1944 let manager = ProxyManager::new(config, registry, None);
1945
1946 let publish_port = reserve_free_tcp_port();
1947 let spec = mock_internal_tcp_spec(publish_port);
1948 let backend_a: IpAddr = "10.0.0.1".parse().unwrap();
1949 let target_port = 7000u16;
1950
1951 manager
1952 .publish_loopback_for_container(
1953 Some("dep-a"),
1954 "svc",
1955 &spec,
1956 backend_a,
1957 Some(target_port),
1958 )
1959 .await
1960 .expect("deployment A claims the port");
1961 assert!(manager
1962 .published_ports
1963 .read()
1964 .await
1965 .contains_key(&publish_port));
1966
1967 manager
1969 .unpublish_loopback_for_container(&spec, backend_a, Some(target_port))
1970 .await;
1971 assert!(
1972 !manager
1973 .published_ports
1974 .read()
1975 .await
1976 .contains_key(&publish_port),
1977 "ownership entry should be cleared once the last backend is gone"
1978 );
1979
1980 let backend_b: IpAddr = "10.0.0.2".parse().unwrap();
1982 manager
1983 .publish_loopback_for_container(
1984 Some("dep-b"),
1985 "svc",
1986 &spec,
1987 backend_b,
1988 Some(target_port),
1989 )
1990 .await
1991 .expect("freed port should be claimable by another deployment");
1992 }
1993
1994 #[tokio::test]
1995 #[allow(clippy::similar_names)]
1996 async fn test_start_ingress_is_idempotent() {
1997 let config = ProxyManagerConfig::default();
1998 let registry = Arc::new(ServiceRegistry::new());
1999 let manager = ProxyManager::new(config, registry, None);
2000
2001 let http_port = reserve_free_tcp_port();
2005 let https_port = reserve_free_tcp_port();
2006
2007 manager.start_ingress_on(http_port, https_port).await;
2008 assert!(
2010 manager.servers.read().await.contains_key(&http_port),
2011 "HTTP ingress should be registered"
2012 );
2013 assert!(
2014 manager.ingress_started.load(Ordering::SeqCst),
2015 "ingress_started flag should be set"
2016 );
2017 let count_after_first = manager.servers.read().await.len();
2018
2019 manager.start_ingress_on(http_port, https_port).await;
2022 assert_eq!(
2023 manager.servers.read().await.len(),
2024 count_after_first,
2025 "second start_ingress call must not register additional servers"
2026 );
2027 }
2028}