Skip to main content

zlayer_agent/
proxy_manager.rs

1//! Proxy management for agent-controlled services
2//!
3//! This module provides the `ProxyManager` struct that integrates the proxy crate
4//! with the agent's service management. It handles:
5//! - Managing proxy routes based on `ServiceSpec` endpoints (HTTP/HTTPS/WebSocket)
6//! - Managing L4 stream proxy listeners (TCP/UDP)
7//! - Tracking and updating backend servers for load balancing
8//! - Coordinating proxy server lifecycle
9
10use crate::error::Result;
11use std::collections::{HashMap, HashSet};
12use std::net::{IpAddr, Ipv4Addr, SocketAddr};
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::RwLock;
17use tracing::{debug, info, warn};
18use zlayer_proxy::{
19    endpoint_lb_key, load_existing_certs_into_resolver, tls_acceptor_from_resolver, Activator,
20    CertManager, LbStrategy, LoadBalancer, NetworkPolicyChecker, ProxyConfig, ProxyServer,
21    RouteEntry, RpsRegistry, ServiceRegistry, SniCertResolver, StreamHealthProbe,
22    StreamProxyConfig, StreamRegistry, StreamService, TcpStreamService, UdpStreamService,
23};
24use zlayer_scheduler::scalers::RpsProvider;
25use zlayer_spec::{
26    ExposeType, PortMapping, PortProtocol, Protocol, ServiceSpec, StreamEndpointConfig,
27    StreamHealthCheck,
28};
29
/// Default activation floor: the replica count a [`ServiceActivator`] scales a
/// scaled-to-zero service up to on the first request.
///
/// [`ServiceActivator::new`] starts from this value; callers can raise it with
/// [`ServiceActivator::with_floor`].
const DEFAULT_ACTIVATION_FLOOR: u32 = 1;
33
34/// Bind an explicitly IPv6-only TCP listener on `[::]:port`.
35///
36/// The socket is forced `IPV6_V6ONLY` so it never conflicts with the sibling
37/// `0.0.0.0:port` IPv4 listener — on Linux the default `IPV6_V6ONLY=0` would
38/// otherwise make `[::]` claim v4-mapped traffic too and the second bind would
39/// fail with `EADDRINUSE`. Returned as a non-blocking [`tokio::net::TcpListener`]
40/// ready to hand to the proxy accept loop.
41fn bind_v6_only(port: u16) -> std::io::Result<tokio::net::TcpListener> {
42    use socket2::{Domain, Protocol, Socket, Type};
43    let sock = Socket::new(Domain::IPV6, Type::STREAM, Some(Protocol::TCP))?;
44    sock.set_only_v6(true)?;
45    sock.set_reuse_address(true)?;
46    sock.set_nonblocking(true)?;
47    let addr: std::net::SocketAddr = (std::net::Ipv6Addr::UNSPECIFIED, port).into();
48    sock.bind(&addr.into())?;
49    sock.listen(1024)?;
50    tokio::net::TcpListener::from_std(sock.into())
51}
52
53/// Translate an endpoint's `stream:` block (the controlling
54/// [`StreamEndpointConfig`] from `zlayer-types`) into the proxy crate's
55/// runtime [`StreamProxyConfig`].
56///
57/// This is where the spec-level types are decoded into proxy-local ones:
58/// `session_timeout` strings are parsed into [`Duration`], and a configured
59/// [`StreamHealthCheck`] is lowered into a [`StreamHealthProbe`] (decoding the
60/// `\xNN` hex escapes in the UDP probe request/expect payloads).
61///
62/// `None` (no `stream:` block) yields the default config (no TLS, no PROXY
63/// protocol, no session-timeout override, no health probe).
64fn translate_stream_config(stream: Option<&StreamEndpointConfig>) -> StreamProxyConfig {
65    let Some(stream) = stream else {
66        return StreamProxyConfig::default();
67    };
68
69    let health_check = stream.health_check.as_ref().map(|hc| match hc {
70        StreamHealthCheck::TcpConnect => StreamHealthProbe::TcpConnect,
71        StreamHealthCheck::UdpProbe { request, expect } => StreamHealthProbe::UdpProbe {
72            request: unescape_hex(request),
73            expect: expect.as_deref().map(unescape_hex),
74        },
75    });
76
77    StreamProxyConfig {
78        tls: stream.tls,
79        proxy_protocol: stream.proxy_protocol,
80        session_timeout: stream.session_timeout_duration(),
81        health_check,
82    }
83}
84
/// Turn a probe payload string into raw bytes, expanding escape sequences.
///
/// Recognised escapes:
/// * `\xNN` / `\XNN` — exactly two hex digits, emitted as the single byte
///   `0xNN`.
/// * `\\` — a single literal backslash.
///
/// Everything else is copied through byte-for-byte. That includes a
/// backslash that does not start a valid escape (too few characters left,
/// non-hex digits, or an unknown escape letter): only the backslash itself is
/// consumed, so the characters after it are emitted unchanged on the next
/// iterations.
fn unescape_hex(s: &str) -> Vec<u8> {
    let mut rest = s.as_bytes();
    let mut decoded = Vec::with_capacity(rest.len());

    while let Some((&head, tail)) = rest.split_first() {
        if head == b'\\' {
            match tail {
                [b'x' | b'X', hi, lo, after @ ..] => {
                    let nibbles = char::from(*hi)
                        .to_digit(16)
                        .zip(char::from(*lo).to_digit(16));
                    if let Some((hi, lo)) = nibbles {
                        // Two 4-bit values always fit in a byte, so the
                        // fallback is never taken.
                        decoded.push(u8::try_from((hi << 4) | lo).unwrap_or(0));
                        rest = after;
                        continue;
                    }
                    // Malformed `\x`: fall through and emit the backslash.
                }
                [b'\\', after @ ..] => {
                    decoded.push(b'\\');
                    rest = after;
                    continue;
                }
                _ => {}
            }
        }

        decoded.push(head);
        rest = tail;
    }

    decoded
}
125
/// Maximum time [`ServiceActivator::activate`] waits for a healthy backend to
/// appear after triggering a scale-up before returning so the proxy can fall
/// back to its own deadline/`503`.
///
/// When the deadline passes, `activate` returns an `Err` whose message
/// includes this duration.
const ACTIVATOR_READY_DEADLINE: Duration = Duration::from_secs(30);

/// Poll step while waiting for a backend after a scale-up trigger.
///
/// `activate` sleeps this long between successive load-balancer lookups.
const ACTIVATOR_POLL_STEP: Duration = Duration::from_millis(200);
133
/// Abstraction over "scale this service to N replicas", implemented by the
/// agent's `ServiceManager`.
///
/// [`ProxyManager`] is constructed *before* the `ServiceManager` and the
/// `ServiceManager` holds an `Arc<ProxyManager>` (one-way wiring), so the
/// activator cannot hold a concrete `ServiceManager` without a reference cycle.
/// This trait lets the activator depend only on the scale-up capability, which
/// the `ServiceManager` provides via its existing `scale_service` method. The
/// daemon installs a concrete trigger after both halves exist
/// (see [`ProxyManager::set_activator`]).
///
/// The `Send + Sync` bound lets a trigger be stored as `Arc<dyn ScaleTrigger>`
/// inside [`ServiceActivator`].
#[async_trait::async_trait]
pub trait ScaleTrigger: Send + Sync {
    /// Scale `service` to at least `replicas` running replicas.
    ///
    /// `service` is the bare service name: [`ServiceActivator`] strips the
    /// `deployment/` prefix and `#endpoint` suffix from the LB key before
    /// calling this.
    ///
    /// # Errors
    ///
    /// Returns a human-readable error if the scale-up could not be initiated.
    async fn scale_to(&self, service: &str, replicas: u32) -> std::result::Result<(), String>;
}
153
154/// On-demand [`Activator`] for scale-to-zero services.
155///
156/// When the proxy resolves a route with no healthy backend, it calls
157/// [`Activator::activate`] with the resolved LB group key. This implementation
158/// derives the bare service name from that key, triggers a scale-up to the
159/// activation floor via a [`ScaleTrigger`], then polls the shared
160/// [`LoadBalancer`] until a healthy backend for the key appears (or the
161/// deadline elapses).
162pub struct ServiceActivator {
163    /// The scale-up capability (the agent's `ServiceManager`).
164    trigger: Arc<dyn ScaleTrigger>,
165    /// Shared load balancer the proxy selects from; polled for readiness.
166    load_balancer: Arc<LoadBalancer>,
167    /// Replica floor to scale up to on activation.
168    floor: u32,
169}
170
171impl ServiceActivator {
172    /// Create an activator that scales up via `trigger` and waits on
173    /// `load_balancer` for readiness, using [`DEFAULT_ACTIVATION_FLOOR`].
174    #[must_use]
175    pub fn new(trigger: Arc<dyn ScaleTrigger>, load_balancer: Arc<LoadBalancer>) -> Self {
176        Self {
177            trigger,
178            load_balancer,
179            floor: DEFAULT_ACTIVATION_FLOOR,
180        }
181    }
182
183    /// Override the activation replica floor (default
184    /// [`DEFAULT_ACTIVATION_FLOOR`]). A floor of `0` is clamped to `1` so an
185    /// activation always brings the service up.
186    #[must_use]
187    pub fn with_floor(mut self, floor: u32) -> Self {
188        self.floor = floor.max(1);
189        self
190    }
191
192    /// Derive the bare service name from a resolved LB group key.
193    ///
194    /// `add_service` registers per-endpoint LB groups under the composite key
195    /// `{deployment}/{service}#{endpoint}` (and a bare service-level group).
196    /// `scale_service` operates on the bare service name, so strip the
197    /// `#endpoint` suffix and the `deployment/` prefix.
198    fn service_name_from_key(key: &str) -> &str {
199        let without_endpoint = key.split('#').next().unwrap_or(key);
200        without_endpoint
201            .rsplit('/')
202            .next()
203            .unwrap_or(without_endpoint)
204    }
205}
206
207#[async_trait::async_trait]
208impl Activator for ServiceActivator {
209    async fn activate(&self, service: &str) -> std::result::Result<(), String> {
210        let bare = Self::service_name_from_key(service);
211        info!(
212            lb_key = %service,
213            service = %bare,
214            floor = self.floor,
215            "Activating scaled-to-zero service on demand"
216        );
217
218        // Trigger the scale-up. A failure here is surfaced to the proxy, which
219        // still polls for a backend (a concurrent activation may bring it up).
220        self.trigger.scale_to(bare, self.floor).await?;
221
222        // Wait for a healthy backend on the SAME key the proxy selects with.
223        let deadline = Instant::now() + ACTIVATOR_READY_DEADLINE;
224        loop {
225            if self.load_balancer.select(service).is_some() {
226                return Ok(());
227            }
228            if Instant::now() >= deadline {
229                return Err(format!(
230                    "service '{bare}' did not become ready within {ACTIVATOR_READY_DEADLINE:?} after scale-up"
231                ));
232            }
233            tokio::time::sleep(ACTIVATOR_POLL_STEP).await;
234        }
235    }
236}
237
238/// Adapter exposing a [`RpsRegistry`] as a scheduler [`RpsProvider`].
239///
240/// The `zlayer-proxy` crate must not depend on `zlayer-scheduler`, so the
241/// `RpsProvider` impl lives here in the agent (which depends on both). The
242/// scheduler reads per-service RPS through this adapter; the proxy populates the
243/// underlying registry on every routed request.
244#[derive(Debug, Clone)]
245pub struct RpsRegistryProvider {
246    /// The registry populated by the proxy request path.
247    registry: Arc<RpsRegistry>,
248}
249
250impl RpsRegistryProvider {
251    /// Wrap a shared [`RpsRegistry`] as an [`RpsProvider`].
252    #[must_use]
253    pub fn new(registry: Arc<RpsRegistry>) -> Self {
254        Self { registry }
255    }
256}
257
258#[async_trait::async_trait]
259impl RpsProvider for RpsRegistryProvider {
260    async fn rps(&self, service: &str) -> f64 {
261        self.registry.rps(service).await
262    }
263}
264
/// Default HTTP ingress port. The lone daemon IS the cluster ingress: it binds
/// `0.0.0.0:80` so any deployed service with an HTTP endpoint is reachable on
/// the standard web port without per-service listener configuration.
pub const DEFAULT_INGRESS_HTTP_PORT: u16 = 80;

/// Default HTTPS ingress port. See [`DEFAULT_INGRESS_HTTP_PORT`].
pub const DEFAULT_INGRESS_HTTPS_PORT: u16 = 443;

/// Configuration for the `ProxyManager`.
#[derive(Debug, Clone)]
pub struct ProxyManagerConfig {
    /// HTTP bind address
    pub http_addr: SocketAddr,
    /// HTTPS bind address (optional)
    pub https_addr: Option<SocketAddr>,
    /// Whether to enable HTTP/2
    pub http2_enabled: bool,
}

impl Default for ProxyManagerConfig {
    /// HTTP on `0.0.0.0:`[`DEFAULT_INGRESS_HTTP_PORT`], no HTTPS address,
    /// HTTP/2 enabled.
    fn default() -> Self {
        // Build the address from typed parts instead of parsing a string
        // literal: no runtime parse, no `unwrap`, and the port stays tied to
        // the ingress constant.
        Self::new(SocketAddr::new(
            IpAddr::V4(Ipv4Addr::UNSPECIFIED),
            DEFAULT_INGRESS_HTTP_PORT,
        ))
    }
}

impl ProxyManagerConfig {
    /// Create a configuration that serves HTTP on `http_addr`, with no HTTPS
    /// address and HTTP/2 enabled.
    #[must_use]
    pub fn new(http_addr: SocketAddr) -> Self {
        Self {
            http_addr,
            https_addr: None,
            http2_enabled: true,
        }
    }

    /// Set the HTTPS bind address.
    #[must_use]
    pub fn with_https(mut self, addr: SocketAddr) -> Self {
        self.https_addr = Some(addr);
        self
    }

