Skip to main content

zlayer_agent/
proxy_manager.rs

1//! Proxy management for agent-controlled services
2//!
3//! This module provides the `ProxyManager` struct that integrates the proxy crate
4//! with the agent's service management. It handles:
5//! - Managing proxy routes based on `ServiceSpec` endpoints (HTTP/HTTPS/WebSocket)
6//! - Managing L4 stream proxy listeners (TCP/UDP)
7//! - Tracking and updating backend servers for load balancing
8//! - Coordinating proxy server lifecycle
9
10use crate::error::Result;
11use std::collections::{HashMap, HashSet};
12use std::net::{IpAddr, Ipv4Addr, SocketAddr};
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::RwLock;
17use tracing::{debug, info, warn};
18use zlayer_proxy::{
19    endpoint_lb_key, load_existing_certs_into_resolver, CertManager, LbStrategy, LoadBalancer,
20    NetworkPolicyChecker, ProxyConfig, ProxyServer, RouteEntry, ServiceRegistry, SniCertResolver,
21    StreamRegistry, StreamService, TcpStreamService, UdpStreamService,
22};
23use zlayer_spec::{ExposeType, Protocol, ServiceSpec};
24
/// Default HTTP ingress port. The lone daemon IS the cluster ingress: it binds
/// `0.0.0.0:80` so any deployed service with an HTTP endpoint is reachable on
/// the standard web port without per-service listener configuration.
pub const DEFAULT_INGRESS_HTTP_PORT: u16 = 80;

/// Default HTTPS ingress port. See [`DEFAULT_INGRESS_HTTP_PORT`].
pub const DEFAULT_INGRESS_HTTPS_PORT: u16 = 443;

/// Configuration for the `ProxyManager`.
#[derive(Debug, Clone)]
pub struct ProxyManagerConfig {
    /// HTTP bind address.
    pub http_addr: SocketAddr,
    /// HTTPS bind address (optional).
    pub https_addr: Option<SocketAddr>,
    /// Whether to enable HTTP/2.
    pub http2_enabled: bool,
}

impl Default for ProxyManagerConfig {
    /// HTTP on `0.0.0.0:DEFAULT_INGRESS_HTTP_PORT`, no HTTPS address, HTTP/2
    /// enabled.
    fn default() -> Self {
        // Build the address from the shared constant rather than parsing a
        // string literal. The default cannot drift from
        // `DEFAULT_INGRESS_HTTP_PORT`, and no `unwrap()` is needed.
        Self::new(SocketAddr::new(
            IpAddr::V4(Ipv4Addr::UNSPECIFIED),
            DEFAULT_INGRESS_HTTP_PORT,
        ))
    }
}

impl ProxyManagerConfig {
    /// Create a new configuration with the specified HTTP address.
    ///
    /// No HTTPS address is set and HTTP/2 is enabled.
    #[must_use]
    pub fn new(http_addr: SocketAddr) -> Self {
        Self {
            http_addr,
            https_addr: None,
            http2_enabled: true,
        }
    }

    /// Set the HTTPS address.
    #[must_use]
    pub fn with_https(mut self, addr: SocketAddr) -> Self {
        self.https_addr = Some(addr);
        self
    }

    /// Enable or disable HTTP/2 support.
    #[must_use]
    pub fn with_http2(mut self, enabled: bool) -> Self {
        self.http2_enabled = enabled;
        self
    }
}
79
/// Bookkeeping kept for each registered service so that its proxy state can
/// be cleaned up again later.
#[derive(Debug, Clone)]
struct ServiceTracking {
    /// Name of the owning deployment, copied from `ServiceSpec.deployment`
    /// by `add_service`. It is `None` for standalone / single-deployment
    /// callers (`docker run`). It feeds the deployment-scoped LB group key,
    /// so two deployments that share a service+endpoint name keep separate
    /// backend pools.
    deployment: Option<String>,
    /// Endpoint names; `remove_service` derives the per-endpoint LB group
    /// keys to clean up from these.
    endpoint_names: Vec<String>,
    /// L4 TCP ports this service owns.
    tcp_ports: Vec<u16>,
    /// L4 UDP ports this service owns.
    udp_ports: Vec<u16>,
    /// L7 (HTTP/HTTPS/WebSocket) ports this service owns.
    http_ports: Vec<u16>,
}
99
100/// Manages proxy routing for agent-controlled services
101///
102/// The `ProxyManager` coordinates between the agent's service lifecycle and
103/// the proxy crate's routing/load balancing infrastructure. It supports:
104///
105/// - **HTTP/HTTPS/WebSocket (L7)**: Multiple port listeners sharing the same
106///   `ServiceRegistry` for request matching and load balancing.
107/// - **TCP/UDP (L4)**: Standalone stream proxy listeners that forward raw
108///   connections/datagrams to backends via the `StreamRegistry`.
109pub struct ProxyManager {
110    /// Configuration
111    config: ProxyManagerConfig,
112    /// Shared service registry for HTTP request matching and backend management
113    registry: Arc<ServiceRegistry>,
114    /// Load balancer for health-aware backend selection
115    load_balancer: Arc<LoadBalancer>,
116    /// Per-port HTTP proxy server handles
117    servers: RwLock<HashMap<u16, Arc<ProxyServer>>>,
118    /// Tracked services and their endpoints (includes port ownership for cleanup)
119    services: RwLock<HashMap<String, ServiceTracking>>,
120    /// Stream registry for L4 TCP/UDP proxy routing
121    stream_registry: Option<Arc<StreamRegistry>>,
122    /// Certificate manager for TLS
123    cert_manager: Option<Arc<CertManager>>,
124    /// Ports with active TCP stream listeners (to avoid double-binding)
125    tcp_listeners: RwLock<HashSet<u16>>,
126    /// Ports with active UDP stream listeners (to avoid double-binding)
127    udp_listeners: RwLock<HashSet<u16>>,
128    /// Number of active proxy connections (for graceful drain on shutdown)
129    active_connections: Arc<AtomicU64>,
130    /// Optional network policy checker for access control enforcement
131    network_policy_checker: Option<NetworkPolicyChecker>,
132    /// Dedicated stream registry for node-loopback (`127.0.0.1:<port>`)
133    /// publishing.
134    ///
135    /// This is intentionally separate from [`Self::stream_registry`]: the
136    /// latter is keyed by endpoint port and entangled with the L7/L4 +
137    /// Public/Internal binding matrix (`ensure_ports_for_service`). The
138    /// loopback path forwards the node's `127.0.0.1:<endpoint.port>` to the
139    /// container's real backend, independent of how the endpoint is exposed,
140    /// so it owns its own registry and listener set.
141    loopback_registry: Arc<StreamRegistry>,
142    /// Active loopback TCP listeners keyed by published port. The
143    /// [`JoinHandle`] owns the bound socket via its accept loop; aborting it
144    /// frees the OS port. Used for both dedup and cleanup.
145    loopback_tcp: RwLock<HashMap<u16, tokio::task::JoinHandle<()>>>,
146    /// Active loopback UDP listeners keyed by published port. See
147    /// [`Self::loopback_tcp`].
148    loopback_udp: RwLock<HashMap<u16, tokio::task::JoinHandle<()>>>,
149    /// Ownership map for published host-port loopback bindings.
150    ///
151    /// A host port (`127.0.0.1:<port>`) is a GLOBAL host resource — it can be
152    /// bound exactly once. This maps each published port to the
153    /// `(deployment, service)` that owns it. The first publisher claims the
154    /// port; a second publisher for the SAME `(deployment, service)` is a
155    /// legitimate scale-up (replica backend appended); a publisher for a
156    /// DIFFERENT `(deployment, service)` is REFUSED (it would otherwise be
157    /// silently appended into the foreign pool, so `:<port>` would serve the
158    /// wrong deployment's backends — Bug 7). The entry is freed when the
159    /// owning service's last backend on the port is unpublished.
160    ///
161    /// The key is the published port; the value is
162    /// `(deployment, service_name)` where `deployment` is `None` for
163    /// standalone callers.
164    published_ports: RwLock<HashMap<u16, (Option<String>, String)>>,
165    /// Background TCP health-check task for the L7 load balancer. Periodically
166    /// TCP-connects to every registered backend and flips its health status,
167    /// so a backend that was marked unhealthy by a transient request-path
168    /// failure (e.g. the overlay momentarily reconfiguring while sibling
169    /// containers churn during a CI build) AUTO-RECOVERS once it answers
170    /// connects again. Without this the L7 LB had no recovery path of its own
171    /// and a single transient blip left a service stuck on "no healthy
172    /// backends" until a daemon restart. Aborted on drop.
173    lb_health_checker: tokio::task::JoinHandle<()>,
174    /// Whether the standing `0.0.0.0:80` / `0.0.0.0:443` ingress listeners have
175    /// already been started. Makes [`ProxyManager::start_ingress`] idempotent
176    /// so a double call (or a restart path) does not spawn duplicate
177    /// retry-bind tasks that would fight over the same ports.
178    ingress_started: AtomicBool,
179}
180
181impl Drop for ProxyManager {
182    fn drop(&mut self) {
183        self.lb_health_checker.abort();
184    }
185}
186
187impl ProxyManager {
188    /// Create a new `ProxyManager` with the given configuration, service registry,
189    /// and optional certificate manager.
190    pub fn new(
191        config: ProxyManagerConfig,
192        registry: Arc<ServiceRegistry>,
193        cert_manager: Option<Arc<CertManager>>,
194    ) -> Self {
195        let load_balancer = Arc::new(LoadBalancer::new());
196
197        // Spawn the L7 load balancer's own TCP health checker so unhealthy
198        // backends auto-recover. Probe every 5s with a 2s per-probe timeout:
199        // fast enough that a transient blip during a CI build (sibling
200        // containers churning the overlay) clears well within a single e2e
201        // step, without hammering backends.
202        let lb_health_checker =
203            load_balancer.spawn_health_checker(Duration::from_secs(5), Duration::from_secs(2));
204
205        Self {
206            config,
207            registry,
208            load_balancer,
209            servers: RwLock::new(HashMap::new()),
210            services: RwLock::new(HashMap::new()),
211            stream_registry: None,
212            cert_manager,
213            tcp_listeners: RwLock::new(HashSet::new()),
214            udp_listeners: RwLock::new(HashSet::new()),
215            active_connections: Arc::new(AtomicU64::new(0)),
216            network_policy_checker: None,
217            loopback_registry: Arc::new(StreamRegistry::new()),
218            loopback_tcp: RwLock::new(HashMap::new()),
219            loopback_udp: RwLock::new(HashMap::new()),
220            published_ports: RwLock::new(HashMap::new()),
221            lb_health_checker,
222            ingress_started: AtomicBool::new(false),
223        }
224    }
225
226    /// Get a reference to the service registry
227    pub fn registry(&self) -> Arc<ServiceRegistry> {
228        self.registry.clone()
229    }
230
231    /// Get a reference to the load balancer
232    pub fn load_balancer(&self) -> Arc<LoadBalancer> {
233        self.load_balancer.clone()
234    }
235
236    /// Get the number of currently active proxy connections.
237    pub fn active_connections(&self) -> u64 {
238        self.active_connections.load(Ordering::Relaxed)
239    }
240
241    /// Get a reference to the certificate manager (if configured)
242    pub fn cert_manager(&self) -> Option<&Arc<CertManager>> {
243        self.cert_manager.as_ref()
244    }
245
246    /// Set the stream registry for L4 proxy integration (TCP/UDP)
247    pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
248        self.stream_registry = Some(registry);
249    }
250
251    /// Builder pattern: add stream registry for L4 proxy integration
252    #[must_use]
253    pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
254        self.stream_registry = Some(registry);
255        self
256    }
257
258    /// Get the stream registry (if configured)
259    pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
260        self.stream_registry.as_ref()
261    }
262
263    /// Set the network policy checker for access control enforcement
264    pub fn set_network_policy_checker(&mut self, checker: NetworkPolicyChecker) {
265        self.network_policy_checker = Some(checker);
266    }
267
268    /// Builder pattern: add network policy checker for access control enforcement
269    #[must_use]
270    pub fn with_network_policy_checker(mut self, checker: NetworkPolicyChecker) -> Self {
271        self.network_policy_checker = Some(checker);
272        self
273    }
274
275    /// Start listening on a specific port bound to the given address.
276    ///
277    /// If already listening on this port, skip.
278    /// All port listeners share the same `ServiceRegistry` for request matching.
279    ///
280    /// # Errors
281    /// Returns an error if the proxy server cannot be started.
282    pub async fn listen_on(&self, port: u16, bind_ip: IpAddr) -> Result<()> {
283        let mut servers = self.servers.write().await;
284
285        if servers.contains_key(&port) {
286            debug!(port = port, "Already listening on port");
287            return Ok(());
288        }
289
290        let addr = SocketAddr::new(bind_ip, port);
291        let mut proxy_config = ProxyConfig::default();
292        proxy_config.server.http_addr = addr;
293        proxy_config.server.http2_enabled = self.config.http2_enabled;
294
295        let mut server = ProxyServer::with_registry(
296            proxy_config,
297            self.registry.clone(),
298            self.load_balancer.clone(),
299        );
300        if let Some(ref checker) = self.network_policy_checker {
301            server = server.with_network_policy_checker(checker.clone());
302        }
303        let server = Arc::new(server);
304
305        info!(port = port, bind = %addr, "Proxy listening on port");
306
307        let server_clone = server.clone();
308        tokio::spawn(async move {
309            if let Err(e) = server_clone.run().await {
310                tracing::error!(port = port, error = %e, "Proxy server error on port");
311            }
312        });
313
314        servers.insert(port, server);
315        Ok(())
316    }
317
318    /// Start an HTTPS listener on the given port using `SniCertResolver` for dynamic cert selection.
319    ///
320    /// If already listening on this port, skip.
321    /// Requires a `CertManager` to be configured; logs a warning and returns `Ok(())` if not.
322    ///
323    /// # Errors
324    /// Returns an error if the HTTPS proxy server cannot be started.
325    pub async fn listen_on_tls(&self, port: u16, bind_ip: IpAddr) -> Result<()> {
326        let mut servers = self.servers.write().await;
327
328        if servers.contains_key(&port) {
329            debug!(port = port, "Already listening on port (TLS)");
330            return Ok(());
331        }
332
333        let Some(cert_manager) = &self.cert_manager else {
334            warn!(
335                port = port,
336                "Cannot start TLS listener: no CertManager configured"
337            );
338            return Ok(());
339        };
340
341        // Create SniCertResolver and load existing certs
342        let sni_resolver = Arc::new(SniCertResolver::new());
343
344        // Load existing certificates (best-effort; log warnings on failure)
345        let _ = load_existing_certs_into_resolver(cert_manager, &sni_resolver).await;
346
347        let addr = SocketAddr::new(bind_ip, port);
348        let mut proxy_config = ProxyConfig::default();
349        proxy_config.server.https_addr = addr;
350
351        let mut server = ProxyServer::with_tls_resolver(
352            proxy_config,
353            self.registry.clone(),
354            self.load_balancer.clone(),
355            sni_resolver,
356        )
357        .with_cert_manager(Arc::clone(cert_manager));
358        if let Some(ref checker) = self.network_policy_checker {
359            server = server.with_network_policy_checker(checker.clone());
360        }
361        let server = Arc::new(server);
362
363        info!(port = port, bind = %addr, "HTTPS proxy listening on port");
364
365        let server_clone = server.clone();
366        tokio::spawn(async move {
367            if let Err(e) = server_clone.run_https().await {
368                tracing::error!(port = port, error = %e, "HTTPS proxy server error");
369            }
370        });
371
372        servers.insert(port, server);
373        Ok(())
374    }
375
376    /// Start the standing HTTP/HTTPS ingress on `0.0.0.0:80` and `0.0.0.0:443`.
377    ///
378    /// The lone daemon IS the ingress: this binds the two well-known web ports
379    /// so every deployed service with an HTTP/HTTPS endpoint is reachable on
380    /// the standard ports, routed by the shared [`ServiceRegistry`]/SNI cert
381    /// resolver the manager already holds — no per-service listener config.
382    ///
383    /// **Conflict policy: WARN, never error.** If 80/443 is already held the
384    /// underlying [`ProxyServer::run_with_retry`] /
385    /// [`ProxyServer::run_https_with_retry`] log a warning and keep retrying
386    /// the bind forever, grabbing the port the moment it frees. This NEVER
387    /// aborts startup, NEVER blocks deployments, and NEVER hard-errors. Binding
388    /// 80/443 needs root or `CAP_NET_BIND_SERVICE`; a non-root daemon simply
389    /// never grabs them and keeps warning — that is fine.
390    ///
391    /// Idempotent: a second call is a no-op (it will not spawn duplicate
392    /// retry-bind tasks).
393    pub async fn start_ingress(&self) {
394        self.start_ingress_on(DEFAULT_INGRESS_HTTP_PORT, DEFAULT_INGRESS_HTTPS_PORT)
395            .await;
396    }
397
398    /// Like [`Self::start_ingress`] but with explicit HTTP/HTTPS ports.
399    ///
400    /// Used by tests; the production path uses [`DEFAULT_INGRESS_HTTP_PORT`] /
401    /// [`DEFAULT_INGRESS_HTTPS_PORT`].
402    #[allow(clippy::similar_names)]
403    pub async fn start_ingress_on(&self, http_port: u16, https_port: u16) {
404        // Idempotency: only the first caller wins.
405        if self
406            .ingress_started
407            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
408            .is_err()
409        {
410            debug!("Ingress already started; skipping");
411            return;
412        }
413
414        let bind_ip = IpAddr::V4(Ipv4Addr::UNSPECIFIED); // 0.0.0.0
415
416        // ---- HTTP ingress (:80) ----
417        let http_addr = SocketAddr::new(bind_ip, http_port);
418        let mut http_proxy_config = ProxyConfig::default();
419        http_proxy_config.server.http_addr = http_addr;
420        http_proxy_config.server.http2_enabled = self.config.http2_enabled;
421
422        let mut http_server = ProxyServer::with_registry(
423            http_proxy_config,
424            self.registry.clone(),
425            self.load_balancer.clone(),
426        );
427        if let Some(ref checker) = self.network_policy_checker {
428            http_server = http_server.with_network_policy_checker(checker.clone());
429        }
430        let http_server = Arc::new(http_server);
431        info!(port = http_port, bind = %http_addr, "Starting HTTP ingress (retry-never-error)");
432        {
433            let server = http_server.clone();
434            tokio::spawn(async move {
435                if let Err(e) = server.run_with_retry(http_addr).await {
436                    // Only reached on a post-bind fatal accept-loop error; the
437                    // bind itself never errors out.
438                    warn!(port = http_port, error = %e, "HTTP ingress accept loop exited");
439                }
440            });
441        }
442        self.servers.write().await.insert(http_port, http_server);
443
444        // ---- HTTPS ingress (:443) ----
445        let Some(cert_manager) = &self.cert_manager else {
446            warn!(
447                port = https_port,
448                "Cannot start HTTPS ingress: no CertManager configured (HTTP ingress is up)"
449            );
450            return;
451        };
452
453        let sni_resolver = Arc::new(SniCertResolver::new());
454        // Load existing certificates (best-effort; log warnings on failure).
455        let _ = load_existing_certs_into_resolver(cert_manager, &sni_resolver).await;
456
457        let https_addr = SocketAddr::new(bind_ip, https_port);
458        let mut https_proxy_config = ProxyConfig::default();
459        https_proxy_config.server.https_addr = https_addr;
460
461        let mut https_server = ProxyServer::with_tls_resolver(
462            https_proxy_config,
463            self.registry.clone(),
464            self.load_balancer.clone(),
465            sni_resolver,
466        )
467        .with_cert_manager(Arc::clone(cert_manager));
468        if let Some(ref checker) = self.network_policy_checker {
469            https_server = https_server.with_network_policy_checker(checker.clone());
470        }
471        let https_server = Arc::new(https_server);
472        info!(port = https_port, bind = %https_addr, "Starting HTTPS ingress (retry-never-error)");
473        {
474            let server = https_server.clone();
475            tokio::spawn(async move {
476                if let Err(e) = server.run_https_with_retry(https_addr).await {
477                    warn!(port = https_port, error = %e, "HTTPS ingress accept loop exited");
478                }
479            });
480        }
481        self.servers.write().await.insert(https_port, https_server);
482    }
483
484    /// Stop all proxy servers on all ports.
485    ///
486    /// After signalling each server to shut down, waits up to 30 seconds for
487    /// active connections to drain before returning.
488    pub async fn stop(&self) {
489        let mut servers = self.servers.write().await;
490        for (port, server) in servers.drain() {
491            info!(port = port, "Stopping proxy on port");
492            server.shutdown();
493        }
494
495        // Wait up to 30s for active connections to drain
496        let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
497        while self.active_connections.load(Ordering::Relaxed) > 0 {
498            if tokio::time::Instant::now() >= deadline {
499                let remaining = self.active_connections.load(Ordering::Relaxed);
500                warn!(
501                    remaining = remaining,
502                    "Drain timeout reached, forcing shutdown"
503                );
504                break;
505            }
506            tokio::time::sleep(Duration::from_millis(100)).await;
507        }
508
509        info!("All proxy servers stopped");
510    }
511
512    /// Remove and shut down the listener on a specific port.
513    pub async fn unbind(&self, port: u16) {
514        let mut servers = self.servers.write().await;
515        if let Some(server) = servers.remove(&port) {
516            info!(port = port, "Unbinding proxy from port");
517            server.shutdown();
518        }
519    }
520
521    /// Scan a service's endpoints and ensure the proxy is listening on all
522    /// required ports.
523    ///
524    /// - **HTTP/HTTPS/WebSocket** endpoints start an HTTP proxy listener.
525    /// - **TCP** endpoints bind a `TcpListener` and spawn a `TcpStreamService`.
526    /// - **UDP** endpoints bind a `UdpSocket` and spawn a `UdpStreamService`.
527    ///
528    /// Bind address is determined by the `expose` type:
529    /// - **Public** endpoints bind to `0.0.0.0` (all interfaces).
530    /// - **Internal** endpoints bind to the overlay IP so they are only
531    ///   reachable from within the overlay network.  If no overlay is
532    ///   available, internal endpoints bind to `127.0.0.1` (localhost only).
533    ///
534    /// # Errors
535    /// Returns an error if an HTTP/HTTPS listener cannot be started.
536    pub async fn ensure_ports_for_service(
537        &self,
538        spec: &ServiceSpec,
539        overlay_ip: Option<IpAddr>,
540    ) -> Result<()> {
541        for endpoint in &spec.endpoints {
542            let bind_ip = match endpoint.expose {
543                ExposeType::Public => IpAddr::V4(Ipv4Addr::UNSPECIFIED), // 0.0.0.0
544                ExposeType::Internal => {
545                    // Prefer overlay IP; fall back to loopback if overlay is unavailable.
546                    let ip = overlay_ip.unwrap_or(IpAddr::V4(Ipv4Addr::LOCALHOST));
547                    if overlay_ip.is_none() {
548                        warn!(
549                            endpoint = %endpoint.name,
550                            port = endpoint.port,
551                            "No overlay IP available for internal endpoint; binding to 127.0.0.1"
552                        );
553                    }
554                    ip
555                }
556            };
557
558            match endpoint.protocol {
559                Protocol::Https => {
560                    // L7 TLS: start HTTPS proxy listener with SNI cert resolution
561                    self.listen_on_tls(endpoint.port, bind_ip).await?;
562                }
563                Protocol::Http | Protocol::Websocket => {
564                    // L7: start HTTP proxy listener
565                    self.listen_on(endpoint.port, bind_ip).await?;
566                }
567                Protocol::Tcp => {
568                    // L4 TCP: bind listener and spawn TcpStreamService
569                    self.ensure_tcp_listener(endpoint.port, bind_ip).await;
570                }
571                Protocol::Udp => {
572                    // L4 UDP: bind socket and spawn UdpStreamService
573                    self.ensure_udp_listener(endpoint.port, bind_ip).await;
574                }
575            }
576        }
577        Ok(())
578    }
579
580    /// Ensure a TCP stream listener is running on the given port.
581    ///
582    /// If a listener is already active on this port, this is a no-op.
583    /// Requires `stream_registry` to be configured; logs a warning if not.
584    async fn ensure_tcp_listener(&self, port: u16, bind_ip: IpAddr) {
585        // Check if already listening
586        {
587            let listeners = self.tcp_listeners.read().await;
588            if listeners.contains(&port) {
589                debug!(port = port, "TCP stream listener already active");
590                return;
591            }
592        }
593
594        let registry = if let Some(r) = &self.stream_registry {
595            Arc::clone(r)
596        } else {
597            warn!(
598                port = port,
599                "Cannot start TCP listener: StreamRegistry not configured"
600            );
601            return;
602        };
603
604        let addr = SocketAddr::new(bind_ip, port);
605        let listener = match tokio::net::TcpListener::bind(addr).await {
606            Ok(l) => l,
607            Err(e) => {
608                warn!(
609                    port = port,
610                    bind = %addr,
611                    error = %e,
612                    "Failed to bind TCP stream listener, continuing"
613                );
614                return;
615            }
616        };
617
618        // Mark as active before spawning
619        {
620            let mut listeners = self.tcp_listeners.write().await;
621            listeners.insert(port);
622        }
623
624        let tcp_service = Arc::new(TcpStreamService::new(registry, port));
625        tokio::spawn(async move {
626            tcp_service.serve(listener).await;
627        });
628
629        info!(port = port, bind = %addr, "TCP stream proxy listening");
630    }
631
632    /// Ensure a UDP stream listener is running on the given port.
633    ///
634    /// If a listener is already active on this port, this is a no-op.
635    /// Requires `stream_registry` to be configured; logs a warning if not.
636    async fn ensure_udp_listener(&self, port: u16, bind_ip: IpAddr) {
637        // Check if already listening
638        {
639            let listeners = self.udp_listeners.read().await;
640            if listeners.contains(&port) {
641                debug!(port = port, "UDP stream listener already active");
642                return;
643            }
644        }
645
646        let registry = if let Some(r) = &self.stream_registry {
647            Arc::clone(r)
648        } else {
649            warn!(
650                port = port,
651                "Cannot start UDP listener: StreamRegistry not configured"
652            );
653            return;
654        };
655
656        let addr = SocketAddr::new(bind_ip, port);
657        let socket = match tokio::net::UdpSocket::bind(addr).await {
658            Ok(s) => s,
659            Err(e) => {
660                warn!(
661                    port = port,
662                    bind = %addr,
663                    error = %e,
664                    "Failed to bind UDP stream listener, continuing"
665                );
666                return;
667            }
668        };
669
670        // Mark as active before spawning
671        {
672            let mut listeners = self.udp_listeners.write().await;
673            listeners.insert(port);
674        }
675
676        let udp_service = Arc::new(UdpStreamService::new(registry, port, None));
677        tokio::spawn(async move {
678            if let Err(e) = udp_service.serve(socket).await {
679                tracing::error!(
680                    port = port,
681                    error = %e,
682                    "UDP stream proxy service failed"
683                );
684            }
685        });
686
687        info!(port = port, bind = %addr, "UDP stream proxy listening");
688    }
689
690    /// Publish a single container's exposed ports on the node loopback
691    /// (`127.0.0.1:<endpoint.port>`), forwarding to wherever the container
692    /// actually listens.
693    ///
694    /// This implements the GitHub-Actions "service published to localhost"
695    /// convention so a consumer sharing the node loopback can reach the
696    /// service at `localhost:<port>`. The published port is always
697    /// `endpoint.port`; the backend the listener forwards to is
698    /// `(container_ip, port_override.unwrap_or(endpoint.target_port()))`,
699    /// which is already runtime-resolved by the caller:
700    ///
701    /// - On the macOS seatbelt/libkrun runtimes every replica shares the host
702    ///   `127.0.0.1` and gets a unique `port_override`, so the container
703    ///   listens on `127.0.0.1:<port_override>` and we forward there.
704    /// - On Linux/VZ/HCS the container listens on its overlay IP, so
705    ///   `container_ip` is the overlay address and `port_override` is `None`,
706    ///   forwarding to `overlay_ip:<target_port>`.
707    ///
708    /// Backends accumulate across replicas so multiple members round-robin
709    /// behind the single loopback port. `Public` endpoints are skipped: they
710    /// are already bound on `0.0.0.0` and therefore already reachable on
711    /// loopback — binding `127.0.0.1:<port>` again would fail with
712    /// `EADDRINUSE`.
713    ///
714    /// This NEVER rewrites a container's own loopback: it only binds the
715    /// NODE's `127.0.0.1` and forwards to the container's runtime-resolved
716    /// address.
717    ///
718    /// Bind failures are tolerated (logged at `warn!`); this never panics on
719    /// them.
720    ///
721    /// A published host port (`127.0.0.1:<port>`) is a GLOBAL host resource and
722    /// is OWNED by the first `(deployment, service)` to publish it. A second
723    /// publish for the SAME `(deployment, service)` appends a replica backend
724    /// (legitimate scale-up). A publish for a DIFFERENT `(deployment, service)`
725    /// is REFUSED with [`AgentError::PortConflict`] rather than silently
726    /// appended into the foreign pool (Bug 7: that would make `:<port>` serve
727    /// the wrong deployment's backends). On a conflict for any endpoint this
728    /// returns `Err` after having published the conflict-free endpoints up to
729    /// that point.
730    ///
731    /// `deployment` is the owning deployment name (`Some`) or `None` for
732    /// standalone callers.
733    ///
734    /// # Errors
735    /// Returns [`AgentError::PortConflict`] when a published port is already
736    /// owned by a different `(deployment, service)`.
737    pub async fn publish_loopback_for_container(
738        &self,
739        deployment: Option<&str>,
740        service_name: &str,
741        spec: &ServiceSpec,
742        container_ip: IpAddr,
743        port_override: Option<u16>,
744    ) -> Result<()> {
745        for endpoint in &spec.endpoints {
746            // Public endpoints already bind 0.0.0.0 -> already on loopback.
747            if matches!(endpoint.expose, ExposeType::Public) {
748                continue;
749            }
750
751            let backend = SocketAddr::new(
752                container_ip,
753                port_override.unwrap_or_else(|| endpoint.target_port()),
754            );
755            let publish_port = endpoint.port;
756
757            // Enforce host-port ownership before touching any registry.
758            self.claim_published_port(deployment, service_name, publish_port)
759                .await?;
760
761            match endpoint.protocol {
762                Protocol::Tcp | Protocol::Http | Protocol::Https | Protocol::Websocket => {
763                    // A raw TCP forward carries HTTP/HTTPS/WS just fine, so
764                    // all L7 protocols ride the loopback TCP path.
765                    self.publish_loopback_tcp(service_name, publish_port, backend)
766                        .await;
767                }
768                Protocol::Udp => {
769                    self.publish_loopback_udp(service_name, publish_port, backend)
770                        .await;
771                }
772            }
773        }
774        Ok(())
775    }
776
777    /// Claim ownership of host port `publish_port` for `(deployment, service)`.
778    ///
779    /// - Unowned → claim it and return `Ok`.
780    /// - Owned by the SAME `(deployment, service)` → return `Ok` (scale-up).
781    /// - Owned by a DIFFERENT `(deployment, service)` → return
782    ///   [`AgentError::PortConflict`] (refuse the cross-wire).
783    async fn claim_published_port(
784        &self,
785        deployment: Option<&str>,
786        service_name: &str,
787        publish_port: u16,
788    ) -> Result<()> {
789        let mut owners = self.published_ports.write().await;
790        if let Some((owner_dep, owner_svc)) = owners.get(&publish_port) {
791            if owner_dep.as_deref() == deployment && owner_svc == service_name {
792                // Same owner: legitimate scale-up / re-publish.
793                return Ok(());
794            }
795            let owner = format!("{}/{}", owner_dep.as_deref().unwrap_or("_"), owner_svc);
796            let requester = format!("{}/{}", deployment.unwrap_or("_"), service_name);
797            warn!(
798                port = publish_port,
799                owner = %owner,
800                requester = %requester,
801                "Refusing to publish host port already owned by a different deployment/service (would cross-wire backends)"
802            );
803            return Err(crate::error::AgentError::PortConflict {
804                port: publish_port,
805                owner,
806                requester,
807            });
808        }
809        owners.insert(
810            publish_port,
811            (deployment.map(str::to_string), service_name.to_string()),
812        );
813        Ok(())
814    }
815
816    /// Register `backend` for the loopback TCP listener on `publish_port`,
817    /// binding `127.0.0.1:<publish_port>` if it is not already bound.
818    async fn publish_loopback_tcp(
819        &self,
820        service_name: &str,
821        publish_port: u16,
822        backend: SocketAddr,
823    ) {
824        // Accumulate the backend in the loopback registry.
825        if let Some(existing) = self.loopback_registry.resolve_tcp(publish_port) {
826            let mut backends = existing.backends;
827            if !backends.contains(&backend) {
828                backends.push(backend);
829            }
830            self.loopback_registry
831                .update_tcp_backends(publish_port, backends);
832        } else {
833            self.loopback_registry.register_tcp(
834                publish_port,
835                StreamService::new(service_name.to_string(), vec![backend]),
836            );
837        }
838
839        // Bind the loopback listener once per port.
840        let mut listeners = self.loopback_tcp.write().await;
841        if listeners.contains_key(&publish_port) {
842            debug!(port = publish_port, "Loopback TCP listener already active");
843            return;
844        }
845
846        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), publish_port);
847        let listener = match tokio::net::TcpListener::bind(addr).await {
848            Ok(l) => l,
849            Err(e) => {
850                warn!(
851                    port = publish_port,
852                    bind = %addr,
853                    error = %e,
854                    "Failed to bind loopback TCP listener, continuing"
855                );
856                return;
857            }
858        };
859
860        let tcp_service = Arc::new(TcpStreamService::new(
861            Arc::clone(&self.loopback_registry),
862            publish_port,
863        ));
864        let handle = tokio::spawn(async move {
865            tcp_service.serve(listener).await;
866        });
867        listeners.insert(publish_port, handle);
868        drop(listeners);
869
870        info!(
871            service = service_name,
872            port = publish_port,
873            bind = %addr,
874            backend = %backend,
875            "Published service port on node loopback (TCP)"
876        );
877    }
878
879    /// Register `backend` for the loopback UDP listener on `publish_port`,
880    /// binding `127.0.0.1:<publish_port>` if it is not already bound.
881    async fn publish_loopback_udp(
882        &self,
883        service_name: &str,
884        publish_port: u16,
885        backend: SocketAddr,
886    ) {
887        if let Some(existing) = self.loopback_registry.resolve_udp(publish_port) {
888            let mut backends = existing.backends;
889            if !backends.contains(&backend) {
890                backends.push(backend);
891            }
892            self.loopback_registry
893                .update_udp_backends(publish_port, backends);
894        } else {
895            self.loopback_registry.register_udp(
896                publish_port,
897                StreamService::new(service_name.to_string(), vec![backend]),
898            );
899        }
900
901        let mut listeners = self.loopback_udp.write().await;
902        if listeners.contains_key(&publish_port) {
903            debug!(port = publish_port, "Loopback UDP listener already active");
904            return;
905        }
906
907        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), publish_port);
908        let socket = match tokio::net::UdpSocket::bind(addr).await {
909            Ok(s) => s,
910            Err(e) => {
911                warn!(
912                    port = publish_port,
913                    bind = %addr,
914                    error = %e,
915                    "Failed to bind loopback UDP listener, continuing"
916                );
917                return;
918            }
919        };
920
921        let udp_service = Arc::new(UdpStreamService::new(
922            Arc::clone(&self.loopback_registry),
923            publish_port,
924            None,
925        ));
926        let handle = tokio::spawn(async move {
927            if let Err(e) = udp_service.serve(socket).await {
928                tracing::error!(
929                    port = publish_port,
930                    error = %e,
931                    "Loopback UDP stream proxy service failed"
932                );
933            }
934        });
935        listeners.insert(publish_port, handle);
936        drop(listeners);
937
938        info!(
939            service = service_name,
940            port = publish_port,
941            bind = %addr,
942            backend = %backend,
943            "Published service port on node loopback (UDP)"
944        );
945    }
946
947    /// Remove a single container's backend from the node-loopback publish
948    /// path. Mirrors [`Self::publish_loopback_for_container`]: it recomputes
949    /// the same `(container_ip, port_override.unwrap_or(target_port))` backend
950    /// per endpoint and drops it from the loopback registry.
951    ///
952    /// When a published port's backend set becomes empty, the registry entry
953    /// is unregistered and the loopback listener is forgotten so the port is
954    /// freed for the next bind. `Public` endpoints are skipped (they were
955    /// never published here).
956    pub async fn unpublish_loopback_for_container(
957        &self,
958        spec: &ServiceSpec,
959        container_ip: IpAddr,
960        port_override: Option<u16>,
961    ) {
962        for endpoint in &spec.endpoints {
963            if matches!(endpoint.expose, ExposeType::Public) {
964                continue;
965            }
966
967            let backend = SocketAddr::new(
968                container_ip,
969                port_override.unwrap_or_else(|| endpoint.target_port()),
970            );
971            let publish_port = endpoint.port;
972
973            match endpoint.protocol {
974                Protocol::Tcp | Protocol::Http | Protocol::Https | Protocol::Websocket => {
975                    self.unpublish_loopback_tcp(publish_port, backend).await;
976                }
977                Protocol::Udp => {
978                    self.unpublish_loopback_udp(publish_port, backend).await;
979                }
980            }
981        }
982    }
983
984    /// Drop `backend` from the loopback TCP service on `publish_port`,
985    /// freeing the listener when no backends remain.
986    async fn unpublish_loopback_tcp(&self, publish_port: u16, backend: SocketAddr) {
987        let Some(existing) = self.loopback_registry.resolve_tcp(publish_port) else {
988            return;
989        };
990        let remaining: Vec<SocketAddr> = existing
991            .backends
992            .into_iter()
993            .filter(|b| *b != backend)
994            .collect();
995
996        if remaining.is_empty() {
997            let _ = self.loopback_registry.unregister_tcp(publish_port);
998            let mut listeners = self.loopback_tcp.write().await;
999            if let Some(handle) = listeners.remove(&publish_port) {
1000                handle.abort();
1001            }
1002            // Release host-port ownership so a different (deployment, service)
1003            // may bind it next.
1004            self.published_ports.write().await.remove(&publish_port);
1005            debug!(
1006                port = publish_port,
1007                "Freed loopback TCP listener (no backends remain)"
1008            );
1009        } else {
1010            self.loopback_registry
1011                .update_tcp_backends(publish_port, remaining);
1012        }
1013    }
1014
1015    /// Drop `backend` from the loopback UDP service on `publish_port`,
1016    /// freeing the listener when no backends remain.
1017    async fn unpublish_loopback_udp(&self, publish_port: u16, backend: SocketAddr) {
1018        let Some(existing) = self.loopback_registry.resolve_udp(publish_port) else {
1019            return;
1020        };
1021        let remaining: Vec<SocketAddr> = existing
1022            .backends
1023            .into_iter()
1024            .filter(|b| *b != backend)
1025            .collect();
1026
1027        if remaining.is_empty() {
1028            let _ = self.loopback_registry.unregister_udp(publish_port);
1029            let mut listeners = self.loopback_udp.write().await;
1030            if let Some(handle) = listeners.remove(&publish_port) {
1031                handle.abort();
1032            }
1033            // Release host-port ownership so a different (deployment, service)
1034            // may bind it next.
1035            self.published_ports.write().await.remove(&publish_port);
1036            debug!(
1037                port = publish_port,
1038                "Freed loopback UDP listener (no backends remain)"
1039            );
1040        } else {
1041            self.loopback_registry
1042                .update_udp_backends(publish_port, remaining);
1043        }
1044    }
1045
1046    /// Add routes for a service based on its specification
1047    ///
1048    /// This creates proxy routes for each endpoint defined in the `ServiceSpec`.
1049    /// HTTP/HTTPS/WebSocket endpoints get L7 routes via the `ServiceRegistry`.
1050    /// TCP/UDP endpoints are tracked but their L4 registration is handled
1051    /// by the `ServiceManager::register_service_routes()` method.
1052    ///
1053    /// The owning `deployment` (from `ServiceSpec.deployment`) scopes the LB
1054    /// group keys so two deployments that share a `service`+`endpoint` name
1055    /// keep independent backend pools (Bug 7). `None` for standalone /
1056    /// single-deployment callers.
1057    pub async fn add_service(&self, name: &str, spec: &ServiceSpec) {
1058        let deployment = spec.deployment.as_deref();
1059        let mut services = self.services.write().await;
1060
1061        // Track which endpoints and ports we're adding
1062        let mut endpoint_names = Vec::new();
1063        let mut tcp_ports = Vec::new();
1064        let mut udp_ports = Vec::new();
1065        let mut http_ports = Vec::new();
1066
1067        for endpoint in &spec.endpoints {
1068            match endpoint.protocol {
1069                Protocol::Http | Protocol::Https | Protocol::Websocket => {
1070                    // L7: register route in the ServiceRegistry
1071                    let entry = RouteEntry::from_endpoint(deployment, name, endpoint);
1072                    self.registry.register(entry).await;
1073                    http_ports.push(endpoint.port);
1074
1075                    // Register one LB group per L7 endpoint, keyed by the
1076                    // deployment-scoped composite
1077                    // `{deployment}/{service}#{endpoint}`. This matches the
1078                    // `resolved.name` set by `RouteEntry::from_endpoint` and
1079                    // is required so that (a) different endpoints on the same
1080                    // service (potentially with different `target_role`
1081                    // filters) maintain independent backend pools, and (b)
1082                    // two deployments sharing a service+endpoint name do not
1083                    // cross-wire into one pool.
1084                    let lb_key = endpoint_lb_key(deployment, name, &endpoint.name);
1085                    self.load_balancer
1086                        .register(&lb_key, vec![], LbStrategy::RoundRobin);
1087
1088                    info!(
1089                        service = name,
1090                        endpoint = %endpoint.name,
1091                        protocol = ?endpoint.protocol,
1092                        path = ?endpoint.path,
1093                        expose = ?endpoint.expose,
1094                        "Added HTTP proxy route for service"
1095                    );
1096                }
1097                Protocol::Tcp => {
1098                    tcp_ports.push(endpoint.port);
1099                    info!(
1100                        service = name,
1101                        endpoint = %endpoint.name,
1102                        protocol = ?endpoint.protocol,
1103                        port = endpoint.port,
1104                        expose = ?endpoint.expose,
1105                        "Tracking TCP stream endpoint for service"
1106                    );
1107                }
1108                Protocol::Udp => {
1109                    udp_ports.push(endpoint.port);
1110                    info!(
1111                        service = name,
1112                        endpoint = %endpoint.name,
1113                        protocol = ?endpoint.protocol,
1114                        port = endpoint.port,
1115                        expose = ?endpoint.expose,
1116                        "Tracking UDP stream endpoint for service"
1117                    );
1118                }
1119            }
1120
1121            endpoint_names.push(endpoint.name.clone());
1122        }
1123
1124        // Register a service-level LB group as well so legacy callers that
1125        // use `update_backends(service, ...)` (which fans out to all
1126        // endpoints) and any code that selects by bare service name still
1127        // resolve. Per-endpoint LB groups (registered above) are the
1128        // primary source for L7 select; this is a no-op for callers that
1129        // already use composite keys.
1130        self.load_balancer
1131            .register(name, vec![], LbStrategy::RoundRobin);
1132
1133        services.insert(
1134            name.to_string(),
1135            ServiceTracking {
1136                deployment: deployment.map(str::to_string),
1137                endpoint_names,
1138                tcp_ports,
1139                udp_ports,
1140                http_ports,
1141            },
1142        );
1143    }
1144
1145    /// Remove all routes, L4 listeners, and HTTP server handles for a service.
1146    ///
1147    /// This performs a full cleanup of all proxy resources associated with the
1148    /// service:
1149    /// - Removes L7 (HTTP/HTTPS/WebSocket) routes from the `ServiceRegistry`
1150    /// - Unregisters TCP/UDP stream services from the `StreamRegistry`
1151    /// - Removes port tracking for TCP/UDP listeners
1152    /// - Shuts down HTTP proxy server handles that were exclusively owned by
1153    ///   this service (only if no other service uses the same port)
1154    pub async fn remove_service(&self, name: &str) {
1155        let mut services = self.services.write().await;
1156
1157        if let Some(tracking) = services.remove(name) {
1158            // 1. Remove L7 routes from the ServiceRegistry
1159            self.registry.unregister_service(name).await;
1160
1161            // 1b. Remove from the load balancer (both the service-level
1162            //     group and every per-endpoint composite group). The
1163            //     per-endpoint keys are deployment-scoped to match the keys
1164            //     registered in `add_service`.
1165            self.load_balancer.unregister(name);
1166            let deployment = tracking.deployment.as_deref();
1167            for endpoint_name in &tracking.endpoint_names {
1168                let lb_key = endpoint_lb_key(deployment, name, endpoint_name);
1169                self.load_balancer.unregister(&lb_key);
1170            }
1171
1172            // 2. Unregister TCP stream services and clear port tracking
1173            if !tracking.tcp_ports.is_empty() {
1174                let mut tcp_set = self.tcp_listeners.write().await;
1175                for port in &tracking.tcp_ports {
1176                    if let Some(registry) = &self.stream_registry {
1177                        let _ = registry.unregister_tcp(*port);
1178                    }
1179                    tcp_set.remove(port);
1180                    debug!(service = name, port = port, "Removed TCP listener tracking");
1181                }
1182            }
1183
1184            // 3. Unregister UDP stream services and clear port tracking
1185            if !tracking.udp_ports.is_empty() {
1186                let mut udp_set = self.udp_listeners.write().await;
1187                for port in &tracking.udp_ports {
1188                    if let Some(registry) = &self.stream_registry {
1189                        let _ = registry.unregister_udp(*port);
1190                    }
1191                    udp_set.remove(port);
1192                    debug!(service = name, port = port, "Removed UDP listener tracking");
1193                }
1194            }
1195
1196            // 4. Shut down HTTP proxy servers on ports exclusively owned by
1197            //    this service (skip ports still used by other services)
1198            if !tracking.http_ports.is_empty() {
1199                let ports_still_in_use: HashSet<u16> = services
1200                    .values()
1201                    .flat_map(|t| t.http_ports.iter().copied())
1202                    .collect();
1203
1204                let mut servers = self.servers.write().await;
1205                for port in &tracking.http_ports {
1206                    if !ports_still_in_use.contains(port) {
1207                        if let Some(server) = servers.remove(port) {
1208                            server.shutdown();
1209                            info!(
1210                                service = name,
1211                                port = port,
1212                                "Shut down HTTP proxy server (no remaining services on port)"
1213                            );
1214                        }
1215                    }
1216                }
1217            }
1218
1219            info!(service = name, "Removed all proxy resources for service");
1220        }
1221    }
1222
1223    /// Add a single backend to a service.
1224    ///
1225    /// Adds to the service-level LB group **and** to every per-endpoint LB
1226    /// group tracked for `service`. Per-endpoint role filtering happens at
1227    /// collection time in the agent's service manager, so any backend
1228    /// surfaced here is already eligible for every endpoint.
1229    pub async fn add_backend(&self, service: &str, addr: SocketAddr) {
1230        self.registry.add_backend(service, addr).await;
1231        self.load_balancer.add_backend(service, addr);
1232        // Fan out to every per-endpoint LB group for backward-compat.
1233        let services = self.services.read().await;
1234        if let Some(tracking) = services.get(service) {
1235            let deployment = tracking.deployment.as_deref();
1236            for endpoint_name in &tracking.endpoint_names {
1237                let lb_key = endpoint_lb_key(deployment, service, endpoint_name);
1238                self.load_balancer.add_backend(&lb_key, addr);
1239            }
1240        }
1241        info!(service = service, backend = %addr, "Registered backend with proxy");
1242    }
1243
1244    /// Remove a backend from a service.
1245    ///
1246    /// Removes from the service-level LB group **and** from every
1247    /// per-endpoint LB group.
1248    pub async fn remove_backend(&self, service: &str, addr: SocketAddr) {
1249        self.registry.remove_backend(service, addr).await;
1250        self.load_balancer.remove_backend(service, &addr);
1251        let services = self.services.read().await;
1252        if let Some(tracking) = services.get(service) {
1253            let deployment = tracking.deployment.as_deref();
1254            for endpoint_name in &tracking.endpoint_names {
1255                let lb_key = endpoint_lb_key(deployment, service, endpoint_name);
1256                self.load_balancer.remove_backend(&lb_key, &addr);
1257            }
1258        }
1259        debug!(service = service, backend = %addr, "Removed backend from service");
1260    }
1261
1262    /// Update the health status of a backend in the load balancer.
1263    ///
1264    /// Delegates to [`LoadBalancer::mark_health`] so that unhealthy backends
1265    /// are skipped during selection. Health is tracked on both the
1266    /// service-level group and every per-endpoint group that contains
1267    /// this address.
1268    #[allow(clippy::unused_async)]
1269    pub async fn update_backend_health(&self, service: &str, addr: SocketAddr, healthy: bool) {
1270        self.load_balancer.mark_health(service, &addr, healthy);
1271        let services = self.services.read().await;
1272        if let Some(tracking) = services.get(service) {
1273            let deployment = tracking.deployment.as_deref();
1274            for endpoint_name in &tracking.endpoint_names {
1275                let lb_key = endpoint_lb_key(deployment, service, endpoint_name);
1276                self.load_balancer.mark_health(&lb_key, &addr, healthy);
1277            }
1278        }
1279        debug!(
1280            service = service,
1281            backend = %addr,
1282            healthy = healthy,
1283            "Updated backend health in load balancer"
1284        );
1285    }
1286
1287    /// Update the backends for **every** endpoint of a service with the
1288    /// same list.
1289    ///
1290    /// Use this only when caller cannot distinguish per-endpoint backend
1291    /// sets (e.g., legacy paths that do not honor `target_role`). Prefer
1292    /// [`Self::update_endpoint_backends`] when per-endpoint filtering is
1293    /// possible.
1294    pub async fn update_backends(&self, service: &str, addrs: Vec<SocketAddr>) {
1295        self.registry.update_backends(service, addrs.clone()).await;
1296        // Update the service-level LB group plus every per-endpoint group.
1297        self.load_balancer.update_backends(service, addrs.clone());
1298        let services = self.services.read().await;
1299        if let Some(tracking) = services.get(service) {
1300            let deployment = tracking.deployment.as_deref();
1301            for endpoint_name in &tracking.endpoint_names {
1302                let lb_key = endpoint_lb_key(deployment, service, endpoint_name);
1303                self.load_balancer.update_backends(&lb_key, addrs.clone());
1304            }
1305        }
1306        debug!(service = service, "Updated backends for service");
1307    }
1308
1309    /// Update backends for a single L7 endpoint of a service.
1310    ///
1311    /// This honors [`EndpointSpec::target_role`] filtering: the caller
1312    /// supplies the role-filtered backend list and this method updates
1313    /// only the routes and LB group corresponding to `(service,
1314    /// endpoint_name)`.
1315    pub async fn update_endpoint_backends(
1316        &self,
1317        service: &str,
1318        endpoint_name: &str,
1319        addrs: Vec<SocketAddr>,
1320    ) {
1321        self.registry
1322            .update_backends_for_endpoint(service, endpoint_name, addrs.clone())
1323            .await;
1324        // Resolve the owning deployment so the LB key matches what
1325        // `add_service` registered.
1326        let deployment = {
1327            let services = self.services.read().await;
1328            services.get(service).and_then(|t| t.deployment.clone())
1329        };
1330        let lb_key = endpoint_lb_key(deployment.as_deref(), service, endpoint_name);
1331        self.load_balancer.update_backends(&lb_key, addrs);
1332        debug!(
1333            service = service,
1334            endpoint = endpoint_name,
1335            "Updated backends for service endpoint"
1336        );
1337    }
1338
1339    /// Get the number of registered routes
1340    pub async fn route_count(&self) -> usize {
1341        self.registry.route_count().await
1342    }
1343
1344    /// Get the list of registered service names
1345    pub async fn list_services(&self) -> Vec<String> {
1346        self.services.read().await.keys().cloned().collect()
1347    }
1348
1349    /// Check if a service has any registered endpoints
1350    pub async fn has_service(&self, name: &str) -> bool {
1351        self.services.read().await.contains_key(name)
1352    }
1353}
1354
1355#[cfg(test)]
1356mod tests {
1357    use super::*;
1358
1359    fn mock_service_spec_with_endpoints() -> ServiceSpec {
1360        use zlayer_spec::*;
1361        serde_yaml::from_str::<DeploymentSpec>(
1362            r"
1363version: v1
1364deployment: test
1365services:
1366  test:
1367    rtype: service
1368    image:
1369      name: test:latest
1370    endpoints:
1371      - name: http
1372        protocol: http
1373        port: 8080
1374        path: /api
1375        expose: public
1376      - name: websocket
1377        protocol: websocket
1378        port: 8081
1379        path: /ws
1380        expose: internal
1381",
1382        )
1383        .unwrap()
1384        .services
1385        .remove("test")
1386        .unwrap()
1387    }
1388
1389    fn mock_service_spec_tcp_only() -> ServiceSpec {
1390        mock_service_spec_tcp_only_port(9000)
1391    }
1392
1393    fn mock_service_spec_tcp_only_port(port: u16) -> ServiceSpec {
1394        use zlayer_spec::*;
1395        let yaml = format!(
1396            "
1397version: v1
1398deployment: test
1399services:
1400  test:
1401    rtype: service
1402    image:
1403      name: test:latest
1404    endpoints:
1405      - name: grpc
1406        protocol: tcp
1407        port: {port}
1408"
1409        );
1410        serde_yaml::from_str::<DeploymentSpec>(&yaml)
1411            .unwrap()
1412            .services
1413            .remove("test")
1414            .unwrap()
1415    }
1416
1417    /// Reserve an unused localhost TCP port by binding a listener on `:0`,
1418    /// reading the assigned port, and dropping the listener.
1419    ///
1420    /// There is an inherent race between dropping the listener and the test
1421    /// re-binding the port, but this is dramatically more reliable than
1422    /// hard-coding a port (e.g., 9000) which is commonly in use on dev
1423    /// machines (php-fpm, the running zlayer daemon, etc.).
1424    fn reserve_free_tcp_port() -> u16 {
1425        let listener =
1426            std::net::TcpListener::bind("127.0.0.1:0").expect("failed to bind ephemeral test port");
1427        listener.local_addr().unwrap().port()
1428    }
1429
1430    #[tokio::test]
1431    async fn test_proxy_manager_new() {
1432        let config = ProxyManagerConfig::default();
1433        let registry = Arc::new(ServiceRegistry::new());
1434        let manager = ProxyManager::new(config, registry, None);
1435
1436        assert_eq!(manager.route_count().await, 0);
1437        assert!(manager.list_services().await.is_empty());
1438    }
1439
1440    #[tokio::test]
1441    async fn test_add_service_with_http_endpoints() {
1442        let config = ProxyManagerConfig::default();
1443        let registry = Arc::new(ServiceRegistry::new());
1444        let manager = ProxyManager::new(config, registry, None);
1445
1446        let spec = mock_service_spec_with_endpoints();
1447        manager.add_service("api", &spec).await;
1448
1449        // Should have 2 routes (http and websocket)
1450        assert_eq!(manager.route_count().await, 2);
1451        assert!(manager.has_service("api").await);
1452    }
1453
1454    #[tokio::test]
1455    async fn test_tcp_endpoints_tracked_not_routed() {
1456        let config = ProxyManagerConfig::default();
1457        let registry = Arc::new(ServiceRegistry::new());
1458        let manager = ProxyManager::new(config, registry, None);
1459
1460        let spec = mock_service_spec_tcp_only();
1461        manager.add_service("grpc-service", &spec).await;
1462
1463        // TCP endpoints don't add HTTP routes
1464        assert_eq!(manager.route_count().await, 0);
1465        // But the service is still tracked with its endpoint name
1466        assert!(manager.has_service("grpc-service").await);
1467    }
1468
1469    #[tokio::test]
1470    async fn test_remove_service() {
1471        let config = ProxyManagerConfig::default();
1472        let registry = Arc::new(ServiceRegistry::new());
1473        let manager = ProxyManager::new(config, registry, None);
1474
1475        let spec = mock_service_spec_with_endpoints();
1476        manager.add_service("api", &spec).await;
1477        assert_eq!(manager.route_count().await, 2);
1478
1479        manager.remove_service("api").await;
1480        assert_eq!(manager.route_count().await, 0);
1481        assert!(!manager.has_service("api").await);
1482    }
1483
1484    #[tokio::test]
1485    async fn test_backend_management() {
1486        let config = ProxyManagerConfig::default();
1487        let registry = Arc::new(ServiceRegistry::new());
1488        let manager = ProxyManager::new(config, registry.clone(), None);
1489
1490        let spec = mock_service_spec_with_endpoints();
1491        manager.add_service("api", &spec).await;
1492
1493        // Add backends
1494        let addr1: SocketAddr = "127.0.0.1:8080".parse().unwrap();
1495        let addr2: SocketAddr = "127.0.0.1:8081".parse().unwrap();
1496
1497        manager.add_backend("api", addr1).await;
1498        manager.add_backend("api", addr2).await;
1499
1500        // Verify backends via the registry's resolve
1501        let resolved = registry.resolve(None, "/api").await.unwrap();
1502        assert_eq!(resolved.backends.len(), 2);
1503
1504        // Remove a backend
1505        manager.remove_backend("api", addr1).await;
1506        let resolved = registry.resolve(None, "/api").await.unwrap();
1507        assert_eq!(resolved.backends.len(), 1);
1508    }
1509
1510    #[tokio::test]
1511    async fn test_update_backends_replaces_all() {
1512        let config = ProxyManagerConfig::default();
1513        let registry = Arc::new(ServiceRegistry::new());
1514        let manager = ProxyManager::new(config, registry.clone(), None);
1515
1516        let spec = mock_service_spec_with_endpoints();
1517        manager.add_service("api", &spec).await;
1518
1519        // Add initial backend
1520        let addr1: SocketAddr = "127.0.0.1:8080".parse().unwrap();
1521        manager.add_backend("api", addr1).await;
1522
1523        // Update with new backends (replaces)
1524        let new_backends: Vec<SocketAddr> = vec![
1525            "127.0.0.1:9000".parse().unwrap(),
1526            "127.0.0.1:9001".parse().unwrap(),
1527            "127.0.0.1:9002".parse().unwrap(),
1528        ];
1529        manager.update_backends("api", new_backends).await;
1530
1531        let resolved = registry.resolve(None, "/api").await.unwrap();
1532        assert_eq!(resolved.backends.len(), 3);
1533    }
1534
1535    #[tokio::test]
1536    async fn test_config_builder() {
1537        let config = ProxyManagerConfig::new("0.0.0.0:8080".parse().unwrap())
1538            .with_https("0.0.0.0:8443".parse().unwrap())
1539            .with_http2(false);
1540
1541        assert_eq!(
1542            config.http_addr,
1543            "0.0.0.0:8080".parse::<SocketAddr>().unwrap()
1544        );
1545        assert_eq!(
1546            config.https_addr,
1547            Some("0.0.0.0:8443".parse::<SocketAddr>().unwrap())
1548        );
1549        assert!(!config.http2_enabled);
1550    }
1551
1552    /// Test that `ensure_ports_for_service` correctly differentiates
1553    /// Public (0.0.0.0) vs Internal (overlay or 127.0.0.1) bind addresses.
1554    /// We can't actually bind in unit tests, but we verify the function
1555    /// processes both endpoint types without error.
1556    #[tokio::test]
1557    async fn test_ensure_ports_differentiates_public_and_internal() {
1558        let config = ProxyManagerConfig::default();
1559        let registry = Arc::new(ServiceRegistry::new());
1560        let manager = ProxyManager::new(config, registry, None);
1561
1562        let spec = mock_service_spec_with_endpoints();
1563        // Passing None for overlay_ip: internal endpoints should fall back to 127.0.0.1
1564        let result = manager.ensure_ports_for_service(&spec, None).await;
1565        // listen_on may fail because we can't actually bind in tests, but
1566        // the function itself should run without panicking.
1567        let _ = result;
1568    }
1569
1570    #[tokio::test]
1571    async fn test_ensure_ports_with_overlay_ip() {
1572        let config = ProxyManagerConfig::default();
1573        let registry = Arc::new(ServiceRegistry::new());
1574        let manager = ProxyManager::new(config, registry, None);
1575
1576        let spec = mock_service_spec_with_endpoints();
1577        // Pass an overlay IP -- internal endpoints should bind there
1578        let overlay_ip: IpAddr = "10.200.0.5".parse().unwrap();
1579        let result = manager
1580            .ensure_ports_for_service(&spec, Some(overlay_ip))
1581            .await;
1582        let _ = result;
1583    }
1584
1585    fn mock_mixed_service_spec() -> ServiceSpec {
1586        use zlayer_spec::*;
1587        serde_yaml::from_str::<DeploymentSpec>(
1588            r"
1589version: v1
1590deployment: test
1591services:
1592  mixed:
1593    rtype: service
1594    image:
1595      name: test:latest
1596    endpoints:
1597      - name: http
1598        protocol: http
1599        port: 8080
1600        path: /api
1601        expose: public
1602      - name: grpc
1603        protocol: tcp
1604        port: 9000
1605        expose: public
1606      - name: game
1607        protocol: udp
1608        port: 27015
1609        expose: public
1610",
1611        )
1612        .unwrap()
1613        .services
1614        .remove("mixed")
1615        .unwrap()
1616    }
1617
1618    #[tokio::test]
1619    async fn test_add_mixed_service_tracks_all_endpoints() {
1620        let config = ProxyManagerConfig::default();
1621        let registry = Arc::new(ServiceRegistry::new());
1622        let manager = ProxyManager::new(config, registry, None);
1623
1624        let spec = mock_mixed_service_spec();
1625        manager.add_service("mixed", &spec).await;
1626
1627        // Only 1 HTTP route (tcp and udp don't add HTTP routes)
1628        assert_eq!(manager.route_count().await, 1);
1629        // Service is tracked
1630        assert!(manager.has_service("mixed").await);
1631    }
1632
1633    #[tokio::test]
1634    async fn test_ensure_ports_tcp_with_stream_registry() {
1635        use zlayer_proxy::StreamService;
1636
1637        let stream_registry = Arc::new(StreamRegistry::new());
1638        let config = ProxyManagerConfig::default();
1639        let registry = Arc::new(ServiceRegistry::new());
1640        let mut manager = ProxyManager::new(config, registry, None);
1641        manager.set_stream_registry(stream_registry.clone());
1642
1643        // Use an OS-assigned free port to avoid collisions with anything
1644        // listening on the dev/CI box (e.g. php-fpm or a running zlayer
1645        // daemon both default to port 9000 on 127.0.0.1).
1646        let port = reserve_free_tcp_port();
1647        let spec = mock_service_spec_tcp_only_port(port);
1648
1649        // Register the TCP service in the stream registry first (as ServiceManager does)
1650        stream_registry.register_tcp(port, StreamService::new("grpc-service".to_string(), vec![]));
1651
1652        // Ensure ports -- should bind TCP listener
1653        let result = manager.ensure_ports_for_service(&spec, None).await;
1654        assert!(result.is_ok());
1655
1656        // Verify the TCP listener port is tracked
1657        let tcp_ports = manager.tcp_listeners.read().await;
1658        assert!(tcp_ports.contains(&port));
1659    }
1660
1661    #[tokio::test]
1662    async fn test_ensure_ports_tcp_without_stream_registry() {
1663        let config = ProxyManagerConfig::default();
1664        let registry = Arc::new(ServiceRegistry::new());
1665        let manager = ProxyManager::new(config, registry, None);
1666
1667        let spec = mock_service_spec_tcp_only();
1668
1669        // Without stream registry, ensure_ports should not fail, just warn
1670        let result = manager.ensure_ports_for_service(&spec, None).await;
1671        assert!(result.is_ok());
1672
1673        // No TCP listeners should be tracked
1674        let tcp_ports = manager.tcp_listeners.read().await;
1675        assert!(tcp_ports.is_empty());
1676    }
1677
1678    #[tokio::test]
1679    async fn test_stream_registry_setter() {
1680        let stream_registry = Arc::new(StreamRegistry::new());
1681        let config = ProxyManagerConfig::default();
1682        let registry = Arc::new(ServiceRegistry::new());
1683        let mut manager = ProxyManager::new(config, registry, None);
1684
1685        assert!(manager.stream_registry().is_none());
1686        manager.set_stream_registry(stream_registry.clone());
1687        assert!(manager.stream_registry().is_some());
1688    }
1689
1690    /// Single-member service spec with one INTERNAL TCP endpoint published on
1691    /// `port`. Internal (not Public) so the loopback path actually binds it.
1692    fn mock_internal_tcp_spec(port: u16) -> ServiceSpec {
1693        use zlayer_spec::*;
1694        let yaml = format!(
1695            "
1696version: v1
1697deployment: test
1698services:
1699  test:
1700    rtype: service
1701    image:
1702      name: test:latest
1703    scale:
1704      mode: fixed
1705      replicas: 1
1706    endpoints:
1707      - name: tcp
1708        protocol: tcp
1709        port: {port}
1710        expose: internal
1711"
1712        );
1713        serde_yaml::from_str::<DeploymentSpec>(&yaml)
1714            .unwrap()
1715            .services
1716            .remove("test")
1717            .unwrap()
1718    }
1719
1720    /// End-to-end loopback publish: spin up a real backend `TcpListener`,
1721    /// publish it on the node loopback, connect to `127.0.0.1:<publish_port>`
1722    /// and assert bytes round-trip through the forward; then unpublish and
1723    /// assert the port is freed (a fresh bind succeeds).
1724    #[tokio::test]
1725    async fn test_publish_loopback_round_trips_then_frees_port() {
1726        use tokio::io::{AsyncReadExt, AsyncWriteExt};
1727
1728        // Real backend that echoes a single line back with a known reply.
1729        let backend = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
1730        let backend_addr = backend.local_addr().unwrap();
1731        let backend_ip = backend_addr.ip();
1732        let backend_port = backend_addr.port();
1733        tokio::spawn(async move {
1734            if let Ok((mut sock, _)) = backend.accept().await {
1735                let mut buf = [0u8; 16];
1736                let n = sock.read(&mut buf).await.unwrap_or(0);
1737                // Echo back what we received, prefixed.
1738                let _ = sock.write_all(b"pong:").await;
1739                let _ = sock.write_all(&buf[..n]).await;
1740                let _ = sock.flush().await;
1741            }
1742        });
1743
1744        let config = ProxyManagerConfig::default();
1745        let registry = Arc::new(ServiceRegistry::new());
1746        let manager = ProxyManager::new(config, registry, None);
1747
1748        // Reserve a free publish port (the node-loopback address).
1749        let publish_port = reserve_free_tcp_port();
1750        let spec = mock_internal_tcp_spec(publish_port);
1751        assert!(
1752            spec.publish_to_node_loopback(),
1753            "single-member internal spec should publish to loopback"
1754        );
1755
1756        // The backend is the real listener; port_override forces the forward
1757        // target to the backend's actual ephemeral port (the macOS-style path).
1758        manager
1759            .publish_loopback_for_container(
1760                Some("dep-a"),
1761                "test",
1762                &spec,
1763                backend_ip,
1764                Some(backend_port),
1765            )
1766            .await
1767            .expect("publish should succeed on a free port");
1768
1769        // Connect to 127.0.0.1:<publish_port> and round-trip a payload.
1770        let mut client = tokio::net::TcpStream::connect((Ipv4Addr::LOCALHOST, publish_port))
1771            .await
1772            .expect("connect to published loopback port");
1773        client.write_all(b"ping").await.unwrap();
1774        client.flush().await.unwrap();
1775        let mut reply = Vec::new();
1776        client.read_to_end(&mut reply).await.unwrap();
1777        assert_eq!(&reply, b"pong:ping");
1778        drop(client);
1779
1780        // Unpublish; the last backend's removal frees the listener.
1781        manager
1782            .unpublish_loopback_for_container(&spec, backend_ip, Some(backend_port))
1783            .await;
1784
1785        // The aborted accept task drops the listener asynchronously; retry a
1786        // few times so the OS reclaims the port before we assert it is free.
1787        let mut bound = None;
1788        for _ in 0..50 {
1789            match std::net::TcpListener::bind((Ipv4Addr::LOCALHOST, publish_port)) {
1790                Ok(l) => {
1791                    bound = Some(l);
1792                    break;
1793                }
1794                Err(_) => tokio::time::sleep(Duration::from_millis(20)).await,
1795            }
1796        }
1797        assert!(
1798            bound.is_some(),
1799            "loopback port {publish_port} should be freed after unpublish"
1800        );
1801    }
1802
1803    #[tokio::test]
1804    async fn test_publish_loopback_skips_public_endpoints() {
1805        // Public endpoints are already on 0.0.0.0, so the loopback path must
1806        // NOT bind 127.0.0.1:<port> again. mock_mixed_service_spec exposes
1807        // everything as public.
1808        let config = ProxyManagerConfig::default();
1809        let registry = Arc::new(ServiceRegistry::new());
1810        let manager = ProxyManager::new(config, registry, None);
1811
1812        let spec = mock_mixed_service_spec();
1813        let backend_ip: IpAddr = "127.0.0.1".parse().unwrap();
1814        manager
1815            .publish_loopback_for_container(Some("dep-a"), "mixed", &spec, backend_ip, None)
1816            .await
1817            .expect("public-only spec publishes nothing and must not error");
1818
1819        // No loopback listeners should have been created for public endpoints.
1820        assert!(manager.loopback_tcp.read().await.is_empty());
1821        assert!(manager.loopback_udp.read().await.is_empty());
1822    }
1823
1824    #[tokio::test]
1825    async fn test_registry_accessor() {
1826        let config = ProxyManagerConfig::default();
1827        let registry = Arc::new(ServiceRegistry::new());
1828        let manager = ProxyManager::new(config, registry.clone(), None);
1829
1830        // registry() should return the same Arc
1831        assert_eq!(Arc::as_ptr(&manager.registry()), Arc::as_ptr(&registry));
1832    }
1833
1834    /// Bug 7: a host port published by deployment A must NOT be cross-wired
1835    /// into deployment B's backend pool. B's publish on the same port is
1836    /// REFUSED with `PortConflict`, and `:<port>` keeps resolving to A's
1837    /// backend only.
1838    #[tokio::test]
1839    async fn test_published_port_ownership_rejects_cross_deployment() {
1840        let config = ProxyManagerConfig::default();
1841        let registry = Arc::new(ServiceRegistry::new());
1842        let manager = ProxyManager::new(config, registry, None);
1843
1844        // Reserve a free publish port shared by both deployments.
1845        let publish_port = reserve_free_tcp_port();
1846        let spec = mock_internal_tcp_spec(publish_port);
1847
1848        // Distinct container backends for the two deployments.
1849        let backend_a: IpAddr = "10.0.0.1".parse().unwrap();
1850        let tgt_a = 5001u16;
1851        let backend_b: IpAddr = "10.0.0.2".parse().unwrap();
1852        let tgt_b = 5002u16;
1853
1854        // Deployment A claims the port -> succeeds.
1855        manager
1856            .publish_loopback_for_container(Some("dep-a"), "svc", &spec, backend_a, Some(tgt_a))
1857            .await
1858            .expect("deployment A should claim the free port");
1859
1860        // Deployment B publishing the SAME port -> REFUSED.
1861        let err = manager
1862            .publish_loopback_for_container(Some("dep-b"), "svc", &spec, backend_b, Some(tgt_b))
1863            .await
1864            .expect_err("deployment B must be refused on an owned port");
1865        match err {
1866            crate::error::AgentError::PortConflict { port, .. } => {
1867                assert_eq!(port, publish_port);
1868            }
1869            other => panic!("expected PortConflict, got {other:?}"),
1870        }
1871
1872        // `:<port>` must still serve ONLY deployment A's backend — B was never
1873        // appended into the foreign pool.
1874        let svc = manager
1875            .loopback_registry
1876            .resolve_tcp(publish_port)
1877            .expect("port should still be registered to deployment A");
1878        let expected_a = SocketAddr::new(backend_a, tgt_a);
1879        let foreign_b = SocketAddr::new(backend_b, tgt_b);
1880        assert_eq!(svc.backends, vec![expected_a]);
1881        assert!(
1882            !svc.backends.contains(&foreign_b),
1883            "deployment B's backend must NOT be cross-wired into the pool"
1884        );
1885    }
1886
1887    /// A second replica of the SAME (deployment, service) on an already-owned
1888    /// port is a legitimate scale-up: the replica backend IS appended.
1889    #[tokio::test]
1890    async fn test_published_port_same_owner_appends_replica() {
1891        let config = ProxyManagerConfig::default();
1892        let registry = Arc::new(ServiceRegistry::new());
1893        let manager = ProxyManager::new(config, registry, None);
1894
1895        let publish_port = reserve_free_tcp_port();
1896        let spec = mock_internal_tcp_spec(publish_port);
1897
1898        let replica1: IpAddr = "10.0.0.1".parse().unwrap();
1899        let replica2: IpAddr = "10.0.0.2".parse().unwrap();
1900        let target_port = 6000u16;
1901
1902        // First replica claims the port.
1903        manager
1904            .publish_loopback_for_container(
1905                Some("dep-a"),
1906                "svc",
1907                &spec,
1908                replica1,
1909                Some(target_port),
1910            )
1911            .await
1912            .expect("first replica claims the port");
1913
1914        // Second replica of the SAME (deployment, service) -> appended.
1915        manager
1916            .publish_loopback_for_container(
1917                Some("dep-a"),
1918                "svc",
1919                &spec,
1920                replica2,
1921                Some(target_port),
1922            )
1923            .await
1924            .expect("same-owner second replica should be accepted");
1925
1926        let svc = manager
1927            .loopback_registry
1928            .resolve_tcp(publish_port)
1929            .expect("port should be registered");
1930        let b1 = SocketAddr::new(replica1, target_port);
1931        let b2 = SocketAddr::new(replica2, target_port);
1932        assert_eq!(svc.backends.len(), 2, "both replicas should be in the pool");
1933        assert!(svc.backends.contains(&b1));
1934        assert!(svc.backends.contains(&b2));
1935    }
1936
1937    /// After the owning service unpublishes its last backend, the host-port
1938    /// ownership entry is released so a different (deployment, service) may
1939    /// claim it.
1940    #[tokio::test]
1941    async fn test_published_port_freed_on_unpublish() {
1942        let config = ProxyManagerConfig::default();
1943        let registry = Arc::new(ServiceRegistry::new());
1944        let manager = ProxyManager::new(config, registry, None);
1945
1946        let publish_port = reserve_free_tcp_port();
1947        let spec = mock_internal_tcp_spec(publish_port);
1948        let backend_a: IpAddr = "10.0.0.1".parse().unwrap();
1949        let target_port = 7000u16;
1950
1951        manager
1952            .publish_loopback_for_container(
1953                Some("dep-a"),
1954                "svc",
1955                &spec,
1956                backend_a,
1957                Some(target_port),
1958            )
1959            .await
1960            .expect("deployment A claims the port");
1961        assert!(manager
1962            .published_ports
1963            .read()
1964            .await
1965            .contains_key(&publish_port));
1966
1967        // Unpublish A's only backend -> ownership released.
1968        manager
1969            .unpublish_loopback_for_container(&spec, backend_a, Some(target_port))
1970            .await;
1971        assert!(
1972            !manager
1973                .published_ports
1974                .read()
1975                .await
1976                .contains_key(&publish_port),
1977            "ownership entry should be cleared once the last backend is gone"
1978        );
1979
1980        // A different deployment can now claim the freed port.
1981        let backend_b: IpAddr = "10.0.0.2".parse().unwrap();
1982        manager
1983            .publish_loopback_for_container(
1984                Some("dep-b"),
1985                "svc",
1986                &spec,
1987                backend_b,
1988                Some(target_port),
1989            )
1990            .await
1991            .expect("freed port should be claimable by another deployment");
1992    }
1993
1994    #[tokio::test]
1995    #[allow(clippy::similar_names)]
1996    async fn test_start_ingress_is_idempotent() {
1997        let config = ProxyManagerConfig::default();
1998        let registry = Arc::new(ServiceRegistry::new());
1999        let manager = ProxyManager::new(config, registry, None);
2000
2001        // Use free ephemeral ports so the test does not need root to bind
2002        // 80/443. No CertManager is configured, so only the HTTP listener
2003        // registers (HTTPS warns + returns early).
2004        let http_port = reserve_free_tcp_port();
2005        let https_port = reserve_free_tcp_port();
2006
2007        manager.start_ingress_on(http_port, https_port).await;
2008        // The HTTP ingress server should be registered on its port.
2009        assert!(
2010            manager.servers.read().await.contains_key(&http_port),
2011            "HTTP ingress should be registered"
2012        );
2013        assert!(
2014            manager.ingress_started.load(Ordering::SeqCst),
2015            "ingress_started flag should be set"
2016        );
2017        let count_after_first = manager.servers.read().await.len();
2018
2019        // Second call is a no-op: the idempotency guard short-circuits, so the
2020        // server map does not grow.
2021        manager.start_ingress_on(http_port, https_port).await;
2022        assert_eq!(
2023            manager.servers.read().await.len(),
2024            count_after_first,
2025            "second start_ingress call must not register additional servers"
2026        );
2027    }
2028}