    /// Enable or disable HTTP/2 support.
    #[must_use]
    pub fn with_http2(mut self, enabled: bool) -> Self {
        self.http2_enabled = enabled;
        self
    }
}
319
/// Per-service tracking information for cleanup purposes.
///
/// One value is stored per entry of `ProxyManager::services`; it records what
/// a service registered so those resources can be located again on removal.
#[derive(Debug, Clone)]
struct ServiceTracking {
    /// Owning deployment name, when known. Threaded from
    /// `ServiceSpec.deployment` by `add_service`. `None` for standalone /
    /// single-deployment callers (`docker run`). Used to build the
    /// deployment-scoped LB group key so two deployments sharing a
    /// service+endpoint name keep independent backend pools.
    deployment: Option<String>,
    /// Endpoint names (used to derive per-endpoint LB group keys for
    /// cleanup on `remove_service`).
    endpoint_names: Vec<String>,
    /// TCP ports owned by this service
    tcp_ports: Vec<u16>,
    /// UDP ports owned by this service
    udp_ports: Vec<u16>,
    /// HTTP/HTTPS/WebSocket ports owned by this service
    http_ports: Vec<u16>,
}
339
/// Manages proxy routing for agent-controlled services
///
/// The `ProxyManager` coordinates between the agent's service lifecycle and
/// the proxy crate's routing/load balancing infrastructure. It supports:
///
/// - **HTTP/HTTPS/WebSocket (L7)**: Multiple port listeners sharing the same
///   `ServiceRegistry` for request matching and load balancing.
/// - **TCP/UDP (L4)**: Standalone stream proxy listeners that forward raw
///   connections/datagrams to backends via the `StreamRegistry`.
///
/// Dropping the manager aborts its background load-balancer health-check
/// task (see the `Drop` impl).
pub struct ProxyManager {
    /// Configuration
    config: ProxyManagerConfig,
    /// Shared service registry for HTTP request matching and backend management
    registry: Arc<ServiceRegistry>,
    /// Load balancer for health-aware backend selection
    load_balancer: Arc<LoadBalancer>,
    /// Per-port HTTP proxy server handles.
    ///
    /// Keyed by listening port and shared by `listen_on` and `listen_on_tls`,
    /// so a given port hosts either a plaintext or a TLS listener, never both.
    servers: RwLock<HashMap<u16, Arc<ProxyServer>>>,
    /// Tracked services and their endpoints (includes port ownership for cleanup)
    services: RwLock<HashMap<String, ServiceTracking>>,
    /// Stream registry for L4 TCP/UDP proxy routing.
    ///
    /// `None` until installed via `set_stream_registry` /
    /// `with_stream_registry`.
    stream_registry: Option<Arc<StreamRegistry>>,
    /// Certificate manager for TLS. Without one, `listen_on_tls` logs a
    /// warning and starts nothing.
    cert_manager: Option<Arc<CertManager>>,
    /// Shared SNI certificate resolver used by EVERY live HTTPS listener this
    /// manager starts (`listen_on_tls` + the `:443` ingress).
    ///
    /// Created once in [`ProxyManager::new`] and cloned into each listener so a
    /// certificate hot-loaded into it (by the ACME provisioning trigger in
    /// [`Self::add_service`] or the daemon's renewal task) takes effect on the
    /// LIVE listener immediately — no restart. Before this was shared, each
    /// listener built its own resolver and dropped the handle, so a freshly
    /// provisioned cert could never reach the running listener and public
    /// HTTPS vhosts served "No certificate found" forever.
    sni_resolver: Arc<SniCertResolver>,
    /// Set of hosts for which certificate provisioning has already been
    /// scheduled, so a reconcile that re-runs [`Self::add_service`] for the same
    /// vhost does not re-spawn an ACME attempt every time. `CertManager::get_cert`
    /// is already cheap on a cache/disk hit, but a host whose provisioning is
    /// in-flight or has FAILED would otherwise be retried on every reconcile and
    /// burn Let's Encrypt rate limit; this guard makes the trigger fire at most
    /// once per host for the manager's lifetime.
    provisioning_requested: RwLock<HashSet<String>>,
    /// Ports with active TCP stream listeners (to avoid double-binding)
    tcp_listeners: RwLock<HashSet<u16>>,
    /// Ports with active UDP stream listeners (to avoid double-binding)
    udp_listeners: RwLock<HashSet<u16>>,
    /// Number of active proxy connections (for graceful drain on shutdown)
    active_connections: Arc<AtomicU64>,
    /// Optional network policy checker for access control enforcement.
    /// When present it is cloned into every server built by `listen_on` /
    /// `listen_on_tls`.
    network_policy_checker: Option<NetworkPolicyChecker>,
    /// Dedicated stream registry for node-loopback (`127.0.0.1:<port>`)
    /// publishing.
    ///
    /// This is intentionally separate from [`Self::stream_registry`]: the
    /// latter is keyed by endpoint port and entangled with the L7/L4 +
    /// Public/Internal binding matrix (`ensure_ports_for_service`). The
    /// loopback path forwards the node's `127.0.0.1:<endpoint.port>` to the
    /// container's real backend, independent of how the endpoint is exposed,
    /// so it owns its own registry and listener set.
    loopback_registry: Arc<StreamRegistry>,
    /// Active loopback TCP listeners keyed by published port. The
    /// [`tokio::task::JoinHandle`] owns the bound socket via its accept loop;
    /// aborting it frees the OS port. Used for both dedup and cleanup.
    loopback_tcp: RwLock<HashMap<u16, tokio::task::JoinHandle<()>>>,
    /// Active loopback UDP listeners keyed by published port. See
    /// [`Self::loopback_tcp`].
    loopback_udp: RwLock<HashMap<u16, tokio::task::JoinHandle<()>>>,
    /// Dedicated stream registry for explicit `port_mappings` host-port
    /// publishing (Docker-style `host_port -> container_port`). Separate from
    /// `loopback_registry`: the loopback path always binds `127.0.0.1:<port>`
    /// and forwards to the endpoint's target port; this path binds the
    /// mapping's `host_ip:host_port` (default `0.0.0.0`) and forwards to the
    /// container's `container_port`, independent of any endpoint.
    port_map_registry: Arc<StreamRegistry>,
    /// Active `port_mappings` TCP listeners keyed by bound host port.
    port_map_tcp: RwLock<HashMap<u16, tokio::task::JoinHandle<()>>>,
    /// Active `port_mappings` UDP listeners keyed by bound host port.
    port_map_udp: RwLock<HashMap<u16, tokio::task::JoinHandle<()>>>,
    /// Ownership map for published host-port loopback bindings.
    ///
    /// A host port (`127.0.0.1:<port>`) is a GLOBAL host resource — it can be
    /// bound exactly once. This maps each published port to the
    /// `(deployment, service)` that owns it. The first publisher claims the
    /// port; a second publisher for the SAME `(deployment, service)` is a
    /// legitimate scale-up (replica backend appended); a publisher for a
    /// DIFFERENT `(deployment, service)` is REFUSED (it would otherwise be
    /// silently appended into the foreign pool, so `:<port>` would serve the
    /// wrong deployment's backends — Bug 7). The entry is freed when the
    /// owning service's last backend on the port is unpublished.
    ///
    /// The key is the published port; the value is
    /// `(deployment, service_name)` where `deployment` is `None` for
    /// standalone callers.
    published_ports: RwLock<HashMap<u16, (Option<String>, String)>>,
    /// Background TCP health-check task for the L7 load balancer. Periodically
    /// TCP-connects to every registered backend and flips its health status,
    /// so a backend that was marked unhealthy by a transient request-path
    /// failure (e.g. the overlay momentarily reconfiguring while sibling
    /// containers churn during a CI build) AUTO-RECOVERS once it answers
    /// connects again. Without this the L7 LB had no recovery path of its own
    /// and a single transient blip left a service stuck on "no healthy
    /// backends" until a daemon restart. Aborted on drop.
    ///
    /// Spawned in [`ProxyManager::new`] with a 5 s interval and a 2 s
    /// per-probe timeout.
    lb_health_checker: tokio::task::JoinHandle<()>,
    /// Whether the standing `0.0.0.0:80` / `0.0.0.0:443` ingress listeners have
    /// already been started. Makes [`ProxyManager::start_ingress`] idempotent
    /// so a double call (or a restart path) does not spawn duplicate
    /// retry-bind tasks that would fight over the same ports.
    ingress_started: AtomicBool,
    /// Per-service request-rate counter populated by the proxy request path.
    /// Threaded into every [`ProxyServer`] this manager builds and exposed via
    /// [`ProxyManager::rps_registry`] so the scheduler can read a real RPS
    /// signal (wrap it with [`RpsRegistryProvider`]).
    rps_registry: Arc<RpsRegistry>,
    /// Optional on-demand activator for scale-to-zero services. Installed by the
    /// daemon AFTER the `ServiceManager` exists (see
    /// [`ProxyManager::set_activator`]); threaded into every [`ProxyServer`]
    /// built thereafter. A `None` here preserves the existing immediate-`503`
    /// behavior.
    activator: RwLock<Option<Arc<dyn Activator>>>,
}
461
462impl Drop for ProxyManager {
463    fn drop(&mut self) {
464        self.lb_health_checker.abort();
465    }
466}
467
468impl ProxyManager {
469    /// Create a new `ProxyManager` with the given configuration, service registry,
470    /// and optional certificate manager.
471    pub fn new(
472        config: ProxyManagerConfig,
473        registry: Arc<ServiceRegistry>,
474        cert_manager: Option<Arc<CertManager>>,
475    ) -> Self {
476        let load_balancer = Arc::new(LoadBalancer::new());
477
478        // Spawn the L7 load balancer's own TCP health checker so unhealthy
479        // backends auto-recover. Probe every 5s with a 2s per-probe timeout:
480        // fast enough that a transient blip during a CI build (sibling
481        // containers churning the overlay) clears well within a single e2e
482        // step, without hammering backends.
483        let lb_health_checker =
484            load_balancer.spawn_health_checker(Duration::from_secs(5), Duration::from_secs(2));
485
486        Self {
487            config,
488            registry,
489            load_balancer,
490            servers: RwLock::new(HashMap::new()),
491            services: RwLock::new(HashMap::new()),
492            stream_registry: None,
493            cert_manager,
494            sni_resolver: Arc::new(SniCertResolver::new()),
495            provisioning_requested: RwLock::new(HashSet::new()),
496            tcp_listeners: RwLock::new(HashSet::new()),
497            udp_listeners: RwLock::new(HashSet::new()),
498            active_connections: Arc::new(AtomicU64::new(0)),
499            network_policy_checker: None,
500            loopback_registry: Arc::new(StreamRegistry::new()),
501            loopback_tcp: RwLock::new(HashMap::new()),
502            loopback_udp: RwLock::new(HashMap::new()),
503            port_map_registry: Arc::new(StreamRegistry::new()),
504            port_map_tcp: RwLock::new(HashMap::new()),
505            port_map_udp: RwLock::new(HashMap::new()),
506            published_ports: RwLock::new(HashMap::new()),
507            lb_health_checker,
508            ingress_started: AtomicBool::new(false),
509            rps_registry: Arc::new(RpsRegistry::new()),
510            activator: RwLock::new(None),
511        }
512    }
513
514    /// Get a reference to the service registry
515    pub fn registry(&self) -> Arc<ServiceRegistry> {
516        self.registry.clone()
517    }
518
519    /// Get a reference to the load balancer
520    pub fn load_balancer(&self) -> Arc<LoadBalancer> {
521        self.load_balancer.clone()
522    }
523
524    /// Get the shared per-service [`RpsRegistry`].
525    ///
526    /// The proxy request path records every routed request into this registry;
527    /// wrap it with [`RpsRegistryProvider`] to hand the scheduler an
528    /// [`RpsProvider`] backed by real proxy traffic.
529    #[must_use]
530    pub fn rps_registry(&self) -> Arc<RpsRegistry> {
531        Arc::clone(&self.rps_registry)
532    }
533
534    /// Get the shared [`RpsRegistry`] adapted as a scheduler [`RpsProvider`].
535    ///
536    /// Convenience wrapper around [`Self::rps_registry`] for the scaling
537    /// wiring: the returned value implements
538    /// [`zlayer_scheduler::scalers::RpsProvider`].
539    #[must_use]
540    pub fn rps_provider(&self) -> RpsRegistryProvider {
541        RpsRegistryProvider::new(self.rps_registry())
542    }
543
544    /// Install the on-demand [`Activator`] used for scale-to-zero wake-ups.
545    ///
546    /// Call this once the agent's `ServiceManager` exists: build a
547    /// [`ServiceActivator`] from a [`ScaleTrigger`] (the `ServiceManager`) plus
548    /// [`Self::load_balancer`], then pass it here. Servers built AFTER this call
549    /// (including [`Self::start_ingress`]) carry the activator; a request to an
550    /// idle service then triggers a wake-up instead of an immediate `503`.
551    pub async fn set_activator(&self, activator: Arc<dyn Activator>) {
552        *self.activator.write().await = Some(activator);
553    }
554
555    /// Build a [`ServiceActivator`] from `trigger` and this manager's load
556    /// balancer, then install it via [`Self::set_activator`].
557    ///
558    /// Convenience for the common wiring where the agent's `ServiceManager`
559    /// implements [`ScaleTrigger`].
560    pub async fn install_service_activator(&self, trigger: Arc<dyn ScaleTrigger>) {
561        let activator = Arc::new(ServiceActivator::new(trigger, self.load_balancer.clone()));
562        self.set_activator(activator).await;
563    }
564
565    /// Snapshot the currently-installed activator, if any. Used internally when
566    /// building servers so each server carries the activator present at build
567    /// time.
568    async fn current_activator(&self) -> Option<Arc<dyn Activator>> {
569        self.activator.read().await.clone()
570    }
571
572    /// Get the number of currently active proxy connections.
573    pub fn active_connections(&self) -> u64 {
574        self.active_connections.load(Ordering::Relaxed)
575    }
576
577    /// Get a reference to the certificate manager (if configured)
578    pub fn cert_manager(&self) -> Option<&Arc<CertManager>> {
579        self.cert_manager.as_ref()
580    }
581
582    /// Get the shared [`SniCertResolver`] backing every live HTTPS listener.
583    ///
584    /// Hand this to the daemon's certificate renewal task so renewed certs are
585    /// hot-loaded into the SAME resolver the running `:443` listener serves
586    /// from — `refresh_cert` on this handle is visible to live connections
587    /// immediately, with no listener restart.
588    #[must_use]
589    pub fn sni_resolver(&self) -> Arc<SniCertResolver> {
590        Arc::clone(&self.sni_resolver)
591    }
592
593    /// Set the stream registry for L4 proxy integration (TCP/UDP)
594    pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
595        self.stream_registry = Some(registry);
596    }
597
598    /// Builder pattern: add stream registry for L4 proxy integration
599    #[must_use]
600    pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
601        self.stream_registry = Some(registry);
602        self
603    }
604
605    /// Get the stream registry (if configured)
606    pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
607        self.stream_registry.as_ref()
608    }
609
610    /// Set the network policy checker for access control enforcement
611    pub fn set_network_policy_checker(&mut self, checker: NetworkPolicyChecker) {
612        self.network_policy_checker = Some(checker);
613    }
614
615    /// Builder pattern: add network policy checker for access control enforcement
616    #[must_use]
617    pub fn with_network_policy_checker(mut self, checker: NetworkPolicyChecker) -> Self {
618        self.network_policy_checker = Some(checker);
619        self
620    }
621
622    /// Start listening on a specific port bound to the given address.
623    ///
624    /// If already listening on this port, skip.
625    /// All port listeners share the same `ServiceRegistry` for request matching.
626    ///
627    /// # Errors
628    /// Returns an error if the proxy server cannot be started.
629    pub async fn listen_on(&self, port: u16, bind_ip: IpAddr) -> Result<()> {
630        let mut servers = self.servers.write().await;
631
632        if servers.contains_key(&port) {
633            debug!(port = port, "Already listening on port");
634            return Ok(());
635        }
636
637        let addr = SocketAddr::new(bind_ip, port);
638        let mut proxy_config = ProxyConfig::default();
639        proxy_config.server.http_addr = addr;
640        proxy_config.server.http2_enabled = self.config.http2_enabled;
641
642        let mut server = ProxyServer::with_registry(
643            proxy_config,
644            self.registry.clone(),
645            self.load_balancer.clone(),
646        )
647        .with_rps_registry(self.rps_registry());
648        if let Some(ref checker) = self.network_policy_checker {
649            server = server.with_network_policy_checker(checker.clone());
650        }
651        if let Some(activator) = self.current_activator().await {
652            server = server.with_activator(activator);
653        }
654        let server = Arc::new(server);
655
656        info!(port = port, bind = %addr, "Proxy listening on port");
657
658        let server_clone = server.clone();
659        tokio::spawn(async move {
660            if let Err(e) = server_clone.run().await {
661                tracing::error!(port = port, error = %e, "Proxy server error on port");
662            }
663        });
664
665        servers.insert(port, server);
666        Ok(())
667    }
668
669    /// Start an HTTPS listener on the given port using `SniCertResolver` for dynamic cert selection.
670    ///
671    /// If already listening on this port, skip.
672    /// Requires a `CertManager` to be configured; logs a warning and returns `Ok(())` if not.
673    ///
674    /// # Errors
675    /// Returns an error if the HTTPS proxy server cannot be started.
676    pub async fn listen_on_tls(&self, port: u16, bind_ip: IpAddr) -> Result<()> {
677        let mut servers = self.servers.write().await;
678
679        if servers.contains_key(&port) {
680            debug!(port = port, "Already listening on port (TLS)");
681            return Ok(());
682        }
683
684        let Some(cert_manager) = &self.cert_manager else {
685            warn!(
686                port = port,
687                "Cannot start TLS listener: no CertManager configured"
688            );
689            return Ok(());
690        };
691
692        // Use the manager's SHARED SniCertResolver so certs hot-loaded later
693        // (ACME provisioning trigger / renewal task) reach this live listener.
694        let sni_resolver = self.sni_resolver();
695
696        // Load existing certificates (best-effort; log warnings on failure)
697        let _ = load_existing_certs_into_resolver(cert_manager, &sni_resolver).await;
698
699        let addr = SocketAddr::new(bind_ip, port);
700        let mut proxy_config = ProxyConfig::default();
701        proxy_config.server.https_addr = addr;
702
703        let mut server = ProxyServer::with_tls_resolver(
704            proxy_config,
705            self.registry.clone(),
706            self.load_balancer.clone(),
707            sni_resolver,
708        )
709        .with_cert_manager(Arc::clone(cert_manager))
710        .with_rps_registry(self.rps_registry());
711        if let Some(ref checker) = self.network_policy_checker {
712            server = server.with_network_policy_checker(checker.clone());
713        }
714        if let Some(activator) = self.current_activator().await {
715            server = server.with_activator(activator);
716        }
717        let server = Arc::new(server);
718
719        info!(port = port, bind = %addr, "HTTPS proxy listening on port");
720
721        let server_clone = server.clone();
722        tokio::spawn(async move {
723            if let Err(e) = server_clone.run_https().await {
724                tracing::error!(port = port, error = %e, "HTTPS proxy server error");
725            }
726        });
727
728        servers.insert(port, server);
729        Ok(())
730    }
731
732    /// Start the standing HTTP/HTTPS ingress on `0.0.0.0:80` and `0.0.0.0:443`.
733    ///
734    /// The lone daemon IS the ingress: this binds the two well-known web ports
735    /// so every deployed service with an HTTP/HTTPS endpoint is reachable on
736    /// the standard ports, routed by the shared [`ServiceRegistry`]/SNI cert
737    /// resolver the manager already holds — no per-service listener config.
738    ///
739    /// **Conflict policy: WARN, never error.** If 80/443 is already held the
740    /// underlying [`ProxyServer::run_with_retry`] /
741    /// [`ProxyServer::run_https_with_retry`] log a warning and keep retrying
742    /// the bind forever, grabbing the port the moment it frees. This NEVER
743    /// aborts startup, NEVER blocks deployments, and NEVER hard-errors. Binding
744    /// 80/443 needs root or `CAP_NET_BIND_SERVICE`; a non-root daemon simply
745    /// never grabs them and keeps warning — that is fine.
746    ///
747    /// Idempotent: a second call is a no-op (it will not spawn duplicate
748    /// retry-bind tasks).
749    pub async fn start_ingress(&self) {
750        self.start_ingress_on(DEFAULT_INGRESS_HTTP_PORT, DEFAULT_INGRESS_HTTPS_PORT)
751            .await;
752    }
753
754    /// Build (but do not start) the `:80` HTTP ingress [`ProxyServer`].
755    ///
756    /// Applies the shared registry, RPS registry, network-policy checker, the
757    /// supplied on-demand `activator`, AND — crucially — this manager's
758    /// [`CertManager`] when one is configured. The cert manager on `:80` exists
759    /// for exactly one reason: ACME HTTP-01 challenges
760    /// (`/.well-known/acme-challenge/<token>`) ALWAYS arrive on `:80`, and the
761    /// request path serves them only when `cert_manager` is `Some`. There is no
762    /// TLS on `:80`; this is purely the HTTP-01 carve-out.
763    ///
764    /// Network-free: constructs the server without binding any port, so callers
765    /// (and tests) can inspect the result before [`Self::start_ingress_on`]
766    /// spawns the retry-bind task.
767    fn build_http_ingress_server(
768        &self,
769        http_addr: SocketAddr,
770        activator: Option<Arc<dyn Activator>>,
771    ) -> ProxyServer {
772        let mut http_proxy_config = ProxyConfig::default();
773        http_proxy_config.server.http_addr = http_addr;
774        http_proxy_config.server.http2_enabled = self.config.http2_enabled;
775
776        let mut http_server = ProxyServer::with_registry(
777            http_proxy_config,
778            self.registry.clone(),
779            self.load_balancer.clone(),
780        )
781        .with_rps_registry(self.rps_registry());
782        if let Some(ref checker) = self.network_policy_checker {
783            http_server = http_server.with_network_policy_checker(checker.clone());
784        }
785        if let Some(activator) = activator {
786            http_server = http_server.with_activator(activator);
787        }
788        // Wire the cert manager so the `:80` listener can serve ACME HTTP-01
789        // challenges. Without this the interception is skipped and the request
790        // falls through to the vhost match, which 403s when no HTTP service is
791        // registered for the challenge domain — so certs can never be issued.
792        if let Some(ref cm) = self.cert_manager {
793            http_server = http_server.with_cert_manager(Arc::clone(cm));
794        }
795        http_server
796    }
797
798    /// Like [`Self::start_ingress`] but with explicit HTTP/HTTPS ports.
799    ///
800    /// Used by tests; the production path uses [`DEFAULT_INGRESS_HTTP_PORT`] /
801    /// [`DEFAULT_INGRESS_HTTPS_PORT`].
802    #[allow(clippy::similar_names)]
803    pub async fn start_ingress_on(&self, http_port: u16, https_port: u16) {
804        // Idempotency: only the first caller wins.
805        if self
806            .ingress_started
807            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
808            .is_err()
809        {
810            debug!("Ingress already started; skipping");
811            return;
812        }
813
814        let bind_ip = IpAddr::V4(Ipv4Addr::UNSPECIFIED); // 0.0.0.0
815
816        // ---- HTTP ingress (:80) ----
817        let http_addr = SocketAddr::new(bind_ip, http_port);
818        let activator = self.current_activator().await;
819        let http_server = Arc::new(self.build_http_ingress_server(http_addr, activator));
820        info!(port = http_port, bind = %http_addr, "Starting HTTP ingress (retry-never-error)");
821        {
822            let server = http_server.clone();
823            tokio::spawn(async move {
824                if let Err(e) = server.run_with_retry(http_addr).await {
825                    // Only reached on a post-bind fatal accept-loop error; the
826                    // bind itself never errors out.
827                    warn!(port = http_port, error = %e, "HTTP ingress accept loop exited");
828                }
829            });
830        }
831        // Additional v6-only listener on `[::]:<http_port>` so IPv6 clients can
832        // reach hosted vhosts. Shares the same `http_server` Arc (and thus its
833        // shutdown signal); a missing-IPv6 host stays non-fatal.
834        {
835            let server = http_server.clone();
836            tokio::spawn(async move {
837                match bind_v6_only(http_port) {
838                    Ok(l) => {
839                        if let Err(e) = server.run_on_listener(l).await {
840                            warn!(port = http_port, error = %e, "HTTP v6 ingress exited");
841                        }
842                    }
843                    Err(e) => {
844                        warn!(port = http_port, error = %e, "HTTP v6 ingress bind failed (non-fatal)");
845                    }
846                }
847            });
848        }
849        self.servers.write().await.insert(http_port, http_server);
850
851        // ---- HTTPS ingress (:443) ----
852        let Some(cert_manager) = &self.cert_manager else {
853            warn!(
854                port = https_port,
855                "Cannot start HTTPS ingress: no CertManager configured (HTTP ingress is up)"
856            );
857            return;
858        };
859
860        // Use the manager's SHARED SniCertResolver so certs hot-loaded later
861        // (ACME provisioning trigger / renewal task) reach this live listener.
862        let sni_resolver = self.sni_resolver();
863        // Load existing certificates (best-effort; log warnings on failure).
864        let _ = load_existing_certs_into_resolver(cert_manager, &sni_resolver).await;
865
866        let https_addr = SocketAddr::new(bind_ip, https_port);
867        let mut https_proxy_config = ProxyConfig::default();
868        https_proxy_config.server.https_addr = https_addr;
869
870        let mut https_server = ProxyServer::with_tls_resolver(
871            https_proxy_config,
872            self.registry.clone(),
873            self.load_balancer.clone(),
874            sni_resolver,
875        )
876        .with_cert_manager(Arc::clone(cert_manager))
877        .with_rps_registry(self.rps_registry());
878        if let Some(ref checker) = self.network_policy_checker {
879            https_server = https_server.with_network_policy_checker(checker.clone());
880        }
881        if let Some(activator) = self.current_activator().await {
882            https_server = https_server.with_activator(activator);
883        }
884        let https_server = Arc::new(https_server);
885        info!(port = https_port, bind = %https_addr, "Starting HTTPS ingress (retry-never-error)");
886        {
887            let server = https_server.clone();
888            tokio::spawn(async move {
889                if let Err(e) = server.run_https_with_retry(https_addr).await {
890                    warn!(port = https_port, error = %e, "HTTPS ingress accept loop exited");
891                }
892            });
893        }
894        // Additional v6-only TLS listener on `[::]:<https_port>`. Reuses the
895        // server's configured TlsAcceptor (same SNI resolver) via
896        // `run_https_on_listener`; shares the `https_server` Arc's shutdown.
897        {
898            let server = https_server.clone();
899            tokio::spawn(async move {
900                match bind_v6_only(https_port) {
901                    Ok(l) => {
902                        if let Err(e) = server.run_https_on_listener(l).await {
903                            warn!(port = https_port, error = %e, "HTTPS v6 ingress exited");
904                        }
905                    }
906                    Err(e) => {
907                        warn!(port = https_port, error = %e, "HTTPS v6 ingress bind failed (non-fatal)");
908                    }
909                }
910            });
911        }
912        self.servers.write().await.insert(https_port, https_server);
913    }
914
915    /// Stop all proxy servers on all ports.
916    ///
917    /// After signalling each server to shut down, waits up to 30 seconds for
918    /// active connections to drain before returning.
919    pub async fn stop(&self) {
920        let mut servers = self.servers.write().await;
921        for (port, server) in servers.drain() {
922            info!(port = port, "Stopping proxy on port");
923            server.shutdown();
924        }
925
926        // Wait up to 30s for active connections to drain
927        let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
928        while self.active_connections.load(Ordering::Relaxed) > 0 {
929            if tokio::time::Instant::now() >= deadline {
930                let remaining = self.active_connections.load(Ordering::Relaxed);
931                warn!(
932                    remaining = remaining,
933                    "Drain timeout reached, forcing shutdown"
934                );
935                break;
936            }
937            tokio::time::sleep(Duration::from_millis(100)).await;
938        }
939
940        info!("All proxy servers stopped");
941    }
942
943    /// Remove and shut down the listener on a specific port.
944    pub async fn unbind(&self, port: u16) {
945        let mut servers = self.servers.write().await;
946        if let Some(server) = servers.remove(&port) {
947            info!(port = port, "Unbinding proxy from port");
948            server.shutdown();
949        }
950    }
951
952    /// Scan a service's endpoints and ensure the proxy is listening on all
953    /// required ports.
954    ///
955    /// - **HTTP/HTTPS/WebSocket** endpoints start an HTTP proxy listener.
956    /// - **TCP** endpoints bind a `TcpListener` and spawn a `TcpStreamService`.
957    /// - **UDP** endpoints bind a `UdpSocket` and spawn a `UdpStreamService`.
958    ///
959    /// Bind address is determined by the `expose` type:
960    /// - **Public** endpoints bind to `0.0.0.0` (all interfaces).
961    /// - **Internal** endpoints bind to the overlay IP so they are only
962    ///   reachable from within the overlay network.  If no overlay is
963    ///   available, internal endpoints bind to `127.0.0.1` (localhost only).
964    ///
965    /// # Errors
966    /// Returns an error if an HTTP/HTTPS listener cannot be started.
967    pub async fn ensure_ports_for_service(
968        &self,
969        spec: &ServiceSpec,
970        overlay_ip: Option<IpAddr>,
971    ) -> Result<()> {
972        for endpoint in &spec.endpoints {
973            let bind_ip = match endpoint.expose {
974                ExposeType::Public => IpAddr::V4(Ipv4Addr::UNSPECIFIED), // 0.0.0.0
975                ExposeType::Internal => {
976                    // Prefer overlay IP; fall back to loopback if overlay is unavailable.
977                    let ip = overlay_ip.unwrap_or(IpAddr::V4(Ipv4Addr::LOCALHOST));
978                    if overlay_ip.is_none() {
979                        warn!(
980                            endpoint = %endpoint.name,
981                            port = endpoint.port,
982                            "No overlay IP available for internal endpoint; binding to 127.0.0.1"
983                        );
984                    }
985                    ip
986                }
987            };
988
989            match endpoint.protocol {
990                Protocol::Https => {
991                    // L7 TLS: start HTTPS proxy listener with SNI cert resolution
992                    self.listen_on_tls(endpoint.port, bind_ip).await?;
993                }
994                Protocol::Http | Protocol::Websocket => {
995                    // L7: start HTTP proxy listener
996                    self.listen_on(endpoint.port, bind_ip).await?;
997                }
998                Protocol::Tcp => {
999                    // L4 TCP: bind listener and spawn TcpStreamService.
1000                    // Translate the endpoint's `stream:` block into the proxy's
1001                    // runtime config (TLS / PROXY-protocol / health-check).
1002                    let cfg = translate_stream_config(endpoint.stream.as_ref());
1003                    self.ensure_tcp_listener(endpoint.port, bind_ip, &cfg).await;
1004                }
1005                Protocol::Udp => {
1006                    // L4 UDP: bind socket and spawn UdpStreamService.
1007                    let cfg = translate_stream_config(endpoint.stream.as_ref());
1008                    self.ensure_udp_listener(endpoint.port, bind_ip, &cfg).await;
1009                }
1010            }
1011        }
1012        Ok(())
1013    }
1014
1015    /// Ensure a TCP stream listener is running on the given port.
1016    ///
1017    /// If a listener is already active on this port, this is a no-op.
1018    /// Requires `stream_registry` to be configured; logs a warning if not.
1019    ///
1020    /// `cfg` carries the endpoint's translated `stream:` settings: when
1021    /// `cfg.tls` is set and a [`CertManager`] is configured, the listener
1022    /// terminates TLS using the shared SNI cert resolver; `cfg.proxy_protocol`
1023    /// makes the listener prepend a PROXY v2 header to upstream connections;
1024    /// and `cfg.health_check` is applied to the registered service so the
1025    /// background health checker probes it.
1026    async fn ensure_tcp_listener(&self, port: u16, bind_ip: IpAddr, cfg: &StreamProxyConfig) {
1027        // Apply the L4 health-check config to the service registered for this
1028        // port (registration of backends happens out-of-band in ServiceManager).
1029        self.apply_stream_health_check_tcp(port, cfg);
1030
1031        // Check if already listening
1032        {
1033            let listeners = self.tcp_listeners.read().await;
1034            if listeners.contains(&port) {
1035                debug!(port = port, "TCP stream listener already active");
1036                return;
1037            }
1038        }
1039
1040        let registry = if let Some(r) = &self.stream_registry {
1041            Arc::clone(r)
1042        } else {
1043            warn!(
1044                port = port,
1045                "Cannot start TCP listener: StreamRegistry not configured"
1046            );
1047            return;
1048        };
1049
1050        let addr = SocketAddr::new(bind_ip, port);
1051        let listener = match tokio::net::TcpListener::bind(addr).await {
1052            Ok(l) => l,
1053            Err(e) => {
1054                warn!(
1055                    port = port,
1056                    bind = %addr,
1057                    error = %e,
1058                    "Failed to bind TCP stream listener, continuing"
1059                );
1060                return;
1061            }
1062        };
1063
1064        // Mark as active before spawning
1065        {
1066            let mut listeners = self.tcp_listeners.write().await;
1067            listeners.insert(port);
1068        }
1069
1070        let mut tcp_service = TcpStreamService::new(registry, port);
1071        if cfg.proxy_protocol {
1072            tcp_service = tcp_service.with_proxy_protocol(true);
1073        }
1074        // L4 TLS termination: reuse the shared SNI cert resolver (same certs as
1075        // HTTPS endpoints). Only enabled when a CertManager backs the resolver.
1076        if cfg.tls {
1077            if self.cert_manager.is_some() {
1078                let resolver = self.sni_resolver();
1079                if let Some(cert_manager) = &self.cert_manager {
1080                    let _ = load_existing_certs_into_resolver(cert_manager, &resolver).await;
1081                }
1082                let acceptor = tls_acceptor_from_resolver(resolver);
1083                tcp_service = tcp_service.with_tls_acceptor(acceptor);
1084                info!(port = port, "L4 TCP TLS termination enabled");
1085            } else {
1086                warn!(
1087                    port = port,
1088                    "stream.tls requested but no CertManager configured; serving TCP without TLS"
1089                );
1090            }
1091        }
1092
1093        let tcp_service = Arc::new(tcp_service);
1094        tokio::spawn(async move {
1095            tcp_service.serve(listener).await;
1096        });
1097
1098        info!(port = port, bind = %addr, "TCP stream proxy listening");
1099    }
1100
1101    /// Apply a translated L4 config (incl. the health probe) to the TCP service
1102    /// registered on `port` (if any). No-op when no `stream_registry` / service
1103    /// is present.
1104    fn apply_stream_health_check_tcp(&self, port: u16, cfg: &StreamProxyConfig) {
1105        if let Some(registry) = &self.stream_registry {
1106            registry.set_tcp_config(port, cfg.clone());
1107        }
1108    }
1109
1110    /// Apply a translated L4 config (incl. the health probe) to the UDP service
1111    /// registered on `port` (if any). No-op when no `stream_registry` / service
1112    /// is present.
1113    fn apply_stream_health_check_udp(&self, port: u16, cfg: &StreamProxyConfig) {
1114        if let Some(registry) = &self.stream_registry {
1115            registry.set_udp_config(port, cfg.clone());
1116        }
1117    }
1118
1119    /// Ensure a UDP stream listener is running on the given port.
1120    ///
1121    /// If a listener is already active on this port, this is a no-op.
1122    /// Requires `stream_registry` to be configured; logs a warning if not.
1123    ///
1124    /// `cfg.session_timeout` overrides the per-session UDP idle timeout, and
1125    /// `cfg.health_check` is applied to the registered service so the
1126    /// background health checker performs the configured UDP probe.
1127    async fn ensure_udp_listener(&self, port: u16, bind_ip: IpAddr, cfg: &StreamProxyConfig) {
1128        // Apply the L4 health-check config to the service registered for this
1129        // port (backend registration happens out-of-band in ServiceManager).
1130        self.apply_stream_health_check_udp(port, cfg);
1131
1132        // Check if already listening
1133        {
1134            let listeners = self.udp_listeners.read().await;
1135            if listeners.contains(&port) {
1136                debug!(port = port, "UDP stream listener already active");
1137                return;
1138            }
1139        }
1140
1141        let registry = if let Some(r) = &self.stream_registry {
1142            Arc::clone(r)
1143        } else {
1144            warn!(
1145                port = port,
1146                "Cannot start UDP listener: StreamRegistry not configured"
1147            );
1148            return;
1149        };
1150
1151        let addr = SocketAddr::new(bind_ip, port);
1152        let socket = match tokio::net::UdpSocket::bind(addr).await {
1153            Ok(s) => s,
1154            Err(e) => {
1155                warn!(
1156                    port = port,
1157                    bind = %addr,
1158                    error = %e,
1159                    "Failed to bind UDP stream listener, continuing"
1160                );
1161                return;
1162            }
1163        };
1164
1165        // Mark as active before spawning
1166        {
1167            let mut listeners = self.udp_listeners.write().await;
1168            listeners.insert(port);
1169        }
1170
1171        let udp_service = Arc::new(UdpStreamService::new(registry, port, cfg.session_timeout));
1172        tokio::spawn(async move {
1173            if let Err(e) = udp_service.serve(socket).await {
1174                tracing::error!(
1175                    port = port,
1176                    error = %e,
1177                    "UDP stream proxy service failed"
1178                );
1179            }
1180        });
1181
1182        info!(port = port, bind = %addr, "UDP stream proxy listening");
1183    }
1184
1185    /// Publish a single container's exposed ports on the node loopback
1186    /// (`127.0.0.1:<endpoint.port>`), forwarding to wherever the container
1187    /// actually listens.
1188    ///
1189    /// This implements the GitHub-Actions "service published to localhost"
1190    /// convention so a consumer sharing the node loopback can reach the
1191    /// service at `localhost:<port>`. The published port is always
1192    /// `endpoint.port`; the backend the listener forwards to is
1193    /// `(container_ip, port_override.unwrap_or(endpoint.target_port()))`,
1194    /// which is already runtime-resolved by the caller:
1195    ///
1196    /// - On the macOS seatbelt/libkrun runtimes every replica shares the host
1197    ///   `127.0.0.1` and gets a unique `port_override`, so the container
1198    ///   listens on `127.0.0.1:<port_override>` and we forward there.
1199    /// - On Linux/VZ/HCS the container listens on its overlay IP, so
1200    ///   `container_ip` is the overlay address and `port_override` is `None`,
1201    ///   forwarding to `overlay_ip:<target_port>`.
1202    ///
1203    /// Backends accumulate across replicas so multiple members round-robin
1204    /// behind the single loopback port. `Public` endpoints are skipped: they
1205    /// are already bound on `0.0.0.0` and therefore already reachable on
1206    /// loopback — binding `127.0.0.1:<port>` again would fail with
1207    /// `EADDRINUSE`.
1208    ///
1209    /// This NEVER rewrites a container's own loopback: it only binds the
1210    /// NODE's `127.0.0.1` and forwards to the container's runtime-resolved
1211    /// address.
1212    ///
1213    /// Bind failures are tolerated (logged at `warn!`); this never panics on
1214    /// them.
1215    ///
1216    /// A published host port (`127.0.0.1:<port>`) is a GLOBAL host resource and
1217    /// is OWNED by the first `(deployment, service)` to publish it. A second
1218    /// publish for the SAME `(deployment, service)` appends a replica backend
1219    /// (legitimate scale-up). A publish for a DIFFERENT `(deployment, service)`
1220    /// is REFUSED with [`AgentError::PortConflict`] rather than silently
1221    /// appended into the foreign pool (Bug 7: that would make `:<port>` serve
1222    /// the wrong deployment's backends). On a conflict for any endpoint this
1223    /// returns `Err` after having published the conflict-free endpoints up to
1224    /// that point.
1225    ///
1226    /// `deployment` is the owning deployment name (`Some`) or `None` for
1227    /// standalone callers.
1228    ///
1229    /// # Errors
1230    /// Returns [`AgentError::PortConflict`] when a published port is already
1231    /// owned by a different `(deployment, service)`.
1232    pub async fn publish_loopback_for_container(
1233        &self,
1234        deployment: Option<&str>,
1235        service_name: &str,
1236        spec: &ServiceSpec,
1237        container_ip: IpAddr,
1238        port_override: Option<u16>,
1239    ) -> Result<()> {
1240        for endpoint in &spec.endpoints {
1241            // Public endpoints already bind 0.0.0.0 -> already on loopback.
1242            if matches!(endpoint.expose, ExposeType::Public) {
1243                continue;
1244            }
1245
1246            let backend = SocketAddr::new(
1247                container_ip,
1248                port_override.unwrap_or_else(|| endpoint.target_port()),
1249            );
1250            let publish_port = endpoint.port;
1251
1252            // Enforce host-port ownership before touching any registry.
1253            self.claim_published_port(deployment, service_name, publish_port)
1254                .await?;
1255
1256            match endpoint.protocol {
1257                Protocol::Tcp | Protocol::Http | Protocol::Https | Protocol::Websocket => {
1258                    // A raw TCP forward carries HTTP/HTTPS/WS just fine, so
1259                    // all L7 protocols ride the loopback TCP path.
1260                    self.publish_loopback_tcp(service_name, publish_port, backend)
1261                        .await;
1262                }
1263                Protocol::Udp => {
1264                    self.publish_loopback_udp(service_name, publish_port, backend)
1265                        .await;
1266                }
1267            }
1268        }
1269        Ok(())
1270    }
1271
1272    /// Claim ownership of host port `publish_port` for `(deployment, service)`.
1273    ///
1274    /// - Unowned → claim it and return `Ok`.
1275    /// - Owned by the SAME `(deployment, service)` → return `Ok` (scale-up).
1276    /// - Owned by a DIFFERENT `(deployment, service)` → return
1277    ///   [`AgentError::PortConflict`] (refuse the cross-wire).
1278    async fn claim_published_port(
1279        &self,
1280        deployment: Option<&str>,
1281        service_name: &str,
1282        publish_port: u16,
1283    ) -> Result<()> {
1284        let mut owners = self.published_ports.write().await;
1285        if let Some((owner_dep, owner_svc)) = owners.get(&publish_port) {
1286            if owner_dep.as_deref() == deployment && owner_svc == service_name {
1287                // Same owner: legitimate scale-up / re-publish.
1288                return Ok(());
1289            }
1290            let owner = format!("{}/{}", owner_dep.as_deref().unwrap_or("_"), owner_svc);
1291            let requester = format!("{}/{}", deployment.unwrap_or("_"), service_name);
1292            warn!(
1293                port = publish_port,
1294                owner = %owner,
1295                requester = %requester,
1296                "Refusing to publish host port already owned by a different deployment/service (would cross-wire backends)"
1297            );
1298            return Err(crate::error::AgentError::PortConflict {
1299                port: publish_port,
1300                owner,
1301                requester,
1302            });
1303        }
1304        owners.insert(
1305            publish_port,
1306            (deployment.map(str::to_string), service_name.to_string()),
1307        );
1308        Ok(())
1309    }
1310
1311    /// Register `backend` for the loopback TCP listener on `publish_port`,
1312    /// binding `127.0.0.1:<publish_port>` if it is not already bound.
1313    async fn publish_loopback_tcp(
1314        &self,
1315        service_name: &str,
1316        publish_port: u16,
1317        backend: SocketAddr,
1318    ) {
1319        // Accumulate the backend in the loopback registry.
1320        if let Some(existing) = self.loopback_registry.resolve_tcp(publish_port) {
1321            let mut backends = existing.backends;
1322            if !backends.contains(&backend) {
1323                backends.push(backend);
1324            }
1325            self.loopback_registry
1326                .update_tcp_backends(publish_port, backends);
1327        } else {
1328            self.loopback_registry.register_tcp(
1329                publish_port,
1330                StreamService::new(service_name.to_string(), vec![backend]),
1331            );
1332        }
1333
1334        // Bind the loopback listener once per port.
1335        let mut listeners = self.loopback_tcp.write().await;
1336        if listeners.contains_key(&publish_port) {
1337            debug!(port = publish_port, "Loopback TCP listener already active");
1338            return;
1339        }
1340
1341        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), publish_port);
1342        let listener = match tokio::net::TcpListener::bind(addr).await {
1343            Ok(l) => l,
1344            Err(e) => {
1345                warn!(
1346                    port = publish_port,
1347                    bind = %addr,
1348                    error = %e,
1349                    "Failed to bind loopback TCP listener, continuing"
1350                );
1351                return;
1352            }
1353        };
1354
1355        let tcp_service = Arc::new(TcpStreamService::new(
1356            Arc::clone(&self.loopback_registry),
1357            publish_port,
1358        ));
1359        let handle = tokio::spawn(async move {
1360            tcp_service.serve(listener).await;
1361        });
1362        listeners.insert(publish_port, handle);
1363        drop(listeners);
1364
1365        info!(
1366            service = service_name,
1367            port = publish_port,
1368            bind = %addr,
1369            backend = %backend,
1370            "Published service port on node loopback (TCP)"
1371        );
1372    }
1373
1374    /// Register `backend` for the loopback UDP listener on `publish_port`,
1375    /// binding `127.0.0.1:<publish_port>` if it is not already bound.
1376    async fn publish_loopback_udp(
1377        &self,
1378        service_name: &str,
1379        publish_port: u16,
1380        backend: SocketAddr,
1381    ) {
1382        if let Some(existing) = self.loopback_registry.resolve_udp(publish_port) {
1383            let mut backends = existing.backends;
1384            if !backends.contains(&backend) {
1385                backends.push(backend);
1386            }
1387            self.loopback_registry
1388                .update_udp_backends(publish_port, backends);
1389        } else {
1390            self.loopback_registry.register_udp(
1391                publish_port,
1392                StreamService::new(service_name.to_string(), vec![backend]),
1393            );
1394        }
1395
1396        let mut listeners = self.loopback_udp.write().await;
1397        if listeners.contains_key(&publish_port) {
1398            debug!(port = publish_port, "Loopback UDP listener already active");
1399            return;
1400        }
1401
1402        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), publish_port);
1403        let socket = match tokio::net::UdpSocket::bind(addr).await {
1404            Ok(s) => s,
1405            Err(e) => {
1406                warn!(
1407                    port = publish_port,
1408                    bind = %addr,
1409                    error = %e,
1410                    "Failed to bind loopback UDP listener, continuing"
1411                );
1412                return;
1413            }
1414        };
1415
1416        let udp_service = Arc::new(UdpStreamService::new(
1417            Arc::clone(&self.loopback_registry),
1418            publish_port,
1419            None,
1420        ));
1421        let handle = tokio::spawn(async move {
1422            if let Err(e) = udp_service.serve(socket).await {
1423                tracing::error!(
1424                    port = publish_port,
1425                    error = %e,
1426                    "Loopback UDP stream proxy service failed"
1427                );
1428            }
1429        });
1430        listeners.insert(publish_port, handle);
1431        drop(listeners);
1432
1433        info!(
1434            service = service_name,
1435            port = publish_port,
1436            bind = %addr,
1437            backend = %backend,
1438            "Published service port on node loopback (UDP)"
1439        );
1440    }
1441
1442    /// Remove a single container's backend from the node-loopback publish
1443    /// path. Mirrors [`Self::publish_loopback_for_container`]: it recomputes
1444    /// the same `(container_ip, port_override.unwrap_or(target_port))` backend
1445    /// per endpoint and drops it from the loopback registry.
1446    ///
1447    /// When a published port's backend set becomes empty, the registry entry
1448    /// is unregistered and the loopback listener is forgotten so the port is
1449    /// freed for the next bind. `Public` endpoints are skipped (they were
1450    /// never published here).
1451    pub async fn unpublish_loopback_for_container(
1452        &self,
1453        spec: &ServiceSpec,
1454        container_ip: IpAddr,
1455        port_override: Option<u16>,
1456    ) {
1457        for endpoint in &spec.endpoints {
1458            if matches!(endpoint.expose, ExposeType::Public) {
1459                continue;
1460            }
1461
1462            let backend = SocketAddr::new(
1463                container_ip,
1464                port_override.unwrap_or_else(|| endpoint.target_port()),
1465            );
1466            let publish_port = endpoint.port;
1467
1468            match endpoint.protocol {
1469                Protocol::Tcp | Protocol::Http | Protocol::Https | Protocol::Websocket => {
1470                    self.unpublish_loopback_tcp(publish_port, backend).await;
1471                }
1472                Protocol::Udp => {
1473                    self.unpublish_loopback_udp(publish_port, backend).await;
1474                }
1475            }
1476        }
1477    }
1478
1479    /// Drop `backend` from the loopback TCP service on `publish_port`,
1480    /// freeing the listener when no backends remain.
1481    async fn unpublish_loopback_tcp(&self, publish_port: u16, backend: SocketAddr) {
1482        let Some(existing) = self.loopback_registry.resolve_tcp(publish_port) else {
1483            return;
1484        };
1485        let remaining: Vec<SocketAddr> = existing
1486            .backends
1487            .into_iter()
1488            .filter(|b| *b != backend)
1489            .collect();
1490
1491        if remaining.is_empty() {
1492            let _ = self.loopback_registry.unregister_tcp(publish_port);
1493            let mut listeners = self.loopback_tcp.write().await;
1494            if let Some(handle) = listeners.remove(&publish_port) {
1495                handle.abort();
1496            }
1497            // Release host-port ownership so a different (deployment, service)
1498            // may bind it next.
1499            self.published_ports.write().await.remove(&publish_port);
1500            debug!(
1501                port = publish_port,
1502                "Freed loopback TCP listener (no backends remain)"
1503            );
1504        } else {
1505            self.loopback_registry
1506                .update_tcp_backends(publish_port, remaining);
1507        }
1508    }
1509
1510    /// Drop `backend` from the loopback UDP service on `publish_port`,
1511    /// freeing the listener when no backends remain.
1512    async fn unpublish_loopback_udp(&self, publish_port: u16, backend: SocketAddr) {
1513        let Some(existing) = self.loopback_registry.resolve_udp(publish_port) else {
1514            return;
1515        };
1516        let remaining: Vec<SocketAddr> = existing
1517            .backends
1518            .into_iter()
1519            .filter(|b| *b != backend)
1520            .collect();
1521
1522        if remaining.is_empty() {
1523            let _ = self.loopback_registry.unregister_udp(publish_port);
1524            let mut listeners = self.loopback_udp.write().await;
1525            if let Some(handle) = listeners.remove(&publish_port) {
1526                handle.abort();
1527            }
1528            // Release host-port ownership so a different (deployment, service)
1529            // may bind it next.
1530            self.published_ports.write().await.remove(&publish_port);
1531            debug!(
1532                port = publish_port,
1533                "Freed loopback UDP listener (no backends remain)"
1534            );
1535        } else {
1536            self.loopback_registry
1537                .update_udp_backends(publish_port, remaining);
1538        }
1539    }
1540
1541    /// Publish one Docker-style port mapping (`host_ip:host_port -> container`)
1542    /// on the host, forwarding to the container's `container_port`.
1543    ///
1544    /// Unlike [`Self::publish_loopback_for_container`] this is driven by
1545    /// `spec.port_mappings`, NOT by endpoints, so a workload with port
1546    /// mappings and zero endpoints still gets a host listener. The listener
1547    /// binds `mapping.host_ip` (default `0.0.0.0`) on `mapping.host_port`; a
1548    /// `host_port` of `None` or `0` requests an OS-assigned ephemeral port.
1549    /// The backend forwarded to is `backend` (the caller resolves it to the
1550    /// container's runtime address + `container_port`/override).
1551    ///
1552    /// Returns the actually-bound host port (resolves ephemeral). Host-port
1553    /// ownership is enforced via the shared `published_ports` map: a second
1554    /// publish for the SAME `(deployment, owner)` appends a replica backend; a
1555    /// DIFFERENT owner on the same explicit port is refused with
1556    /// [`AgentError::PortConflict`].
1557    ///
1558    /// # Errors
1559    /// Returns [`AgentError::PortConflict`] on a cross-owner explicit-port
1560    /// collision, or an IO error if an ephemeral bind fails.
1561    pub async fn publish_port_mapping(
1562        &self,
1563        deployment: Option<&str>,
1564        owner_name: &str,
1565        mapping: &PortMapping,
1566        backend: SocketAddr,
1567    ) -> Result<u16> {
1568        let bind_ip: IpAddr = mapping
1569            .host_ip
1570            .parse()
1571            .unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED));
1572        let requested = mapping.host_port.filter(|p| *p != 0);
1573        let is_udp = matches!(mapping.protocol, PortProtocol::Udp);
1574        let port = match mapping.protocol {
1575            PortProtocol::Udp => {
1576                self.publish_port_mapping_udp(deployment, owner_name, bind_ip, requested, backend)
1577                    .await?
1578            }
1579            // TCP (and any non-UDP) rides the TCP forward.
1580            PortProtocol::Tcp => {
1581                self.publish_port_mapping_tcp(deployment, owner_name, bind_ip, requested, backend)
1582                    .await?
1583            }
1584        };
1585
1586        // Open the resolved host port on the host firewall so peers on other
1587        // nodes can reach a service published here — required for an
1588        // `OverlayMode::Shared` service (whose only ingress is the free-port
1589        // proxy) and correct for Docker-style published ports in general.
1590        // Idempotent (multiple replicas behind one host port re-call this),
1591        // best-effort + non-fatal.
1592        if let Err(e) = zlayer_overlay::firewall::ensure_published_port(port, is_udp) {
1593            warn!(
1594                error = %e,
1595                port,
1596                udp = is_udp,
1597                "could not open published port on host firewall (non-fatal)"
1598            );
1599        }
1600        Ok(port)
1601    }
1602
1603    /// Bind `bind_ip:<port>` for a TCP port mapping, accumulating `backend`.
1604    /// `requested` is the explicit host port, or `None` for ephemeral.
1605    async fn publish_port_mapping_tcp(
1606        &self,
1607        deployment: Option<&str>,
1608        owner_name: &str,
1609        bind_ip: IpAddr,
1610        requested: Option<u16>,
1611        backend: SocketAddr,
1612    ) -> Result<u16> {
1613        if let Some(port) = requested {
1614            // Explicit port: claim ownership before binding.
1615            self.claim_published_port(deployment, owner_name, port)
1616                .await?;
1617            self.accumulate_port_map_tcp_backend(port, owner_name, backend);
1618
1619            let mut listeners = self.port_map_tcp.write().await;
1620            if listeners.contains_key(&port) {
1621                debug!(port = port, "port-mapping TCP listener already active");
1622                return Ok(port);
1623            }
1624            let addr = SocketAddr::new(bind_ip, port);
1625            match tokio::net::TcpListener::bind(addr).await {
1626                Ok(listener) => {
1627                    let svc = Arc::new(TcpStreamService::new(
1628                        Arc::clone(&self.port_map_registry),
1629                        port,
1630                    ));
1631                    let handle = tokio::spawn(async move {
1632                        svc.serve(listener).await;
1633                    });
1634                    listeners.insert(port, handle);
1635                    drop(listeners);
1636                    info!(service = owner_name, port = port, bind = %addr, backend = %backend,
1637                        "Published port mapping (TCP)");
1638                    Ok(port)
1639                }
1640                Err(e) => {
1641                    warn!(port = port, bind = %addr, error = %e,
1642                        "Failed to bind port-mapping TCP listener, continuing");
1643                    Ok(port)
1644                }
1645            }
1646        } else {
1647            // Ephemeral: bind first to learn the port, then claim it.
1648            let addr = SocketAddr::new(bind_ip, 0);
1649            let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
1650                crate::error::AgentError::Network(format!(
1651                    "failed to bind ephemeral port-mapping TCP listener on {bind_ip}: {e}"
1652                ))
1653            })?;
1654            let port = listener.local_addr().map(|a| a.port()).map_err(|e| {
1655                crate::error::AgentError::Network(format!(
1656                    "failed to read ephemeral port-mapping TCP local addr: {e}"
1657                ))
1658            })?;
1659            self.claim_published_port(deployment, owner_name, port)
1660                .await?;
1661            self.accumulate_port_map_tcp_backend(port, owner_name, backend);
1662            let svc = Arc::new(TcpStreamService::new(
1663                Arc::clone(&self.port_map_registry),
1664                port,
1665            ));
1666            let handle = tokio::spawn(async move {
1667                svc.serve(listener).await;
1668            });
1669            self.port_map_tcp.write().await.insert(port, handle);
1670            info!(service = owner_name, port = port, bind = %bind_ip, backend = %backend,
1671                "Published port mapping on ephemeral host port (TCP)");
1672            Ok(port)
1673        }
1674    }
1675
1676    /// Accumulate `backend` into the port-map TCP registry under `port`.
1677    fn accumulate_port_map_tcp_backend(&self, port: u16, owner_name: &str, backend: SocketAddr) {
1678        if let Some(existing) = self.port_map_registry.resolve_tcp(port) {
1679            let mut backends = existing.backends;
1680            if !backends.contains(&backend) {
1681                backends.push(backend);
1682            }
1683            self.port_map_registry.update_tcp_backends(port, backends);
1684        } else {
1685            self.port_map_registry.register_tcp(
1686                port,
1687                StreamService::new(owner_name.to_string(), vec![backend]),
1688            );
1689        }
1690    }
1691
1692    /// Bind `bind_ip:<port>` for a UDP port mapping, accumulating `backend`.
1693    /// `requested` is the explicit host port, or `None` for ephemeral.
1694    async fn publish_port_mapping_udp(
1695        &self,
1696        deployment: Option<&str>,
1697        owner_name: &str,
1698        bind_ip: IpAddr,
1699        requested: Option<u16>,
1700        backend: SocketAddr,
1701    ) -> Result<u16> {
1702        if let Some(port) = requested {
1703            // Explicit port: claim ownership before binding.
1704            self.claim_published_port(deployment, owner_name, port)
1705                .await?;
1706            self.accumulate_port_map_udp_backend(port, owner_name, backend);
1707
1708            let mut listeners = self.port_map_udp.write().await;
1709            if listeners.contains_key(&port) {
1710                debug!(port = port, "port-mapping UDP listener already active");
1711                return Ok(port);
1712            }
1713            let addr = SocketAddr::new(bind_ip, port);
1714            match tokio::net::UdpSocket::bind(addr).await {
1715                Ok(socket) => {
1716                    let svc = Arc::new(UdpStreamService::new(
1717                        Arc::clone(&self.port_map_registry),
1718                        port,
1719                        None,
1720                    ));
1721                    let handle = tokio::spawn(async move {
1722                        if let Err(e) = svc.serve(socket).await {
1723                            tracing::error!(
1724                                port = port,
1725                                error = %e,
1726                                "port-mapping UDP stream proxy service failed"
1727                            );
1728                        }
1729                    });
1730                    listeners.insert(port, handle);
1731                    drop(listeners);
1732                    info!(service = owner_name, port = port, bind = %addr, backend = %backend,
1733                        "Published port mapping (UDP)");
1734                    Ok(port)
1735                }
1736                Err(e) => {
1737                    warn!(port = port, bind = %addr, error = %e,
1738                        "Failed to bind port-mapping UDP listener, continuing");
1739                    Ok(port)
1740                }
1741            }
1742        } else {
1743            // Ephemeral: bind first to learn the port, then claim it.
1744            let addr = SocketAddr::new(bind_ip, 0);
1745            let socket = tokio::net::UdpSocket::bind(addr).await.map_err(|e| {
1746                crate::error::AgentError::Network(format!(
1747                    "failed to bind ephemeral port-mapping UDP listener on {bind_ip}: {e}"
1748                ))
1749            })?;
1750            let port = socket.local_addr().map(|a| a.port()).map_err(|e| {
1751                crate::error::AgentError::Network(format!(
1752                    "failed to read ephemeral port-mapping UDP local addr: {e}"
1753                ))
1754            })?;
1755            self.claim_published_port(deployment, owner_name, port)
1756                .await?;
1757            self.accumulate_port_map_udp_backend(port, owner_name, backend);
1758            let svc = Arc::new(UdpStreamService::new(
1759                Arc::clone(&self.port_map_registry),
1760                port,
1761                None,
1762            ));
1763            let handle = tokio::spawn(async move {
1764                if let Err(e) = svc.serve(socket).await {
1765                    tracing::error!(
1766                        port = port,
1767                        error = %e,
1768                        "port-mapping UDP stream proxy service failed"
1769                    );
1770                }
1771            });
1772            self.port_map_udp.write().await.insert(port, handle);
1773            info!(service = owner_name, port = port, bind = %bind_ip, backend = %backend,
1774                "Published port mapping on ephemeral host port (UDP)");
1775            Ok(port)
1776        }
1777    }
1778
1779    /// Accumulate `backend` into the port-map UDP registry under `port`.
1780    fn accumulate_port_map_udp_backend(&self, port: u16, owner_name: &str, backend: SocketAddr) {
1781        if let Some(existing) = self.port_map_registry.resolve_udp(port) {
1782            let mut backends = existing.backends;
1783            if !backends.contains(&backend) {
1784                backends.push(backend);
1785            }
1786            self.port_map_registry.update_udp_backends(port, backends);
1787        } else {
1788            self.port_map_registry.register_udp(
1789                port,
1790                StreamService::new(owner_name.to_string(), vec![backend]),
1791            );
1792        }
1793    }
1794
1795    /// Remove a port mapping's backend; free the listener + ownership when the
1796    /// last backend on that host port is gone. `port` is the bound host port
1797    /// (the caller passes the value returned by `publish_port_mapping`).
1798    pub async fn unpublish_port_mapping(
1799        &self,
1800        port: u16,
1801        protocol: PortProtocol,
1802        backend: SocketAddr,
1803    ) {
1804        match protocol {
1805            PortProtocol::Udp => self.unpublish_port_mapping_udp(port, backend).await,
1806            PortProtocol::Tcp => self.unpublish_port_mapping_tcp(port, backend).await,
1807        }
1808    }
1809
1810    /// Drop `backend` from the port-map TCP service on `port`, freeing the
1811    /// listener when no backends remain.
1812    async fn unpublish_port_mapping_tcp(&self, port: u16, backend: SocketAddr) {
1813        let Some(existing) = self.port_map_registry.resolve_tcp(port) else {
1814            return;
1815        };
1816        let remaining: Vec<SocketAddr> = existing
1817            .backends
1818            .into_iter()
1819            .filter(|b| *b != backend)
1820            .collect();
1821
1822        if remaining.is_empty() {
1823            let _ = self.port_map_registry.unregister_tcp(port);
1824            let mut listeners = self.port_map_tcp.write().await;
1825            if let Some(handle) = listeners.remove(&port) {
1826                handle.abort();
1827            }
1828            // Release host-port ownership so a different (deployment, service)
1829            // may bind it next.
1830            self.published_ports.write().await.remove(&port);
1831            // Close the host-firewall allow-rule now that the last backend is
1832            // gone (only here, so a sibling replica still using the port keeps
1833            // its rule). Best-effort.
1834            zlayer_overlay::firewall::remove_published_port(port, false);
1835            debug!(
1836                port = port,
1837                "Freed port-mapping TCP listener (no backends remain)"
1838            );
1839        } else {
1840            self.port_map_registry.update_tcp_backends(port, remaining);
1841        }
1842    }
1843
1844    /// Drop `backend` from the port-map UDP service on `port`, freeing the
1845    /// listener when no backends remain.
1846    async fn unpublish_port_mapping_udp(&self, port: u16, backend: SocketAddr) {
1847        let Some(existing) = self.port_map_registry.resolve_udp(port) else {
1848            return;
1849        };
1850        let remaining: Vec<SocketAddr> = existing
1851            .backends
1852            .into_iter()
1853            .filter(|b| *b != backend)
1854            .collect();
1855
1856        if remaining.is_empty() {
1857            let _ = self.port_map_registry.unregister_udp(port);
1858            let mut listeners = self.port_map_udp.write().await;
1859            if let Some(handle) = listeners.remove(&port) {
1860                handle.abort();
1861            }
1862            // Release host-port ownership so a different (deployment, service)
1863            // may bind it next.
1864            self.published_ports.write().await.remove(&port);
1865            // Close the host-firewall allow-rule now that the last backend is
1866            // gone (only here, so a sibling replica still using the port keeps
1867            // its rule). Best-effort.
1868            zlayer_overlay::firewall::remove_published_port(port, true);
1869            debug!(
1870                port = port,
1871                "Freed port-mapping UDP listener (no backends remain)"
1872            );
1873        } else {
1874            self.port_map_registry.update_udp_backends(port, remaining);
1875        }
1876    }
1877
1878    /// Decide whether an endpoint should have a managed TLS certificate
1879    /// provisioned via ACME, returning the FQDN to provision when so.
1880    ///
1881    /// An endpoint qualifies ONLY when ALL of the following hold:
1882    /// 1. its protocol is [`Protocol::Https`],
1883    /// 2. it is publicly exposed ([`ExposeType::Public`]) — never for
1884    ///    internal-only endpoints (ACME HTTP-01 validation would fail and waste
1885    ///    Let's Encrypt rate limit), and
1886    /// 3. it carries a concrete, non-wildcard FQDN `host` (a real hostname,
1887    ///    not `None`, empty, `*`, or a `*.example.com` wildcard pattern — ACME
1888    ///    cannot satisfy an HTTP-01 challenge for a wildcard, and a hostless
1889    ///    endpoint has no name to validate).
1890    ///
1891    /// Returns the trimmed host string to provision, or `None` if the endpoint
1892    /// does not qualify. This is a pure function of the endpoint so it can be
1893    /// unit-tested without any ACME network access.
1894    #[must_use]
1895    pub fn endpoint_wants_managed_cert(endpoint: &zlayer_spec::EndpointSpec) -> Option<&str> {
1896        if endpoint.protocol != Protocol::Https {
1897            return None;
1898        }
1899        if endpoint.expose != ExposeType::Public {
1900            return None;
1901        }
1902        let host = endpoint.host.as_deref()?.trim();
1903        // Reject empty hosts and any wildcard pattern (`*`, `*.foo`, or a host
1904        // that contains a wildcard label). ACME HTTP-01 cannot validate these.
1905        if host.is_empty() || host.contains('*') {
1906            return None;
1907        }
1908        Some(host)
1909    }
1910
1911    /// Add routes for a service based on its specification
1912    ///
1913    /// This creates proxy routes for each endpoint defined in the `ServiceSpec`.
1914    /// HTTP/HTTPS/WebSocket endpoints get L7 routes via the `ServiceRegistry`.
1915    /// TCP/UDP endpoints are tracked but their L4 registration is handled
1916    /// by the `ServiceManager::register_service_routes()` method.
1917    ///
1918    /// The owning `deployment` (from `ServiceSpec.deployment`) scopes the LB
1919    /// group keys so two deployments that share a `service`+`endpoint` name
1920    /// keep independent backend pools (Bug 7). `None` for standalone /
1921    /// single-deployment callers.
1922    #[allow(clippy::too_many_lines)]
1923    pub async fn add_service(&self, name: &str, spec: &ServiceSpec) {
1924        let deployment = spec.deployment.as_deref();
1925        let mut services = self.services.write().await;
1926
1927        // Track which endpoints and ports we're adding
1928        let mut endpoint_names = Vec::new();
1929        let mut tcp_ports = Vec::new();
1930        let mut udp_ports = Vec::new();
1931        let mut http_ports = Vec::new();
1932
1933        for endpoint in &spec.endpoints {
1934            match endpoint.protocol {
1935                Protocol::Http | Protocol::Https | Protocol::Websocket => {
1936                    // L7: register route in the ServiceRegistry
1937                    let entry = RouteEntry::from_endpoint(deployment, name, endpoint);
1938                    self.registry.register(entry).await;
1939                    http_ports.push(endpoint.port);
1940
1941                    // Register one LB group per L7 endpoint, keyed by the
1942                    // deployment-scoped composite
1943                    // `{deployment}/{service}#{endpoint}`. This matches the
1944                    // `resolved.name` set by `RouteEntry::from_endpoint` and
1945                    // is required so that (a) different endpoints on the same
1946                    // service (potentially with different `target_role`
1947                    // filters) maintain independent backend pools, and (b)
1948                    // two deployments sharing a service+endpoint name do not
1949                    // cross-wire into one pool.
1950                    let lb_key = endpoint_lb_key(deployment, name, &endpoint.name);
1951                    self.load_balancer
1952                        .register(&lb_key, vec![], LbStrategy::RoundRobin);
1953
1954                    info!(
1955                        service = name,
1956                        endpoint = %endpoint.name,
1957                        protocol = ?endpoint.protocol,
1958                        path = ?endpoint.path,
1959                        expose = ?endpoint.expose,
1960                        "Added HTTP proxy route for service"
1961                    );
1962                }
1963                Protocol::Tcp => {
1964                    tcp_ports.push(endpoint.port);
1965                    info!(
1966                        service = name,
1967                        endpoint = %endpoint.name,
1968                        protocol = ?endpoint.protocol,
1969                        port = endpoint.port,
1970                        expose = ?endpoint.expose,
1971                        "Tracking TCP stream endpoint for service"
1972                    );
1973                }
1974                Protocol::Udp => {
1975                    udp_ports.push(endpoint.port);
1976                    info!(
1977                        service = name,
1978                        endpoint = %endpoint.name,
1979                        protocol = ?endpoint.protocol,
1980                        port = endpoint.port,
1981                        expose = ?endpoint.expose,
1982                        "Tracking UDP stream endpoint for service"
1983                    );
1984                }
1985            }
1986
1987            endpoint_names.push(endpoint.name.clone());
1988        }
1989
1990        // Register a service-level LB group as well so legacy callers that
1991        // use `update_backends(service, ...)` (which fans out to all
1992        // endpoints) and any code that selects by bare service name still
1993        // resolve. Per-endpoint LB groups (registered above) are the
1994        // primary source for L7 select; this is a no-op for callers that
1995        // already use composite keys.
1996        self.load_balancer
1997            .register(name, vec![], LbStrategy::RoundRobin);
1998
1999        services.insert(
2000            name.to_string(),
2001            ServiceTracking {
2002                deployment: deployment.map(str::to_string),
2003                endpoint_names,
2004                tcp_ports,
2005                udp_ports,
2006                http_ports,
2007            },
2008        );
2009
2010        // Release the services write lock before touching ACME state below.
2011        drop(services);
2012
2013        // ACME provisioning trigger: for every endpoint that wants a managed
2014        // certificate (public HTTPS vhost with a concrete FQDN), kick off a
2015        // detached, best-effort provisioning task. This is the keystone that
2016        // makes public HTTPS vhosts actually serve TLS — without it the ACME
2017        // engine exists but is never invoked, so the live `:443` listener has
2018        // no certificate for the host and serves "No certificate found".
2019        //
2020        // Each task is spawned (never awaited) so `add_service` does NOT block
2021        // on ACME, which can take seconds. `get_cert` is idempotent: it returns
2022        // a cached/on-disk cert without touching the network on a hit, and only
2023        // provisions via ACME HTTP-01 on a miss. The per-host
2024        // `provisioning_requested` guard ensures we fire at most once per host
2025        // for this manager's lifetime, so a reconcile re-running `add_service`
2026        // does not re-attempt a failed/in-flight provision and burn rate limit.
2027        if let Some(cert_manager) = &self.cert_manager {
2028            for endpoint in &spec.endpoints {
2029                let Some(host) = Self::endpoint_wants_managed_cert(endpoint) else {
2030                    continue;
2031                };
2032                let host = host.to_string();
2033
2034                // De-dupe: only the first request for a given host schedules a task.
2035                {
2036                    let mut requested = self.provisioning_requested.write().await;
2037                    if !requested.insert(host.clone()) {
2038                        continue;
2039                    }
2040                }
2041
2042                let cert_manager = Arc::clone(cert_manager);
2043                let sni_resolver = self.sni_resolver();
2044                let service_name = name.to_string();
2045                tokio::spawn(async move {
2046                    info!(
2047                        service = %service_name,
2048                        host = %host,
2049                        "Provisioning managed TLS certificate for public HTTPS vhost"
2050                    );
2051                    match cert_manager.get_cert(&host).await {
2052                        Ok((cert_pem, key_pem)) => {
2053                            // Hot-load into the SHARED resolver so the live
2054                            // listener serves it immediately.
2055                            if let Err(e) = sni_resolver.refresh_cert(&host, &cert_pem, &key_pem) {
2056                                warn!(
2057                                    host = %host,
2058                                    error = %e,
2059                                    "Provisioned certificate but failed to load it into the SNI resolver"
2060                                );
2061                            } else {
2062                                info!(
2063                                    host = %host,
2064                                    "Loaded managed TLS certificate into live HTTPS listener"
2065                                );
2066                            }
2067                        }
2068                        Err(e) => {
2069                            warn!(
2070                                service = %service_name,
2071                                host = %host,
2072                                error = %e,
2073                                "Failed to provision managed TLS certificate (best-effort; route is still registered)"
2074                            );
2075                        }
2076                    }
2077                });
2078            }
2079        }
2080    }
2081
2082    /// Remove all routes, L4 listeners, and HTTP server handles for a service.
2083    ///
2084    /// This performs a full cleanup of all proxy resources associated with the
2085    /// service:
2086    /// - Removes L7 (HTTP/HTTPS/WebSocket) routes from the `ServiceRegistry`
2087    /// - Unregisters TCP/UDP stream services from the `StreamRegistry`
2088    /// - Removes port tracking for TCP/UDP listeners
2089    /// - Shuts down HTTP proxy server handles that were exclusively owned by
2090    ///   this service (only if no other service uses the same port)
2091    pub async fn remove_service(&self, name: &str) {
2092        let mut services = self.services.write().await;
2093
2094        if let Some(tracking) = services.remove(name) {
2095            // 1. Remove L7 routes from the ServiceRegistry
2096            self.registry.unregister_service(name).await;
2097
2098            // 1b. Remove from the load balancer (both the service-level
2099            //     group and every per-endpoint composite group). The
2100            //     per-endpoint keys are deployment-scoped to match the keys
2101            //     registered in `add_service`.
2102            self.load_balancer.unregister(name);
2103            let deployment = tracking.deployment.as_deref();
2104            for endpoint_name in &tracking.endpoint_names {
2105                let lb_key = endpoint_lb_key(deployment, name, endpoint_name);
2106                self.load_balancer.unregister(&lb_key);
2107            }
2108
2109            // 2. Unregister TCP stream services and clear port tracking
2110            if !tracking.tcp_ports.is_empty() {
2111                let mut tcp_set = self.tcp_listeners.write().await;
2112                for port in &tracking.tcp_ports {
2113                    if let Some(registry) = &self.stream_registry {
2114                        let _ = registry.unregister_tcp(*port);
2115                    }
2116                    tcp_set.remove(port);
2117                    debug!(service = name, port = port, "Removed TCP listener tracking");
2118                }
2119            }
2120
2121            // 3. Unregister UDP stream services and clear port tracking
2122            if !tracking.udp_ports.is_empty() {
2123                let mut udp_set = self.udp_listeners.write().await;
2124                for port in &tracking.udp_ports {
2125                    if let Some(registry) = &self.stream_registry {
2126                        let _ = registry.unregister_udp(*port);
2127                    }
2128                    udp_set.remove(port);
2129                    debug!(service = name, port = port, "Removed UDP listener tracking");
2130                }
2131            }
2132
2133            // 4. Shut down HTTP proxy servers on ports exclusively owned by
2134            //    this service (skip ports still used by other services)
2135            if !tracking.http_ports.is_empty() {
2136                let ports_still_in_use: HashSet<u16> = services
2137                    .values()
2138                    .flat_map(|t| t.http_ports.iter().copied())
2139                    .collect();
2140
2141                let mut servers = self.servers.write().await;
2142                for port in &tracking.http_ports {
2143                    if !ports_still_in_use.contains(port) {
2144                        if let Some(server) = servers.remove(port) {
2145                            server.shutdown();
2146                            info!(
2147                                service = name,
2148                                port = port,
2149                                "Shut down HTTP proxy server (no remaining services on port)"
2150                            );
2151                        }
2152                    }
2153                }
2154            }
2155
2156            info!(service = name, "Removed all proxy resources for service");
2157        }
2158    }
2159
2160    /// Add a single backend to a service.
2161    ///
2162    /// Adds to the service-level LB group **and** to every per-endpoint LB
2163    /// group tracked for `service`. Per-endpoint role filtering happens at
2164    /// collection time in the agent's service manager, so any backend
2165    /// surfaced here is already eligible for every endpoint.
2166    pub async fn add_backend(&self, service: &str, addr: SocketAddr) {
2167        self.registry.add_backend(service, addr).await;
2168        self.load_balancer.add_backend(service, addr);
2169        // Fan out to every per-endpoint LB group for backward-compat.
2170        let services = self.services.read().await;
2171        if let Some(tracking) = services.get(service) {
2172            let deployment = tracking.deployment.as_deref();
2173            for endpoint_name in &tracking.endpoint_names {
2174                let lb_key = endpoint_lb_key(deployment, service, endpoint_name);
2175                self.load_balancer.add_backend(&lb_key, addr);
2176            }
2177        }
2178        info!(service = service, backend = %addr, "Registered backend with proxy");
2179    }
2180
2181    /// Remove a backend from a service.
2182    ///
2183    /// Removes from the service-level LB group **and** from every
2184    /// per-endpoint LB group.
2185    pub async fn remove_backend(&self, service: &str, addr: SocketAddr) {
2186        self.registry.remove_backend(service, addr).await;
2187        self.load_balancer.remove_backend(service, &addr);
2188        let services = self.services.read().await;
2189        if let Some(tracking) = services.get(service) {
2190            let deployment = tracking.deployment.as_deref();
2191            for endpoint_name in &tracking.endpoint_names {
2192                let lb_key = endpoint_lb_key(deployment, service, endpoint_name);
2193                self.load_balancer.remove_backend(&lb_key, &addr);
2194            }
2195        }
2196        debug!(service = service, backend = %addr, "Removed backend from service");
2197    }
2198
2199    /// Update the health status of a backend in the load balancer.
2200    ///
2201    /// Delegates to [`LoadBalancer::mark_health`] so that unhealthy backends
2202    /// are skipped during selection. Health is tracked on both the
2203    /// service-level group and every per-endpoint group that contains
2204    /// this address.
2205    #[allow(clippy::unused_async)]
2206    pub async fn update_backend_health(&self, service: &str, addr: SocketAddr, healthy: bool) {
2207        self.load_balancer.mark_health(service, &addr, healthy);
2208        let services = self.services.read().await;
2209        if let Some(tracking) = services.get(service) {
2210            let deployment = tracking.deployment.as_deref();
2211            for endpoint_name in &tracking.endpoint_names {
2212                let lb_key = endpoint_lb_key(deployment, service, endpoint_name);
2213                self.load_balancer.mark_health(&lb_key, &addr, healthy);
2214            }
2215        }
2216        debug!(
2217            service = service,
2218            backend = %addr,
2219            healthy = healthy,
2220            "Updated backend health in load balancer"
2221        );
2222    }
2223
2224    /// Update the backends for **every** endpoint of a service with the
2225    /// same list.
2226    ///
2227    /// Use this only when caller cannot distinguish per-endpoint backend
2228    /// sets (e.g., legacy paths that do not honor `target_role`). Prefer
2229    /// [`Self::update_endpoint_backends`] when per-endpoint filtering is
2230    /// possible.
2231    pub async fn update_backends(&self, service: &str, addrs: Vec<SocketAddr>) {
2232        self.registry.update_backends(service, addrs.clone()).await;
2233        // Update the service-level LB group plus every per-endpoint group.
2234        self.load_balancer.update_backends(service, addrs.clone());
2235        let services = self.services.read().await;
2236        if let Some(tracking) = services.get(service) {
2237            let deployment = tracking.deployment.as_deref();
2238            for endpoint_name in &tracking.endpoint_names {
2239                let lb_key = endpoint_lb_key(deployment, service, endpoint_name);
2240                self.load_balancer.update_backends(&lb_key, addrs.clone());
2241            }
2242        }
2243        debug!(service = service, "Updated backends for service");
2244    }
2245
2246    /// Update backends for a single L7 endpoint of a service.
2247    ///
2248    /// This honors [`EndpointSpec::target_role`] filtering: the caller
2249    /// supplies the role-filtered backend list and this method updates
2250    /// only the routes and LB group corresponding to `(service,
2251    /// endpoint_name)`.
2252    pub async fn update_endpoint_backends(
2253        &self,
2254        service: &str,
2255        endpoint_name: &str,
2256        addrs: Vec<SocketAddr>,
2257    ) {
2258        self.registry
2259            .update_backends_for_endpoint(service, endpoint_name, addrs.clone())
2260            .await;
2261        // Resolve the owning deployment so the LB key matches what
2262        // `add_service` registered.
2263        let deployment = {
2264            let services = self.services.read().await;
2265            services.get(service).and_then(|t| t.deployment.clone())
2266        };
2267        let lb_key = endpoint_lb_key(deployment.as_deref(), service, endpoint_name);
2268        self.load_balancer.update_backends(&lb_key, addrs);
2269        debug!(
2270            service = service,
2271            endpoint = endpoint_name,
2272            "Updated backends for service endpoint"
2273        );
2274    }
2275
2276    /// Get the number of registered routes
2277    pub async fn route_count(&self) -> usize {
2278        self.registry.route_count().await
2279    }
2280
2281    /// Get the list of registered service names
2282    pub async fn list_services(&self) -> Vec<String> {
2283        self.services.read().await.keys().cloned().collect()
2284    }
2285
2286    /// Check if a service has any registered endpoints
2287    pub async fn has_service(&self, name: &str) -> bool {
2288        self.services.read().await.contains_key(name)
2289    }
2290}
2291
2292#[cfg(test)]
2293mod tests {
2294    use super::*;
2295
2296    fn mock_service_spec_with_endpoints() -> ServiceSpec {
2297        use zlayer_spec::*;
2298        serde_yaml::from_str::<DeploymentSpec>(
2299            r"
2300version: v1
2301deployment: test
2302services:
2303  test:
2304    rtype: service
2305    image:
2306      name: test:latest
2307    endpoints:
2308      - name: http
2309        protocol: http
2310        port: 8080
2311        path: /api
2312        expose: public
2313      - name: websocket
2314        protocol: websocket
2315        port: 8081
2316        path: /ws
2317        expose: internal
2318",
2319        )
2320        .unwrap()
2321        .services
2322        .remove("test")
2323        .unwrap()
2324    }
2325
2326    fn mock_service_spec_tcp_only() -> ServiceSpec {
2327        mock_service_spec_tcp_only_port(9000)
2328    }
2329
2330    fn mock_service_spec_tcp_only_port(port: u16) -> ServiceSpec {
2331        use zlayer_spec::*;
2332        let yaml = format!(
2333            "
2334version: v1
2335deployment: test
2336services:
2337  test:
2338    rtype: service
2339    image:
2340      name: test:latest
2341    endpoints:
2342      - name: grpc
2343        protocol: tcp
2344        port: {port}
2345"
2346        );
2347        serde_yaml::from_str::<DeploymentSpec>(&yaml)
2348            .unwrap()
2349            .services
2350            .remove("test")
2351            .unwrap()
2352    }
2353
2354    /// Reserve an unused localhost TCP port by binding a listener on `:0`,
2355    /// reading the assigned port, and dropping the listener.
2356    ///
2357    /// There is an inherent race between dropping the listener and the test
2358    /// re-binding the port, but this is dramatically more reliable than
2359    /// hard-coding a port (e.g., 9000) which is commonly in use on dev
2360    /// machines (php-fpm, the running zlayer daemon, etc.).
2361    fn reserve_free_tcp_port() -> u16 {
2362        let listener =
2363            std::net::TcpListener::bind("127.0.0.1:0").expect("failed to bind ephemeral test port");
2364        listener.local_addr().unwrap().port()
2365    }
2366
2367    #[tokio::test]
2368    async fn test_proxy_manager_new() {
2369        let config = ProxyManagerConfig::default();
2370        let registry = Arc::new(ServiceRegistry::new());
2371        let manager = ProxyManager::new(config, registry, None);
2372
2373        assert_eq!(manager.route_count().await, 0);
2374        assert!(manager.list_services().await.is_empty());
2375    }
2376
2377    #[tokio::test]
2378    async fn test_add_service_with_http_endpoints() {
2379        let config = ProxyManagerConfig::default();
2380        let registry = Arc::new(ServiceRegistry::new());
2381        let manager = ProxyManager::new(config, registry, None);
2382
2383        let spec = mock_service_spec_with_endpoints();
2384        manager.add_service("api", &spec).await;
2385
2386        // Should have 2 routes (http and websocket)
2387        assert_eq!(manager.route_count().await, 2);
2388        assert!(manager.has_service("api").await);
2389    }
2390
2391    #[tokio::test]
2392    async fn test_tcp_endpoints_tracked_not_routed() {
2393        let config = ProxyManagerConfig::default();
2394        let registry = Arc::new(ServiceRegistry::new());
2395        let manager = ProxyManager::new(config, registry, None);
2396
2397        let spec = mock_service_spec_tcp_only();
2398        manager.add_service("grpc-service", &spec).await;
2399
2400        // TCP endpoints don't add HTTP routes
2401        assert_eq!(manager.route_count().await, 0);
2402        // But the service is still tracked with its endpoint name
2403        assert!(manager.has_service("grpc-service").await);
2404    }
2405
2406    #[tokio::test]
2407    async fn test_remove_service() {
2408        let config = ProxyManagerConfig::default();
2409        let registry = Arc::new(ServiceRegistry::new());
2410        let manager = ProxyManager::new(config, registry, None);
2411
2412        let spec = mock_service_spec_with_endpoints();
2413        manager.add_service("api", &spec).await;
2414        assert_eq!(manager.route_count().await, 2);
2415
2416        manager.remove_service("api").await;
2417        assert_eq!(manager.route_count().await, 0);
2418        assert!(!manager.has_service("api").await);
2419    }
2420
2421    #[tokio::test]
2422    async fn test_backend_management() {
2423        let config = ProxyManagerConfig::default();
2424        let registry = Arc::new(ServiceRegistry::new());
2425        let manager = ProxyManager::new(config, registry.clone(), None);
2426
2427        let spec = mock_service_spec_with_endpoints();
2428        manager.add_service("api", &spec).await;
2429
2430        // Add backends
2431        let addr1: SocketAddr = "127.0.0.1:8080".parse().unwrap();
2432        let addr2: SocketAddr = "127.0.0.1:8081".parse().unwrap();
2433
2434        manager.add_backend("api", addr1).await;
2435        manager.add_backend("api", addr2).await;
2436
2437        // Verify backends via the registry's resolve
2438        let resolved = registry.resolve(None, "/api").await.unwrap();
2439        assert_eq!(resolved.backends.len(), 2);
2440
2441        // Remove a backend
2442        manager.remove_backend("api", addr1).await;
2443        let resolved = registry.resolve(None, "/api").await.unwrap();
2444        assert_eq!(resolved.backends.len(), 1);
2445    }
2446
2447    #[tokio::test]
2448    async fn test_update_backends_replaces_all() {
2449        let config = ProxyManagerConfig::default();
2450        let registry = Arc::new(ServiceRegistry::new());
2451        let manager = ProxyManager::new(config, registry.clone(), None);
2452
2453        let spec = mock_service_spec_with_endpoints();
2454        manager.add_service("api", &spec).await;
2455
2456        // Add initial backend
2457        let addr1: SocketAddr = "127.0.0.1:8080".parse().unwrap();
2458        manager.add_backend("api", addr1).await;
2459
2460        // Update with new backends (replaces)
2461        let new_backends: Vec<SocketAddr> = vec![
2462            "127.0.0.1:9000".parse().unwrap(),
2463            "127.0.0.1:9001".parse().unwrap(),
2464            "127.0.0.1:9002".parse().unwrap(),
2465        ];
2466        manager.update_backends("api", new_backends).await;
2467
2468        let resolved = registry.resolve(None, "/api").await.unwrap();
2469        assert_eq!(resolved.backends.len(), 3);
2470    }
2471
2472    #[tokio::test]
2473    async fn test_config_builder() {
2474        let config = ProxyManagerConfig::new("0.0.0.0:8080".parse().unwrap())
2475            .with_https("0.0.0.0:8443".parse().unwrap())
2476            .with_http2(false);
2477
2478        assert_eq!(
2479            config.http_addr,
2480            "0.0.0.0:8080".parse::<SocketAddr>().unwrap()
2481        );
2482        assert_eq!(
2483            config.https_addr,
2484            Some("0.0.0.0:8443".parse::<SocketAddr>().unwrap())
2485        );
2486        assert!(!config.http2_enabled);
2487    }
2488
2489    /// Test that `ensure_ports_for_service` correctly differentiates
2490    /// Public (0.0.0.0) vs Internal (overlay or 127.0.0.1) bind addresses.
2491    /// We can't actually bind in unit tests, but we verify the function
2492    /// processes both endpoint types without error.
2493    #[tokio::test]
2494    async fn test_ensure_ports_differentiates_public_and_internal() {
2495        let config = ProxyManagerConfig::default();
2496        let registry = Arc::new(ServiceRegistry::new());
2497        let manager = ProxyManager::new(config, registry, None);
2498
2499        let spec = mock_service_spec_with_endpoints();
2500        // Passing None for overlay_ip: internal endpoints should fall back to 127.0.0.1
2501        let result = manager.ensure_ports_for_service(&spec, None).await;
2502        // listen_on may fail because we can't actually bind in tests, but
2503        // the function itself should run without panicking.
2504        let _ = result;
2505    }
2506
2507    #[tokio::test]
2508    async fn test_ensure_ports_with_overlay_ip() {
2509        let config = ProxyManagerConfig::default();
2510        let registry = Arc::new(ServiceRegistry::new());
2511        let manager = ProxyManager::new(config, registry, None);
2512
2513        let spec = mock_service_spec_with_endpoints();
2514        // Pass an overlay IP -- internal endpoints should bind there
2515        let overlay_ip: IpAddr = "10.200.0.5".parse().unwrap();
2516        let result = manager
2517            .ensure_ports_for_service(&spec, Some(overlay_ip))
2518            .await;
2519        let _ = result;
2520    }
2521
2522    fn mock_mixed_service_spec() -> ServiceSpec {
2523        use zlayer_spec::*;
2524        serde_yaml::from_str::<DeploymentSpec>(
2525            r"
2526version: v1
2527deployment: test
2528services:
2529  mixed:
2530    rtype: service
2531    image:
2532      name: test:latest
2533    endpoints:
2534      - name: http
2535        protocol: http
2536        port: 8080
2537        path: /api
2538        expose: public
2539      - name: grpc
2540        protocol: tcp
2541        port: 9000
2542        expose: public
2543      - name: game
2544        protocol: udp
2545        port: 27015
2546        expose: public
2547",
2548        )
2549        .unwrap()
2550        .services
2551        .remove("mixed")
2552        .unwrap()
2553    }
2554
2555    #[tokio::test]
2556    async fn test_add_mixed_service_tracks_all_endpoints() {
2557        let config = ProxyManagerConfig::default();
2558        let registry = Arc::new(ServiceRegistry::new());
2559        let manager = ProxyManager::new(config, registry, None);
2560
2561        let spec = mock_mixed_service_spec();
2562        manager.add_service("mixed", &spec).await;
2563
2564        // Only 1 HTTP route (tcp and udp don't add HTTP routes)
2565        assert_eq!(manager.route_count().await, 1);
2566        // Service is tracked
2567        assert!(manager.has_service("mixed").await);
2568    }
2569
2570    #[tokio::test]
2571    async fn test_ensure_ports_tcp_with_stream_registry() {
2572        use zlayer_proxy::StreamService;
2573
2574        // `reserve_free_tcp_port` binds `127.0.0.1:0`, reads the assigned port,
2575        // then drops the listener — so there is a TOCTOU window where, under
2576        // parallel test threads, another process/test can grab that port before
2577        // `ensure_ports_for_service` rebinds it. When that happens the bind is
2578        // skipped and the port is never tracked. Retry with a fresh OS-assigned
2579        // port (collisions are rare) so this test is deterministic.
2580        let mut bound = false;
2581        for _ in 0..16 {
2582            let stream_registry = Arc::new(StreamRegistry::new());
2583            let config = ProxyManagerConfig::default();
2584            let registry = Arc::new(ServiceRegistry::new());
2585            let mut manager = ProxyManager::new(config, registry, None);
2586            manager.set_stream_registry(stream_registry.clone());
2587
2588            let port = reserve_free_tcp_port();
2589            let spec = mock_service_spec_tcp_only_port(port);
2590
2591            // Register the TCP service in the stream registry first (as ServiceManager does)
2592            stream_registry
2593                .register_tcp(port, StreamService::new("grpc-service".to_string(), vec![]));
2594
2595            // Ensure ports -- should bind TCP listener
2596            let result = manager.ensure_ports_for_service(&spec, None).await;
2597            assert!(result.is_ok());
2598
2599            // Verify the TCP listener port is tracked; retry on a lost-the-port race.
2600            if manager.tcp_listeners.read().await.contains(&port) {
2601                bound = true;
2602                break;
2603            }
2604        }
2605        assert!(
2606            bound,
2607            "ensure_ports_for_service never tracked an OS-assigned TCP port across 16 attempts"
2608        );
2609    }
2610
2611    #[tokio::test]
2612    async fn test_ensure_ports_tcp_without_stream_registry() {
2613        let config = ProxyManagerConfig::default();
2614        let registry = Arc::new(ServiceRegistry::new());
2615        let manager = ProxyManager::new(config, registry, None);
2616
2617        let spec = mock_service_spec_tcp_only();
2618
2619        // Without stream registry, ensure_ports should not fail, just warn
2620        let result = manager.ensure_ports_for_service(&spec, None).await;
2621        assert!(result.is_ok());
2622
2623        // No TCP listeners should be tracked
2624        let tcp_ports = manager.tcp_listeners.read().await;
2625        assert!(tcp_ports.is_empty());
2626    }
2627
2628    #[tokio::test]
2629    async fn test_stream_registry_setter() {
2630        let stream_registry = Arc::new(StreamRegistry::new());
2631        let config = ProxyManagerConfig::default();
2632        let registry = Arc::new(ServiceRegistry::new());
2633        let mut manager = ProxyManager::new(config, registry, None);
2634
2635        assert!(manager.stream_registry().is_none());
2636        manager.set_stream_registry(stream_registry.clone());
2637        assert!(manager.stream_registry().is_some());
2638    }
2639
2640    /// Single-member service spec with one INTERNAL TCP endpoint published on
2641    /// `port`. Internal (not Public) so the loopback path actually binds it.
2642    fn mock_internal_tcp_spec(port: u16) -> ServiceSpec {
2643        use zlayer_spec::*;
2644        let yaml = format!(
2645            "
2646version: v1
2647deployment: test
2648services:
2649  test:
2650    rtype: service
2651    image:
2652      name: test:latest
2653    scale:
2654      mode: fixed
2655      replicas: 1
2656    endpoints:
2657      - name: tcp
2658        protocol: tcp
2659        port: {port}
2660        expose: internal
2661"
2662        );
2663        serde_yaml::from_str::<DeploymentSpec>(&yaml)
2664            .unwrap()
2665            .services
2666            .remove("test")
2667            .unwrap()
2668    }
2669
2670    /// End-to-end loopback publish: spin up a real backend `TcpListener`,
2671    /// publish it on the node loopback, connect to `127.0.0.1:<publish_port>`
2672    /// and assert bytes round-trip through the forward; then unpublish and
2673    /// assert the port is freed (a fresh bind succeeds).
2674    #[tokio::test]
2675    async fn test_publish_loopback_round_trips_then_frees_port() {
2676        use tokio::io::{AsyncReadExt, AsyncWriteExt};
2677
2678        // Real backend that echoes a single line back with a known reply.
2679        let backend = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
2680        let backend_addr = backend.local_addr().unwrap();
2681        let backend_ip = backend_addr.ip();
2682        let backend_port = backend_addr.port();
2683        tokio::spawn(async move {
2684            if let Ok((mut sock, _)) = backend.accept().await {
2685                let mut buf = [0u8; 16];
2686                let n = sock.read(&mut buf).await.unwrap_or(0);
2687                // Echo back what we received, prefixed.
2688                let _ = sock.write_all(b"pong:").await;
2689                let _ = sock.write_all(&buf[..n]).await;
2690                let _ = sock.flush().await;
2691            }
2692        });
2693
2694        let config = ProxyManagerConfig::default();
2695        let registry = Arc::new(ServiceRegistry::new());
2696        let manager = ProxyManager::new(config, registry, None);
2697
2698        // Reserve a free publish port (the node-loopback address).
2699        let publish_port = reserve_free_tcp_port();
2700        let spec = mock_internal_tcp_spec(publish_port);
2701        assert!(
2702            spec.publish_to_node_loopback(),
2703            "single-member internal spec should publish to loopback"
2704        );
2705
2706        // The backend is the real listener; port_override forces the forward
2707        // target to the backend's actual ephemeral port (the macOS-style path).
2708        manager
2709            .publish_loopback_for_container(
2710                Some("dep-a"),
2711                "test",
2712                &spec,
2713                backend_ip,
2714                Some(backend_port),
2715            )
2716            .await
2717            .expect("publish should succeed on a free port");
2718
2719        // Connect to 127.0.0.1:<publish_port> and round-trip a payload.
2720        let mut client = tokio::net::TcpStream::connect((Ipv4Addr::LOCALHOST, publish_port))
2721            .await
2722            .expect("connect to published loopback port");
2723        client.write_all(b"ping").await.unwrap();
2724        client.flush().await.unwrap();
2725        let mut reply = Vec::new();
2726        client.read_to_end(&mut reply).await.unwrap();
2727        assert_eq!(&reply, b"pong:ping");
2728        drop(client);
2729
2730        // Unpublish; the last backend's removal frees the listener.
2731        manager
2732            .unpublish_loopback_for_container(&spec, backend_ip, Some(backend_port))
2733            .await;
2734
2735        // The aborted accept task drops the listener asynchronously; retry a
2736        // few times so the OS reclaims the port before we assert it is free.
2737        let mut bound = None;
2738        for _ in 0..50 {
2739            match std::net::TcpListener::bind((Ipv4Addr::LOCALHOST, publish_port)) {
2740                Ok(l) => {
2741                    bound = Some(l);
2742                    break;
2743                }
2744                Err(_) => tokio::time::sleep(Duration::from_millis(20)).await,
2745            }
2746        }
2747        assert!(
2748            bound.is_some(),
2749            "loopback port {publish_port} should be freed after unpublish"
2750        );
2751    }
2752
2753    #[tokio::test]
2754    async fn test_publish_loopback_skips_public_endpoints() {
2755        // Public endpoints are already on 0.0.0.0, so the loopback path must
2756        // NOT bind 127.0.0.1:<port> again. mock_mixed_service_spec exposes
2757        // everything as public.
2758        let config = ProxyManagerConfig::default();
2759        let registry = Arc::new(ServiceRegistry::new());
2760        let manager = ProxyManager::new(config, registry, None);
2761
2762        let spec = mock_mixed_service_spec();
2763        let backend_ip: IpAddr = "127.0.0.1".parse().unwrap();
2764        manager
2765            .publish_loopback_for_container(Some("dep-a"), "mixed", &spec, backend_ip, None)
2766            .await
2767            .expect("public-only spec publishes nothing and must not error");
2768
2769        // No loopback listeners should have been created for public endpoints.
2770        assert!(manager.loopback_tcp.read().await.is_empty());
2771        assert!(manager.loopback_udp.read().await.is_empty());
2772    }
2773
2774    #[tokio::test]
2775    async fn test_registry_accessor() {
2776        let config = ProxyManagerConfig::default();
2777        let registry = Arc::new(ServiceRegistry::new());
2778        let manager = ProxyManager::new(config, registry.clone(), None);
2779
2780        // registry() should return the same Arc
2781        assert_eq!(Arc::as_ptr(&manager.registry()), Arc::as_ptr(&registry));
2782    }
2783
2784    /// Bug 7: a host port published by deployment A must NOT be cross-wired
2785    /// into deployment B's backend pool. B's publish on the same port is
2786    /// REFUSED with `PortConflict`, and `:<port>` keeps resolving to A's
2787    /// backend only.
2788    #[tokio::test]
2789    async fn test_published_port_ownership_rejects_cross_deployment() {
2790        let config = ProxyManagerConfig::default();
2791        let registry = Arc::new(ServiceRegistry::new());
2792        let manager = ProxyManager::new(config, registry, None);
2793
2794        // Reserve a free publish port shared by both deployments.
2795        let publish_port = reserve_free_tcp_port();
2796        let spec = mock_internal_tcp_spec(publish_port);
2797
2798        // Distinct container backends for the two deployments.
2799        let backend_a: IpAddr = "10.0.0.1".parse().unwrap();
2800        let tgt_a = 5001u16;
2801        let backend_b: IpAddr = "10.0.0.2".parse().unwrap();
2802        let tgt_b = 5002u16;
2803
2804        // Deployment A claims the port -> succeeds.
2805        manager
2806            .publish_loopback_for_container(Some("dep-a"), "svc", &spec, backend_a, Some(tgt_a))
2807            .await
2808            .expect("deployment A should claim the free port");
2809
2810        // Deployment B publishing the SAME port -> REFUSED.
2811        let err = manager
2812            .publish_loopback_for_container(Some("dep-b"), "svc", &spec, backend_b, Some(tgt_b))
2813            .await
2814            .expect_err("deployment B must be refused on an owned port");
2815        match err {
2816            crate::error::AgentError::PortConflict { port, .. } => {
2817                assert_eq!(port, publish_port);
2818            }
2819            other => panic!("expected PortConflict, got {other:?}"),
2820        }
2821
2822        // `:<port>` must still serve ONLY deployment A's backend — B was never
2823        // appended into the foreign pool.
2824        let svc = manager
2825            .loopback_registry
2826            .resolve_tcp(publish_port)
2827            .expect("port should still be registered to deployment A");
2828        let expected_a = SocketAddr::new(backend_a, tgt_a);
2829        let foreign_b = SocketAddr::new(backend_b, tgt_b);
2830        assert_eq!(svc.backends, vec![expected_a]);
2831        assert!(
2832            !svc.backends.contains(&foreign_b),
2833            "deployment B's backend must NOT be cross-wired into the pool"
2834        );
2835    }
2836
2837    /// A second replica of the SAME (deployment, service) on an already-owned
2838    /// port is a legitimate scale-up: the replica backend IS appended.
2839    #[tokio::test]
2840    async fn test_published_port_same_owner_appends_replica() {
2841        let config = ProxyManagerConfig::default();
2842        let registry = Arc::new(ServiceRegistry::new());
2843        let manager = ProxyManager::new(config, registry, None);
2844
2845        let publish_port = reserve_free_tcp_port();
2846        let spec = mock_internal_tcp_spec(publish_port);
2847
2848        let replica1: IpAddr = "10.0.0.1".parse().unwrap();
2849        let replica2: IpAddr = "10.0.0.2".parse().unwrap();
2850        let target_port = 6000u16;
2851
2852        // First replica claims the port.
2853        manager
2854            .publish_loopback_for_container(
2855                Some("dep-a"),
2856                "svc",
2857                &spec,
2858                replica1,
2859                Some(target_port),
2860            )
2861            .await
2862            .expect("first replica claims the port");
2863
2864        // Second replica of the SAME (deployment, service) -> appended.
2865        manager
2866            .publish_loopback_for_container(
2867                Some("dep-a"),
2868                "svc",
2869                &spec,
2870                replica2,
2871                Some(target_port),
2872            )
2873            .await
2874            .expect("same-owner second replica should be accepted");
2875
2876        let svc = manager
2877            .loopback_registry
2878            .resolve_tcp(publish_port)
2879            .expect("port should be registered");
2880        let b1 = SocketAddr::new(replica1, target_port);
2881        let b2 = SocketAddr::new(replica2, target_port);
2882        assert_eq!(svc.backends.len(), 2, "both replicas should be in the pool");
2883        assert!(svc.backends.contains(&b1));
2884        assert!(svc.backends.contains(&b2));
2885    }
2886
2887    /// After the owning service unpublishes its last backend, the host-port
2888    /// ownership entry is released so a different (deployment, service) may
2889    /// claim it.
2890    #[tokio::test]
2891    async fn test_published_port_freed_on_unpublish() {
2892        let config = ProxyManagerConfig::default();
2893        let registry = Arc::new(ServiceRegistry::new());
2894        let manager = ProxyManager::new(config, registry, None);
2895
2896        let publish_port = reserve_free_tcp_port();
2897        let spec = mock_internal_tcp_spec(publish_port);
2898        let backend_a: IpAddr = "10.0.0.1".parse().unwrap();
2899        let target_port = 7000u16;
2900
2901        manager
2902            .publish_loopback_for_container(
2903                Some("dep-a"),
2904                "svc",
2905                &spec,
2906                backend_a,
2907                Some(target_port),
2908            )
2909            .await
2910            .expect("deployment A claims the port");
2911        assert!(manager
2912            .published_ports
2913            .read()
2914            .await
2915            .contains_key(&publish_port));
2916
2917        // Unpublish A's only backend -> ownership released.
2918        manager
2919            .unpublish_loopback_for_container(&spec, backend_a, Some(target_port))
2920            .await;
2921        assert!(
2922            !manager
2923                .published_ports
2924                .read()
2925                .await
2926                .contains_key(&publish_port),
2927            "ownership entry should be cleared once the last backend is gone"
2928        );
2929
2930        // A different deployment can now claim the freed port.
2931        let backend_b: IpAddr = "10.0.0.2".parse().unwrap();
2932        manager
2933            .publish_loopback_for_container(
2934                Some("dep-b"),
2935                "svc",
2936                &spec,
2937                backend_b,
2938                Some(target_port),
2939            )
2940            .await
2941            .expect("freed port should be claimable by another deployment");
2942    }
2943
2944    #[tokio::test]
2945    #[allow(clippy::similar_names)]
2946    async fn test_start_ingress_is_idempotent() {
2947        let config = ProxyManagerConfig::default();
2948        let registry = Arc::new(ServiceRegistry::new());
2949        let manager = ProxyManager::new(config, registry, None);
2950
2951        // Use free ephemeral ports so the test does not need root to bind
2952        // 80/443. No CertManager is configured, so only the HTTP listener
2953        // registers (HTTPS warns + returns early).
2954        let http_port = reserve_free_tcp_port();
2955        let https_port = reserve_free_tcp_port();
2956
2957        manager.start_ingress_on(http_port, https_port).await;
2958        // The HTTP ingress server should be registered on its port.
2959        assert!(
2960            manager.servers.read().await.contains_key(&http_port),
2961            "HTTP ingress should be registered"
2962        );
2963        assert!(
2964            manager.ingress_started.load(Ordering::SeqCst),
2965            "ingress_started flag should be set"
2966        );
2967        let count_after_first = manager.servers.read().await.len();
2968
2969        // Second call is a no-op: the idempotency guard short-circuits, so the
2970        // server map does not grow.
2971        manager.start_ingress_on(http_port, https_port).await;
2972        assert_eq!(
2973            manager.servers.read().await.len(),
2974            count_after_first,
2975            "second start_ingress call must not register additional servers"
2976        );
2977    }
2978
2979    /// Regression: the `:80` HTTP ingress server MUST carry the manager's
2980    /// `CertManager` so it can serve ACME HTTP-01 challenges
2981    /// (`/.well-known/acme-challenge/<token>`), which always arrive on `:80`.
2982    ///
2983    /// Without this wiring the challenge interception is skipped (its guard is
2984    /// `if let Some(cm) = self.cert_manager`) and the request falls through to
2985    /// the vhost match, which 403s — so certs can never be issued. The
2986    /// server-level `test_acme_challenge_served_not_denied` did NOT catch this
2987    /// because it builds the `ProxyServer` with a cert manager directly; the
2988    /// gap was in `proxy_manager` failing to wire it into the `:80` server.
2989    ///
2990    /// Network-free: builds the server via the extracted helper without binding
2991    /// any port.
2992    #[tokio::test]
2993    async fn test_http_ingress_carries_cert_manager_for_acme() {
2994        let tmp = tempfile::tempdir().unwrap();
2995        let cm = Arc::new(
2996            CertManager::new(tmp.path().to_string_lossy().into_owned(), None)
2997                .await
2998                .unwrap(),
2999        );
3000
3001        let config = ProxyManagerConfig::default();
3002        let registry = Arc::new(ServiceRegistry::new());
3003        let manager = ProxyManager::new(config, registry, Some(Arc::clone(&cm)));
3004
3005        let http_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 80);
3006        let http_server = manager.build_http_ingress_server(http_addr, None);
3007
3008        assert!(
3009            http_server.cert_manager().is_some(),
3010            "the :80 HTTP ingress server must carry the CertManager so ACME \
3011             HTTP-01 challenges (which always arrive on :80) are served, not 403'd"
3012        );
3013    }
3014
3015    /// The HTTP ingress server is still built when NO `CertManager` is
3016    /// configured — only the `with_cert_manager` call is gated on `Some`, never
3017    /// the whole `:80` ingress. (A non-ACME deployment still needs `:80`.)
3018    #[tokio::test]
3019    async fn test_http_ingress_built_without_cert_manager() {
3020        let config = ProxyManagerConfig::default();
3021        let registry = Arc::new(ServiceRegistry::new());
3022        let manager = ProxyManager::new(config, registry, None);
3023
3024        let http_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 80);
3025        let http_server = manager.build_http_ingress_server(http_addr, None);
3026
3027        assert!(
3028            http_server.cert_manager().is_none(),
3029            "without a configured CertManager the :80 server carries none"
3030        );
3031    }
3032
3033    #[tokio::test]
3034    async fn publish_port_mapping_binds_explicit_host_port() {
3035        let config = ProxyManagerConfig::default();
3036        let registry = Arc::new(ServiceRegistry::new());
3037        let manager = ProxyManager::new(config, registry, None);
3038
3039        // Grab a free explicit port deterministically: bind, read, drop.
3040        let port = {
3041            let l = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
3042            l.local_addr().unwrap().port()
3043        };
3044
3045        let mapping = PortMapping {
3046            host_port: Some(port),
3047            container_port: 8080,
3048            protocol: PortProtocol::Tcp,
3049            host_ip: "127.0.0.1".to_string(),
3050        };
3051        // Dummy backend; the connect below only exercises the accept side.
3052        let backend: SocketAddr = "127.0.0.1:1".parse().unwrap();
3053
3054        let bound = manager
3055            .publish_port_mapping(Some("dep"), "svc", &mapping, backend)
3056            .await
3057            .expect("publish should succeed");
3058        assert_eq!(bound, port, "explicit host port must be honored");
3059
3060        // A real host listener now exists: a TCP connect must be accepted.
3061        let conn = tokio::net::TcpStream::connect(("127.0.0.1", bound)).await;
3062        assert!(conn.is_ok(), "expected a live host listener on {bound}");
3063    }
3064
3065    #[tokio::test]
3066    async fn publish_port_mapping_rejects_cross_owner_explicit_port() {
3067        let config = ProxyManagerConfig::default();
3068        let registry = Arc::new(ServiceRegistry::new());
3069        let manager = ProxyManager::new(config, registry, None);
3070
3071        // Free explicit port: bind, read, drop.
3072        let port = {
3073            let l = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
3074            l.local_addr().unwrap().port()
3075        };
3076
3077        let mapping = PortMapping {
3078            host_port: Some(port),
3079            container_port: 8080,
3080            protocol: PortProtocol::Tcp,
3081            host_ip: "127.0.0.1".to_string(),
3082        };
3083        let backend_a: SocketAddr = "127.0.0.1:1".parse().unwrap();
3084        let backend_b: SocketAddr = "127.0.0.1:2".parse().unwrap();
3085
3086        // Owner A claims the port.
3087        manager
3088            .publish_port_mapping(Some("depA"), "ownerA", &mapping, backend_a)
3089            .await
3090            .expect("owner A publish should succeed");
3091
3092        // Owner B on the SAME explicit port must be refused.
3093        let err = manager
3094            .publish_port_mapping(Some("depB"), "ownerB", &mapping, backend_b)
3095            .await
3096            .expect_err("cross-owner explicit port must be refused");
3097        assert!(
3098            matches!(err, crate::error::AgentError::PortConflict { .. }),
3099            "expected PortConflict, got {err:?}"
3100        );
3101    }
3102
3103    #[tokio::test]
3104    async fn publish_port_mapping_ephemeral_returns_nonzero() {
3105        let config = ProxyManagerConfig::default();
3106        let registry = Arc::new(ServiceRegistry::new());
3107        let manager = ProxyManager::new(config, registry, None);
3108
3109        let mapping = PortMapping {
3110            host_port: None,
3111            container_port: 8080,
3112            protocol: PortProtocol::Tcp,
3113            host_ip: "127.0.0.1".to_string(),
3114        };
3115        let backend: SocketAddr = "127.0.0.1:1".parse().unwrap();
3116
3117        let bound = manager
3118            .publish_port_mapping(Some("dep"), "svc", &mapping, backend)
3119            .await
3120            .expect("ephemeral publish should succeed");
3121        assert_ne!(bound, 0, "ephemeral publish must resolve to a real port");
3122    }
3123
3124    // --- Activator + RPS provider ------------------------------------------
3125
3126    /// A `ScaleTrigger` that records its calls and registers a healthy backend
3127    /// on the shared load balancer, simulating the `ServiceManager` bringing a
3128    /// scaled-to-zero service up.
3129    struct FakeScaleTrigger {
3130        lb: Arc<LoadBalancer>,
3131        lb_key: String,
3132        calls: Arc<std::sync::Mutex<Vec<(String, u32)>>>,
3133    }
3134
3135    #[async_trait::async_trait]
3136    impl ScaleTrigger for FakeScaleTrigger {
3137        async fn scale_to(&self, service: &str, replicas: u32) -> std::result::Result<(), String> {
3138            self.calls
3139                .lock()
3140                .unwrap()
3141                .push((service.to_string(), replicas));
3142            // Bring the LB group up with a healthy backend.
3143            self.lb.register(
3144                &self.lb_key,
3145                vec!["127.0.0.1:9".parse().unwrap()],
3146                LbStrategy::RoundRobin,
3147            );
3148            Ok(())
3149        }
3150    }
3151
3152    #[test]
3153    fn service_name_from_key_strips_deployment_and_endpoint() {
3154        assert_eq!(
3155            ServiceActivator::service_name_from_key("dep/api#http"),
3156            "api"
3157        );
3158        assert_eq!(ServiceActivator::service_name_from_key("api#http"), "api");
3159        assert_eq!(ServiceActivator::service_name_from_key("dep/api"), "api");
3160        assert_eq!(ServiceActivator::service_name_from_key("api"), "api");
3161    }
3162
3163    #[tokio::test]
3164    async fn service_activator_scales_and_waits_for_backend() {
3165        let lb = Arc::new(LoadBalancer::new());
3166        let lb_key = "dep/api#http".to_string();
3167        // Idle group: present but no backend.
3168        lb.register(&lb_key, vec![], LbStrategy::RoundRobin);
3169
3170        let calls = Arc::new(std::sync::Mutex::new(Vec::new()));
3171        let trigger = Arc::new(FakeScaleTrigger {
3172            lb: Arc::clone(&lb),
3173            lb_key: lb_key.clone(),
3174            calls: Arc::clone(&calls),
3175        });
3176        let activator = ServiceActivator::new(trigger, Arc::clone(&lb));
3177
3178        activator
3179            .activate(&lb_key)
3180            .await
3181            .expect("activation should succeed once a backend is registered");
3182
3183        let recorded = calls.lock().unwrap().clone();
3184        assert_eq!(
3185            recorded,
3186            vec![("api".to_string(), DEFAULT_ACTIVATION_FLOOR)],
3187            "scale_to should be called with the bare service name and the floor"
3188        );
3189    }
3190
3191    #[tokio::test]
3192    async fn rps_registry_provider_reflects_recorded_requests() {
3193        let reg = Arc::new(RpsRegistry::new());
3194        let provider = RpsRegistryProvider::new(Arc::clone(&reg));
3195
3196        assert!((provider.rps("svc").await - 0.0).abs() < f64::EPSILON);
3197
3198        reg.record("svc").await;
3199        reg.record("svc").await;
3200        assert!(
3201            provider.rps("svc").await > 0.0,
3202            "provider must reflect recorded requests"
3203        );
3204    }
3205
3206    #[tokio::test]
3207    async fn rps_registry_accessor_is_shared() {
3208        let config = ProxyManagerConfig::default();
3209        let registry = Arc::new(ServiceRegistry::new());
3210        let manager = ProxyManager::new(config, registry, None);
3211
3212        let reg = manager.rps_registry();
3213        reg.record("svc").await;
3214        // The provider built from the manager observes the same underlying data.
3215        let provider = manager.rps_provider();
3216        assert!(provider.rps("svc").await > 0.0);
3217    }
3218
3219    #[tokio::test]
3220    async fn set_activator_is_installed() {
3221        let config = ProxyManagerConfig::default();
3222        let registry = Arc::new(ServiceRegistry::new());
3223        let manager = ProxyManager::new(config, registry, None);
3224
3225        assert!(manager.current_activator().await.is_none());
3226
3227        let lb = manager.load_balancer();
3228        let trigger = Arc::new(FakeScaleTrigger {
3229            lb: Arc::clone(&lb),
3230            lb_key: "dep/api#http".to_string(),
3231            calls: Arc::new(std::sync::Mutex::new(Vec::new())),
3232        });
3233        manager.install_service_activator(trigger).await;
3234
3235        assert!(
3236            manager.current_activator().await.is_some(),
3237            "activator should be installed after install_service_activator"
3238        );
3239    }
3240
3241    /// Build an `EndpointSpec` with the given protocol/expose/host for the
3242    /// managed-cert gating tests. All other fields are defaults.
3243    fn cert_test_endpoint(
3244        protocol: Protocol,
3245        expose: ExposeType,
3246        host: Option<&str>,
3247    ) -> zlayer_spec::EndpointSpec {
3248        zlayer_spec::EndpointSpec {
3249            name: "web".to_string(),
3250            protocol,
3251            port: 443,
3252            target_port: None,
3253            path: None,
3254            host: host.map(str::to_string),
3255            expose,
3256            stream: None,
3257            target_role: None,
3258            tunnel: None,
3259        }
3260    }
3261
3262    #[test]
3263    fn endpoint_wants_managed_cert_gates_correctly() {
3264        // Qualifies: public HTTPS with a concrete FQDN.
3265        assert_eq!(
3266            ProxyManager::endpoint_wants_managed_cert(&cert_test_endpoint(
3267                Protocol::Https,
3268                ExposeType::Public,
3269                Some("console.zatabase.io"),
3270            )),
3271            Some("console.zatabase.io")
3272        );
3273
3274        // Host is trimmed.
3275        assert_eq!(
3276            ProxyManager::endpoint_wants_managed_cert(&cert_test_endpoint(
3277                Protocol::Https,
3278                ExposeType::Public,
3279                Some("  console.zatabase.io  "),
3280            )),
3281            Some("console.zatabase.io")
3282        );
3283
3284        // Rejected: internal exposure (ACME HTTP-01 would fail).
3285        assert!(
3286            ProxyManager::endpoint_wants_managed_cert(&cert_test_endpoint(
3287                Protocol::Https,
3288                ExposeType::Internal,
3289                Some("console.zatabase.io"),
3290            ))
3291            .is_none()
3292        );
3293
3294        // Rejected: not HTTPS.
3295        for proto in [
3296            Protocol::Http,
3297            Protocol::Websocket,
3298            Protocol::Tcp,
3299            Protocol::Udp,
3300        ] {
3301            assert!(
3302                ProxyManager::endpoint_wants_managed_cert(&cert_test_endpoint(
3303                    proto,
3304                    ExposeType::Public,
3305                    Some("console.zatabase.io"),
3306                ))
3307                .is_none()
3308            );
3309        }
3310
3311        // Rejected: no host.
3312        assert!(
3313            ProxyManager::endpoint_wants_managed_cert(&cert_test_endpoint(
3314                Protocol::Https,
3315                ExposeType::Public,
3316                None,
3317            ))
3318            .is_none()
3319        );
3320
3321        // Rejected: empty / whitespace host.
3322        assert!(
3323            ProxyManager::endpoint_wants_managed_cert(&cert_test_endpoint(
3324                Protocol::Https,
3325                ExposeType::Public,
3326                Some("   "),
3327            ))
3328            .is_none()
3329        );
3330
3331        // Rejected: wildcard patterns (ACME HTTP-01 cannot satisfy these).
3332        for wildcard in ["*", "*.zatabase.io", "console.*.io"] {
3333            assert!(
3334                ProxyManager::endpoint_wants_managed_cert(&cert_test_endpoint(
3335                    Protocol::Https,
3336                    ExposeType::Public,
3337                    Some(wildcard),
3338                ))
3339                .is_none()
3340            );
3341        }
3342    }
3343
3344    #[tokio::test]
3345    async fn sni_resolver_accessor_is_shared_and_stable() {
3346        let config = ProxyManagerConfig::default();
3347        let registry = Arc::new(ServiceRegistry::new());
3348        let manager = ProxyManager::new(config, registry, None);
3349
3350        // Every call returns a clone of the SAME Arc the live listeners use, so
3351        // a cert hot-loaded into one handle is visible through any other.
3352        let a = manager.sni_resolver();
3353        let b = manager.sni_resolver();
3354        assert!(
3355            Arc::ptr_eq(&a, &b),
3356            "sni_resolver() must return clones of one shared resolver"
3357        );
3358        assert_eq!(a.cert_count(), 0);
3359    }
3360
3361    #[tokio::test]
3362    async fn add_service_without_cert_manager_does_not_panic_or_provision() {
3363        // No CertManager configured: the provisioning trigger must be a no-op
3364        // (and must never reach the network), even for a public HTTPS vhost.
3365        let config = ProxyManagerConfig::default();
3366        let registry = Arc::new(ServiceRegistry::new());
3367        let manager = ProxyManager::new(config, registry, None);
3368
3369        let spec = serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3370            r"
3371version: v1
3372deployment: test
3373services:
3374  web:
3375    rtype: service
3376    image:
3377      name: test:latest
3378    endpoints:
3379      - name: web
3380        protocol: https
3381        port: 443
3382        host: console.zatabase.io
3383        expose: public
3384",
3385        )
3386        .unwrap()
3387        .services
3388        .remove("web")
3389        .unwrap();
3390
3391        manager.add_service("web", &spec).await;
3392        // Route is still registered even though no cert manager exists.
3393        assert!(manager.has_service("web").await);
3394        // Shared resolver is untouched (no provisioning attempted).
3395        assert_eq!(manager.sni_resolver().cert_count(), 0);
3396    }
3397}