Skip to main content

zlayer_agent/
service.rs

1//! Service-level container lifecycle management
2
3use crate::container_supervisor::{ContainerSupervisor, SupervisedState, SupervisorEvent};
4use crate::cron_scheduler::CronScheduler;
5use crate::dependency::{
6    DependencyConditionChecker, DependencyGraph, DependencyWaiter, WaitResult,
7};
8use crate::error::{AgentError, Result};
9use crate::health::{HealthCallback, HealthChecker, HealthMonitor, HealthState};
10use crate::init::InitOrchestrator;
11use crate::job::{JobExecution, JobExecutionId, JobExecutor, JobTrigger};
12use crate::overlay_manager::OverlayManager;
13use crate::proxy_manager::ProxyManager;
14use crate::runtime::{Container, ContainerId, ContainerState, Runtime};
15use std::collections::HashMap;
16use std::net::{IpAddr, SocketAddr};
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{RwLock, Semaphore};
20use zlayer_observability::logs::LogEntry;
21use zlayer_overlay::DnsServer;
22use zlayer_proxy::{StreamRegistry, StreamService};
23use zlayer_spec::{DependsSpec, HealthCheck, Protocol, PullPolicy, ResourceType, ServiceSpec};
24
25/// Service instance manages a single service's containers
26pub struct ServiceInstance {
27    pub service_name: String,
28    pub spec: ServiceSpec,
29    runtime: Arc<dyn Runtime + Send + Sync>,
30    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
31    /// Overlay network manager for container networking (optional, not needed for Docker runtime)
32    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
33    /// Proxy manager for updating backend health (optional)
34    proxy_manager: Option<Arc<ProxyManager>>,
35    /// DNS server for service discovery (optional)
36    dns_server: Option<Arc<DnsServer>>,
37    /// Container-injectable overlay resolver IP (optional). When set, this
38    /// node's overlay DNS server is reachable on `<ip>:53` and we inject it
39    /// into the container's resolv.conf so workloads resolve through the
40    /// overlay instead of inheriting the host's resolv.conf.
41    container_dns: Option<IpAddr>,
42    /// Shared health states map so callbacks can update ServiceManager-level health
43    health_states: Option<Arc<RwLock<HashMap<String, HealthState>>>>,
44    /// Most recently observed image digest after a successful pull. Used by
45    /// `upsert_service` to detect drift on `:latest`/`Newer` redeploys without
46    /// requiring callers to track digest state externally. Wrapped in a
47    /// `RwLock` so `&self` methods (`scale_to`) can update it.
48    last_pulled_digest: tokio::sync::RwLock<Option<String>>,
49    /// Local cluster node id used when constructing new `ContainerId`s during
50    /// scale-up. `0` in single-node deployments or when the cluster handle is
51    /// not yet wired. Populated by `ServiceManager` from `Cluster::node_id()`
52    /// at instance construction time.
53    node_id: u64,
54    /// Owning deployment name (the `zlayer up` / deploy request's deployment),
55    /// when known. Threaded from `ServiceSpec.deployment` by `upsert_service`.
56    ///
57    /// Used to scope service-discovery DNS per-deployment: records are
58    /// registered as `{service}.{deployment}` / `{service}.service.{deployment}`
59    /// (within the daemon zone) and the container's resolv.conf gets a
60    /// per-deployment `search` domain so a bare `<svc>` / `<svc>.service` query
61    /// resolves to THIS deployment's instance and never clobbers another
62    /// deployment that happens to share a service name. `None` for standalone /
63    /// single-deployment callers (falls back to the daemon's global zone).
64    deployment: Option<String>,
65    /// Whether THIS node holds the standing HTTP/HTTPS ingress on
66    /// `0.0.0.0:80` / `0.0.0.0:443` (mirrors `NodeConfig.ingress`). When `true`
67    /// and the node has an overlay IP, external service domains
68    /// (`EndpointSpec.host`) are resolved to this node's own overlay IP;
69    /// otherwise the selected ingress peer's overlay IP is used. Defaults to
70    /// `false`. Threaded from `ServiceManager`.
71    ingress_enabled: bool,
72    /// Cluster handle used to select an ingress-capable peer's overlay IP when
73    /// THIS node is not itself the ingress. `None` in standalone / single-node
74    /// mode (the funnel is then THIS node when it is ingress-capable).
75    cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
76}
77
78impl ServiceInstance {
79    /// Create a new service instance
80    pub fn new(
81        service_name: String,
82        spec: ServiceSpec,
83        runtime: Arc<dyn Runtime + Send + Sync>,
84        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
85    ) -> Self {
86        let deployment = spec.deployment.clone();
87        Self {
88            service_name,
89            spec,
90            runtime,
91            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
92            overlay_manager,
93            proxy_manager: None,
94            dns_server: None,
95            container_dns: None,
96            health_states: None,
97            last_pulled_digest: tokio::sync::RwLock::new(None),
98            node_id: 0,
99            deployment,
100            ingress_enabled: false,
101            cluster: None,
102        }
103    }
104
105    /// Create a new service instance with proxy manager for health-aware load balancing
106    pub fn with_proxy(
107        service_name: String,
108        spec: ServiceSpec,
109        runtime: Arc<dyn Runtime + Send + Sync>,
110        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
111        proxy_manager: Arc<ProxyManager>,
112    ) -> Self {
113        let deployment = spec.deployment.clone();
114        Self {
115            service_name,
116            spec,
117            runtime,
118            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
119            overlay_manager,
120            proxy_manager: Some(proxy_manager),
121            dns_server: None,
122            container_dns: None,
123            health_states: None,
124            last_pulled_digest: tokio::sync::RwLock::new(None),
125            node_id: 0,
126            deployment,
127            ingress_enabled: false,
128            cluster: None,
129        }
130    }
131
132    /// Set the local cluster node id. Used by `ServiceManager` to thread
133    /// `Cluster::node_id()` down to container construction so new
134    /// `ContainerId`s carry the owning node identity. Defaults to `0` (the
135    /// single-node sentinel) when unset.
136    pub fn set_node_id(&mut self, node_id: u64) {
137        self.node_id = node_id;
138    }
139
140    /// Set the owning deployment name for service-discovery DNS scoping.
141    ///
142    /// Idempotent with construction: the constructors already capture
143    /// `spec.deployment`, but `ServiceManager` calls this so a deployment
144    /// stamped after the fact (or via a different code path) is honored.
145    pub fn set_deployment(&mut self, deployment: Option<String>) {
146        self.deployment = deployment;
147    }
148
149    /// Set whether THIS node holds the standing HTTP/HTTPS ingress (mirrors
150    /// `NodeConfig.ingress`). Threaded by `ServiceManager` so external service
151    /// domains can be resolved to this node's overlay IP when it is the funnel.
152    pub fn set_ingress_enabled(&mut self, enabled: bool) {
153        self.ingress_enabled = enabled;
154    }
155
156    /// Set the cluster handle used to select an ingress-capable peer's overlay
157    /// IP when THIS node is not itself the ingress.
158    pub fn set_cluster(&mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) {
159        self.cluster = Some(cluster);
160    }
161
162    /// Resolve the overlay IP of the node that should serve external domains for
163    /// this service (the ingress funnel), as an `IpAddr`:
164    ///
165    /// 1. If THIS node is ingress-capable and has an overlay IP, use it.
166    /// 2. Otherwise ask the cluster for the deterministically-selected ingress
167    ///    peer's overlay IP.
168    /// 3. Otherwise (standalone, ingress-disabled local node, no peer) return
169    ///    `None` — the caller WARNs and skips registering the domain.
170    async fn resolve_ingress_ip(&self) -> Option<IpAddr> {
171        if self.ingress_enabled {
172            if let Some(om) = &self.overlay_manager {
173                if let Some(ip) = om.read().await.node_ip() {
174                    return Some(ip);
175                }
176            }
177        }
178        // Not the local funnel (or no local overlay IP): defer to the cluster's
179        // chosen ingress peer.
180        if let Some(cluster) = &self.cluster {
181            if let Some(ip_str) = cluster.select_ingress_overlay_ip().await {
182                match ip_str.parse::<IpAddr>() {
183                    Ok(ip) => return Some(ip),
184                    Err(e) => tracing::warn!(
185                        ingress_overlay_ip = %ip_str,
186                        error = %e,
187                        "selected ingress peer overlay IP is not a valid IP address; \
188                         skipping external-domain DNS registration"
189                    ),
190                }
191            }
192        }
193        None
194    }
195
196    /// Register an A/AAAA record for every endpoint's external vhost domain
197    /// (`EndpointSpec.host`) pointing at the ingress funnel's overlay IP, so a
198    /// client resolving `<host>` reaches an ingress-capable node whose 80/443
199    /// proxy fans out to this service's backends across the mesh.
200    ///
201    /// Wildcard host patterns (`*.example.com`) are routing matchers, not
202    /// resolvable names, so they are skipped. When no ingress node exists
203    /// anywhere in the mesh yet (no 80/443 entrypoint reachable), each host is
204    /// logged at WARN and skipped — never an error; it re-registers on the
205    /// next container attach once an entrypoint appears.
206    async fn register_external_domains(&self, dns: &Arc<DnsServer>) {
207        // Collect resolvable external domains for this service's endpoints.
208        let hosts: Vec<String> = self
209            .spec
210            .endpoints
211            .iter()
212            .filter_map(|ep| ep.host.as_ref())
213            .map(|h| h.trim().to_string())
214            .filter(|h| !h.is_empty() && !h.contains('*'))
215            .collect();
216        if hosts.is_empty() {
217            return;
218        }
219
220        let Some(ingress_ip) = self.resolve_ingress_ip().await else {
221            for host in &hosts {
222                tracing::warn!(
223                    service = %self.service_name,
224                    host = %host,
225                    "no 80/443 entrypoint reachable yet — open one to serve {host}; \
226                     skipping external-domain DNS registration (will retry on next attach)"
227                );
228            }
229            return;
230        };
231
232        for host in &hosts {
233            // `host` is already a fully-qualified external domain; pass it with
234            // a trailing dot so `DnsServer::add_record` treats it as an FQDN and
235            // does NOT append the daemon's internal zone origin.
236            let fqdn = if host.ends_with('.') {
237                host.clone()
238            } else {
239                format!("{host}.")
240            };
241            match dns.add_record(&fqdn, ingress_ip).await {
242                Ok(()) => tracing::info!(
243                    service = %self.service_name,
244                    host = %host,
245                    ingress_ip = %ingress_ip,
246                    "registered external-domain DNS record (host -> ingress overlay IP)"
247                ),
248                Err(e) => tracing::warn!(
249                    service = %self.service_name,
250                    host = %host,
251                    error = %e,
252                    "failed to register external-domain DNS record"
253                ),
254            }
255        }
256    }
257
258    /// The per-deployment resolv.conf `search` domain list for containers in
259    /// this service's deployment, given the daemon's global DNS `zone` (e.g.
260    /// `zlayer.local`).
261    ///
262    /// Returns a space-separated `search` value placing the deployment scope
263    /// FIRST so a guest's bare `<svc>` / `<svc>.service` query expands to THIS
264    /// deployment's record before anything else, with the bare zone last so
265    /// cross-deployment by-FQDN names (`<svc>.<otherdeployment>`) still resolve:
266    ///
267    /// ```text
268    /// search <deployment>.<zone> <zone>
269    /// ```
270    ///
271    /// When this instance has no deployment (standalone / single-deployment),
272    /// returns `None` so callers fall back to the daemon's global zone domain.
273    #[must_use]
274    pub fn dns_search_domain(&self, zone: &str) -> Option<String> {
275        let zone = zone.trim_end_matches('.');
276        self.deployment.as_deref().map(|d| {
277            // `<deployment>.<zone>` first (deployment scope wins), `<zone>` last
278            // (cross-deployment FQDN + global names still resolve).
279            format!("{d}.{zone} {zone}")
280        })
281    }
282
283    /// The set of service-discovery hostnames to register for one container,
284    /// relative to the daemon's DNS zone (each gets `<zone>` appended by
285    /// [`DnsServer::add_record`]).
286    ///
287    /// Two families are emitted:
288    ///
289    /// 1. **Deployment-scoped** (only when this instance carries a
290    ///    `deployment`): `<svc>.<D>`, `<svc>.service.<D>`,
291    ///    `<replica>.<svc>.service.<D>`, the documented example FQDN form
292    ///    `<svc>.<D>.service`, and (for non-default replica groups)
293    ///    `<role>.<svc>.service.<D>`. Paired with the per-deployment
294    ///    `search <D>.<zone> <zone>` resolv.conf domain (see
295    ///    [`Self::dns_search_domain`]), a guest's bare `<svc>` expands to
296    ///    `<svc>.<D>.<zone>` and `<svc>.service` to `<svc>.service.<D>.<zone>`,
297    ///    so both resolve to THIS deployment's instance and NEVER clobber a
298    ///    different deployment that happens to share a service name.
299    ///
300    /// 2. **Unscoped / legacy** (always): the bare `<svc>` name plus
301    ///    `<svc>.service.local`, `<replica>.<svc>.service.local`, and the role
302    ///    form. These preserve the historical compose-style discovery used by
303    ///    native containers (no per-deployment search domain) and existing
304    ///    deployments. NOTE: the bare `<svc>` is the last-writer-wins,
305    ///    cross-deployment-ambiguous key — it is kept for back-compat but the
306    ///    deployment-scoped names above are what make discovery correct.
307    #[must_use]
308    fn dns_hostnames(&self, id: &ContainerId) -> Vec<String> {
309        let svc = &self.service_name;
310        let mut names: Vec<String> = Vec::new();
311
312        // --- Deployment-scoped family (correct, no cross-deployment leak) ---
313        if let Some(d) = self.deployment.as_deref() {
314            // bare `<svc>` -> `<svc>.<D>` (resolves via `search <D>.<zone>`)
315            names.push(format!("{svc}.{d}"));
316            // `<svc>.service` -> `<svc>.service.<D>`
317            names.push(format!("{svc}.service.{d}"));
318            // documented example FQDN form `<svc>.<D>.service`
319            names.push(format!("{svc}.{d}.service"));
320            // replica-specific `<replica>.<svc>.service` -> `.<D>`
321            names.push(format!("{}.{svc}.service.{d}", id.replica));
322            // per-role group form for non-default replica groups
323            if id.role != "default" {
324                names.push(format!("{}.{svc}.service.{d}", id.role));
325            }
326        }
327
328        // --- Unscoped / legacy family (compose back-compat) ---
329        // Bare compose service name (e.g. `postgres`); multiple replicas upsert
330        // the same key and the in-memory authority keeps the most recent A.
331        names.push(svc.clone());
332        names.push(format!("{svc}.service.local"));
333        names.push(format!("{}.{svc}.service.local", id.replica));
334        if id.role != "default" {
335            names.push(format!("{}.{svc}.service.local", id.role));
336        }
337
338        names
339    }
340
341    /// Derive the replica group role for a 1-based `replica_idx`.
342    ///
343    /// When `spec.replica_groups` is unset, returns `"default"` (the implicit
344    /// single-group case). Otherwise walks groups in declaration order,
345    /// accumulating each group's `count` until `replica_idx` falls within the
346    /// current group's range, and returns that group's `role`.
347    ///
348    /// Replicas beyond the declared total fall back to `"default"`.
349    #[must_use]
350    pub fn role_for_replica(&self, replica_idx: u32) -> String {
351        let Some(groups) = self.spec.replica_groups.as_ref() else {
352            return "default".to_string();
353        };
354        let mut cumulative = 0u32;
355        for group in groups {
356            cumulative = cumulative.saturating_add(group.count);
357            if replica_idx <= cumulative {
358                return group.role.clone();
359            }
360        }
361        "default".to_string()
362    }
363
364    /// Builder method to add DNS server for service discovery
365    #[must_use]
366    pub fn with_dns(mut self, dns_server: Arc<DnsServer>) -> Self {
367        self.dns_server = Some(dns_server);
368        self
369    }
370
371    /// Set the DNS server for service discovery
372    pub fn set_dns_server(&mut self, dns_server: Arc<DnsServer>) {
373        self.dns_server = Some(dns_server);
374    }
375
376    /// Set the container-injectable overlay resolver IP and apply it to the
377    /// instance's spec.
378    ///
379    /// When `container_dns` is set and the spec is eligible (not host-network,
380    /// no user-supplied `dns`), this pre-populates `spec.dns` with the overlay
381    /// resolver so containers resolve through `<ip>:53` instead of inheriting
382    /// the host's `/etc/resolv.conf`.
383    ///
384    /// Why this exists: on overlay-enabled hosts the netbird `~.`
385    /// systemd-resolved hijack swallows the host resolver, so a container that
386    /// inherits the host resolv.conf cannot resolve anything. The overlay DNS
387    /// server forwards non-overlay queries upstream, so pointing the container
388    /// at it fixes resolution AND gives it service-name discovery.
389    ///
390    /// Port-53 constraint: `resolv.conf` `nameserver` lines (and Docker's
391    /// `--dns`) carry no port — they are always port 53. The injected IP is
392    /// therefore only useful because the daemon binds the overlay resolver on
393    /// `<ip>:53` (see `daemon.rs` Phase 4); the injected value is the bare IP,
394    /// not a `host:port`.
395    ///
396    /// User-supplied `spec.dns` is left untouched: an explicit resolver from
397    /// the deployment spec always wins.
398    pub fn set_container_dns(&mut self, container_dns: IpAddr) {
399        self.container_dns = Some(container_dns);
400        if !self.spec.host_network && self.spec.dns.is_empty() {
401            self.spec.dns = vec![container_dns.to_string()];
402        }
403    }
404
405    /// Set the proxy manager for health-aware load balancing
406    pub fn set_proxy_manager(&mut self, proxy_manager: Arc<ProxyManager>) {
407        self.proxy_manager = Some(proxy_manager);
408    }
409
410    /// Set the shared health states map so health callbacks can bridge state back to `ServiceManager`
411    pub fn set_health_states(&mut self, states: Arc<RwLock<HashMap<String, HealthState>>>) {
412        self.health_states = Some(states);
413    }
414
415    /// Get the last observed image digest (after the most recent successful
416    /// pull). Returns `None` when no pull has happened yet, when the runtime
417    /// does not expose digests, or when no matching `ImageInfo` was found.
418    pub async fn last_pulled_digest(&self) -> Option<String> {
419        self.last_pulled_digest.read().await.clone()
420    }
421
422    /// Pull the service image using the spec's pull policy (literal Docker /
423    /// Kubernetes semantics — no silent auto-upgrade of `IfNotPresent` to
424    /// `Newer` for `:latest` tags) and refresh the cached digest from
425    /// `Runtime::list_images` when the runtime exposes it. Returns the digest
426    /// observed after the pull, when known.
427    ///
428    /// For `Never`, the runtime is still called so it can load the image
429    /// config from the local cache (without any remote round-trip); only the
430    /// remote digest refresh is skipped. Without this call the bundle builder
431    /// has no image entrypoint/cmd and falls back to `/bin/sh`.
432    async fn pull_and_refresh_digest(&self) -> Result<Option<String>> {
433        let image_str = self.spec.image.name.to_string();
434        let policy = self.spec.image.pull_policy;
435
436        self.runtime
437            .pull_image_with_policy(
438                &image_str,
439                policy,
440                None,
441                self.spec.image.source_policy.unwrap_or_default(),
442            )
443            .await
444            .map_err(|e| AgentError::PullFailed {
445                image: self.spec.image.name.to_string(),
446                reason: e.to_string(),
447            })?;
448
449        // Best-effort: try to discover the resolved digest via list_images.
450        // Runtimes that don't support introspection (Unsupported) leave the
451        // cached digest unchanged; drift detection then falls back to "always
452        // recreate on PullPolicy::Always, never recreate on PullPolicy::Newer
453        // when no digests are known".
454        let new_digest = match self.runtime.list_images().await {
455            Ok(images) => images
456                .into_iter()
457                .find(|info| info.reference == image_str)
458                .and_then(|info| info.digest),
459            Err(e) => {
460                tracing::debug!(
461                    image = %image_str,
462                    error = %e,
463                    "list_images unavailable; cannot record post-pull digest"
464                );
465                None
466            }
467        };
468
469        if let Some(ref digest) = new_digest {
470            *self.last_pulled_digest.write().await = Some(digest.clone());
471        }
472
473        Ok(new_digest)
474    }
475
476    /// Scale to the desired number of replicas
477    ///
478    /// This method uses short-lived locks to avoid blocking concurrent operations.
479    /// I/O operations (pull, create, start, stop, remove) are performed without
480    /// holding the containers lock to allow other operations to proceed.
481    ///
482    /// # Errors
483    /// Returns an error if image pull, container creation, or container lifecycle operations fail.
484    #[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
485    pub async fn scale_to(&self, replicas: u32) -> Result<()> {
486        // Phase 1: Determine current state (short read lock)
487        let current_replicas = { self.containers.read().await.len() as u32 }; // Lock released here
488
489        // Phase 1b: Pull image up front so a redeploy on `:latest` (which lands
490        // here with replicas == current_replicas in the steady state) actually
491        // refreshes the cached digest. We skip the call only when scaling
492        // strictly down (no new containers needed). For `Never` the runtime
493        // still needs to load the image config from the local cache so the
494        // bundle builder gets entrypoint/cmd/env — without it the container
495        // falls back to `/bin/sh` and exits instantly. `pull_and_refresh_digest`
496        // itself handles the Never case (no remote round-trip, cache-only).
497        if replicas >= current_replicas {
498            let _ = self.pull_and_refresh_digest().await?;
499        }
500
501        // Phase 2: Scale up - create new containers (no lock held during I/O)
502        //
503        // Compute (role, replica_index) tuples for each new replica. When
504        // `spec.replica_groups` is set, expand groups in declaration order so
505        // each created replica maps to its declared `(role, intra_group_index)`.
506        // Otherwise fall back to the implicit single "default" group. The
507        // `local_node_id` is captured once so every new `ContainerId` carries
508        // the owning node identity for cross-node disambiguation.
509        let local_node_id = self.node_id;
510        if replicas > current_replicas {
511            let replica_specs: Vec<(String, u32)> =
512                if let Some(groups) = self.spec.replica_groups.as_ref() {
513                    let mut specs: Vec<(String, u32)> = Vec::new();
514                    for group in groups {
515                        for idx in 0..group.count {
516                            specs.push((group.role.clone(), idx + 1));
517                        }
518                    }
519                    specs
520                        .into_iter()
521                        .skip(current_replicas as usize)
522                        .take((replicas - current_replicas) as usize)
523                        .collect()
524                } else {
525                    (current_replicas..replicas)
526                        .map(|i| ("default".to_string(), i + 1))
527                        .collect()
528                };
529
530            for (role, replica_idx) in replica_specs {
531                let id = ContainerId::with_role_and_node(
532                    self.service_name.clone(),
533                    replica_idx,
534                    role,
535                    local_node_id,
536                );
537
538                // Create container (no lock needed - I/O operation)
539                //
540                // RouteToPeer must propagate unchanged: the scheduler uses it
541                // to re-place the workload on a capable peer, and wrapping it
542                // in `CreateFailed` would hide the signal and mark the service
543                // dead instead of rescheduling it. All other errors are
544                // normalised to `CreateFailed` for upstream handling.
545                self.runtime
546                    .create_container(&id, &self.spec)
547                    .await
548                    .map_err(|e| match e {
549                        AgentError::RouteToPeer { .. } => e,
550                        other => AgentError::CreateFailed {
551                            id: id.to_string(),
552                            reason: other.to_string(),
553                        },
554                    })?;
555
556                // Run init actions with error policy enforcement (no lock needed)
557                let init_orchestrator = InitOrchestrator::with_error_policy(
558                    id.clone(),
559                    self.spec.init.clone(),
560                    self.spec.errors.clone(),
561                );
562                init_orchestrator.run().await?;
563
564                // Start container (no lock needed - I/O operation)
565                self.runtime
566                    .start_container(&id)
567                    .await
568                    .map_err(|e| AgentError::StartFailed {
569                        id: id.to_string(),
570                        reason: e.to_string(),
571                    })?;
572
573                // Get container PID with retries (may not be immediately available)
574                let mut container_pid = None;
575                for attempt in 1..=5u32 {
576                    match self.runtime.get_container_pid(&id).await {
577                        Ok(Some(pid)) => {
578                            container_pid = Some(pid);
579                            break;
580                        }
581                        Ok(None) if attempt < 5 => {
582                            tracing::debug!(container = %id, attempt, "PID not available yet, retrying");
583                            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
584                        }
585                        Ok(None) => {
586                            tracing::warn!(container = %id, "Container PID unavailable after 5 attempts");
587                        }
588                        Err(e) => {
589                            tracing::warn!(container = %id, attempt, error = %e, "Failed to get PID");
590                            if attempt < 5 {
591                                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
592                            }
593                        }
594                    }
595                }
596
597                // Verify the container is still running before attempting
598                // overlay attach. If the init process crashed during start
599                // (bad image, missing libs, failed mount), the PID above is
600                // now dead and every `ip link set ... netns {pid}` will
601                // return a cryptic RTNETLINK error. Surface the real cause
602                // from the container's log tail instead of the cascade.
603                if container_pid.is_some() {
604                    let alive = match self.runtime.container_state(&id).await {
605                        Ok(
606                            ContainerState::Running
607                            | ContainerState::Pending
608                            | ContainerState::Initializing,
609                        ) => true,
610                        Ok(state) => {
611                            tracing::warn!(
612                                container = %id,
613                                ?state,
614                                "container exited before overlay attach could run"
615                            );
616                            false
617                        }
618                        Err(e) => {
619                            // State query failed — don't block the attach on
620                            // it. The overlay manager's own cleanup-on-error
621                            // path now handles the dead-PID case cleanly.
622                            tracing::warn!(
623                                container = %id,
624                                error = %e,
625                                "container state query failed before overlay attach, proceeding"
626                            );
627                            true
628                        }
629                    };
630                    if !alive {
631                        let log_tail = self.runtime.container_logs(&id, 40).await.ok().map_or_else(
632                            || "  <log read failed>".to_string(),
633                            |entries| {
634                                if entries.is_empty() {
635                                    "  <no log output>".to_string()
636                                } else {
637                                    entries
638                                        .into_iter()
639                                        .map(|e| format!("  {}", e.message))
640                                        .collect::<Vec<_>>()
641                                        .join("\n")
642                                }
643                            },
644                        );
645                        return Err(AgentError::StartFailed {
646                            id: id.to_string(),
647                            reason: format!("container exited during startup:\n{log_tail}"),
648                        });
649                    }
650                }
651
652                // Attach to overlay network if manager is available.
653                //
654                // Linux uses the container PID to enter the netns and attach a
655                // veth. Windows has no PID-addressable netns — the HCN namespace
656                // GUID (obtained from `get_container_namespace_id`) is used
657                // instead, and the endpoint's IP has already been populated by
658                // `EndpointAttachment::create_overlay` during container creation.
659                // We simply register that IP with the slice allocator so host
660                // accounting stays in sync.
661                let overlay_ip = if let Some(overlay) = &self.overlay_manager {
662                    let overlay_guard = overlay.read().await;
663                    #[cfg(target_os = "windows")]
664                    let attach_result: Option<std::net::IpAddr> = {
665                        // On Windows the overlay attach (HCN endpoint + per-container
666                        // namespace creation, via overlayd) already happened inside
667                        // `HcsRuntime::create_container`. Here we only need the IP it
668                        // assigned so we can register DNS for service discovery.
669                        let _ = (container_pid, &overlay_guard); // unused on Windows
670                        match self.runtime.get_container_ip(&id).await {
671                            Ok(Some(ip)) => Some(ip),
672                            Ok(None) => {
673                                tracing::debug!(
674                                    container = %id,
675                                    "no overlay IP recorded for container (overlay attach skipped at create time)"
676                                );
677                                None
678                            }
679                            Err(e) => {
680                                tracing::warn!(
681                                    container = %id,
682                                    error = %e,
683                                    "failed to fetch container overlay IP"
684                                );
685                                None
686                            }
687                        }
688                    };
689                    #[cfg(not(target_os = "windows"))]
690                    let attach_result: Option<std::net::IpAddr> = {
691                        match self.runtime.overlay_attach_kind() {
692                            // VM guest (macOS VZ-Linux): no host netns/PID, so
693                            // overlayd allocates the overlay identity and we push
694                            // it into the guest over vsock, where it brings up its
695                            // own kernel WireGuard device.
696                            crate::runtime::OverlayAttachKind::InGuestVsock => {
697                                let cid = id.to_string();
698                                // Per-deployment resolv.conf search domain so the
699                                // guest's bare `<svc>`/`<svc>.service` resolves to
700                                // THIS deployment (no cross-deployment clobber).
701                                let dns_override = overlay_guard
702                                    .dns_domain()
703                                    .and_then(|zone| self.dns_search_domain(zone));
704                                match overlay_guard
705                                    .attach_container_guest(
706                                        &cid,
707                                        &self.service_name,
708                                        true,
709                                        dns_override,
710                                    )
711                                    .await
712                                {
713                                    Ok(cfg) => {
714                                        let ip = cfg.overlay_ip;
715                                        match self.runtime.push_overlay_config(&id, &cfg).await {
716                                            Ok(()) => Some(ip),
717                                            Err(e) => {
718                                                tracing::warn!(
719                                                    container = %id,
720                                                    error = %e,
721                                                    "failed to push overlay config into guest; rolling back allocation"
722                                                );
723                                                // Don't leak the overlayd IP/peer.
724                                                if let Err(de) =
725                                                    overlay_guard.detach_container_guest(&cid).await
726                                                {
727                                                    tracing::warn!(
728                                                        container = %id,
729                                                        error = %de,
730                                                        "failed to roll back guest overlay allocation"
731                                                    );
732                                                }
733                                                None
734                                            }
735                                        }
736                                    }
737                                    Err(e) => {
738                                        tracing::warn!(
739                                            container = %id,
740                                            error = %e,
741                                            "failed to allocate guest overlay config from overlayd"
742                                        );
743                                        None
744                                    }
745                                }
746                            }
747                            // Host-process runtimes (Linux youki): plumb a veth
748                            // into the container's netns by PID.
749                            _ => {
750                                if let Some(pid) = container_pid {
751                                    // Per-deployment resolv.conf search domain so the
752                                    // container's bare `<svc>`/`<svc>.service` resolves
753                                    // to THIS deployment (no cross-deployment clobber).
754                                    let dns_override = overlay_guard
755                                        .dns_domain()
756                                        .and_then(|zone| self.dns_search_domain(zone));
757                                    match overlay_guard
758                                        .attach_container(
759                                            pid,
760                                            &self.service_name,
761                                            true,
762                                            dns_override,
763                                        )
764                                        .await
765                                    {
766                                        Ok(ip) => Some(ip),
767                                        Err(e) => {
768                                            tracing::warn!(
769                                                container = %id,
770                                                error = %e,
771                                                "failed to attach container to overlay network"
772                                            );
773                                            None
774                                        }
775                                    }
776                                } else {
777                                    // No PID available (e.g. WASM runtime) - skip overlay attachment
778                                    tracing::debug!(
779                                        container = %id,
780                                        "skipping overlay attachment - no PID available"
781                                    );
782                                    None
783                                }
784                            }
785                        }
786                    };
787
788                    if let Some(ip) = attach_result {
789                        tracing::info!(
790                            container = %id,
791                            overlay_ip = %ip,
792                            "attached container to overlay network"
793                        );
794
795                        // Register DNS for service discovery.
796                        if let Some(dns) = &self.dns_server {
797                            for hostname in self.dns_hostnames(&id) {
798                                match dns.add_record(&hostname, ip).await {
799                                    Ok(()) => tracing::debug!(
800                                        hostname = %hostname,
801                                        ip = %ip,
802                                        "registered service-discovery DNS record"
803                                    ),
804                                    Err(e) => tracing::warn!(
805                                        hostname = %hostname,
806                                        error = %e,
807                                        "failed to register service-discovery DNS record"
808                                    ),
809                                }
810                            }
811
812                            // Register external service domains (vhosts) so a
813                            // client resolving `<host>` lands on an
814                            // ingress-capable node, whose 80/443 proxy fans out
815                            // to this service's overlay-IP backends anywhere in
816                            // the mesh. This is ADDITIONAL to the
817                            // deployment-scoped service-discovery records above.
818                            self.register_external_domains(dns).await;
819                        }
820
821                        Some(ip)
822                    } else {
823                        None
824                    }
825                } else {
826                    None
827                };
828
829                // If overlay failed, try the container runtime's own IP as fallback
830                let effective_ip = if overlay_ip.is_none() {
831                    match self.runtime.get_container_ip(&id).await {
832                        Ok(Some(ip)) => {
833                            tracing::info!(
834                                container = %id,
835                                ip = %ip,
836                                "using runtime container IP for proxy (overlay unavailable)"
837                            );
838                            Some(ip)
839                        }
840                        Ok(None) => {
841                            tracing::warn!(
842                                container = %id,
843                                "no container IP available from runtime, proxy routing will be unavailable"
844                            );
845                            None
846                        }
847                        Err(e) => {
848                            tracing::warn!(
849                                container = %id,
850                                error = %e,
851                                "failed to get container IP from runtime"
852                            );
853                            None
854                        }
855                    }
856                } else {
857                    overlay_ip
858                };
859
860                tracing::info!(
861                    container = %id,
862                    service = %self.service_name,
863                    overlay_ip = ?overlay_ip,
864                    effective_ip = ?effective_ip,
865                    "Container IP resolution complete"
866                );
867
868                // Query port override from the runtime.
869                // On macOS sandbox, each container is assigned a unique port since
870                // all processes share the host network (no network namespaces).
871                // The runtime passes the port to the process via the PORT env var.
872                let port_override = match self.runtime.get_container_port_override(&id).await {
873                    Ok(Some(port)) => {
874                        tracing::info!(
875                            container = %id,
876                            port = port,
877                            "runtime assigned dynamic port override for this container"
878                        );
879                        Some(port)
880                    }
881                    Ok(None) => None,
882                    Err(e) => {
883                        tracing::warn!(
884                            container = %id,
885                            error = %e,
886                            "failed to query port override from runtime, using spec port"
887                        );
888                        None
889                    }
890                };
891
892                // Start health monitoring and store handle (no lock needed during start)
893                let health_monitor_handle = {
894                    let mut check = self.spec.health.check.clone();
895
896                    // Resolve Tcp { port: 0 } ("use first endpoint") to the actual
897                    // port the container is listening on. With mac-sandbox, each
898                    // replica gets a unique assigned port via port_override.
899                    if let HealthCheck::Tcp { ref mut port } = check {
900                        if *port == 0 {
901                            *port = port_override.unwrap_or_else(|| {
902                                self.spec
903                                    .endpoints
904                                    .iter()
905                                    .find(|ep| {
906                                        matches!(
907                                            ep.protocol,
908                                            Protocol::Http | Protocol::Https | Protocol::Websocket
909                                        )
910                                    })
911                                    .map_or(8080, zlayer_spec::EndpointSpec::target_port)
912                            });
913                        }
914                    }
915
916                    let start_grace = self
917                        .spec
918                        .health
919                        .start_grace
920                        .unwrap_or(Duration::from_secs(5));
921                    let check_timeout = self.spec.health.timeout.unwrap_or(Duration::from_secs(5));
922                    let interval = self.spec.health.interval.unwrap_or(Duration::from_secs(10));
923                    let retries = self.spec.health.retries;
924
925                    let checker = HealthChecker::new(check, effective_ip);
926                    let mut monitor = HealthMonitor::new(id.clone(), checker, interval, retries)
927                        .with_start_grace(start_grace)
928                        .with_check_timeout(check_timeout);
929
930                    // Build the optional proxy backend handle. This is only present
931                    // when both a proxy manager AND a reachable overlay IP exist; in
932                    // degraded-overlay / no-proxy deployments it stays None and the
933                    // callback below skips all proxy work while STILL bridging health
934                    // state back into ServiceManager.
935                    let proxy_backend: Option<(Arc<ProxyManager>, SocketAddr)> =
936                        if let (Some(proxy), Some(ip)) = (&self.proxy_manager, effective_ip) {
937                            let proxy = Arc::clone(proxy);
938                            // Get the container's target port, using the runtime override if
939                            // present. On macOS sandbox, port_override gives each replica a
940                            // unique port so the proxy can distinguish backends sharing
941                            // 127.0.0.1.
942                            let port = port_override.unwrap_or_else(|| {
943                                self.spec
944                                    .endpoints
945                                    .iter()
946                                    .find(|ep| {
947                                        matches!(
948                                            ep.protocol,
949                                            Protocol::Http | Protocol::Https | Protocol::Websocket
950                                        )
951                                    })
952                                    .map_or(8080, zlayer_spec::EndpointSpec::target_port)
953                            });
954
955                            let backend_addr = SocketAddr::new(ip, port);
956
957                            // Register backend with load balancer so proxy can route to it.
958                            // This must happen before the health callback is created, because
959                            // update_backend_health only updates *existing* backends.
960                            proxy.add_backend(&self.service_name, backend_addr).await;
961
962                            // Publish this container's exposed ports on the node
963                            // loopback (`127.0.0.1:<endpoint.port>`) so a consumer
964                            // sharing the node loopback can reach the service at
965                            // `localhost:<port>`. Gated on the spec's policy
966                            // (`Auto` publishes only for single-member services).
967                            // Uses the SAME runtime-resolved `ip`/`port_override`
968                            // as the backend above: on macOS each replica shares
969                            // 127.0.0.1 with a unique override; on Linux/VM the
970                            // overlay IP carries the declared target port.
971                            if self.spec.publish_to_node_loopback() {
972                                if let Err(e) = proxy
973                                    .publish_loopback_for_container(
974                                        self.deployment.as_deref(),
975                                        &self.service_name,
976                                        &self.spec,
977                                        ip,
978                                        port_override,
979                                    )
980                                    .await
981                                {
982                                    // A host-port conflict means a DIFFERENT
983                                    // deployment/service already owns this
984                                    // published port; refuse to cross-wire
985                                    // (Bug 7) and surface it at deploy time.
986                                    tracing::error!(
987                                        service = %self.service_name,
988                                        error = %e,
989                                        "Failed to publish container ports on node loopback"
990                                    );
991                                    return Err(e);
992                                }
993                            }
994
995                            Some((proxy, backend_addr))
996                        } else {
997                            None
998                        };
999
1000                    // The health bridge is ALWAYS attached, independent of proxy/IP
1001                    // availability. stabilization::wait_for_stabilization only treats a
1002                    // service as ready when health_states[name] == Healthy, so this write
1003                    // must happen even when the overlay is degraded and no proxy backend
1004                    // exists — otherwise the service stays healthy=false forever and
1005                    // stabilization times out.
1006                    let health_states_opt = self.health_states.clone();
1007                    let svc_name_for_states = self.service_name.clone();
1008                    let svc_name_for_proxy = self.service_name.clone();
1009                    let svc_name_for_log = self.service_name.clone();
1010
1011                    let health_callback: HealthCallback =
1012                        Arc::new(move |container_id: ContainerId, is_healthy: bool| {
1013                            tracing::info!(
1014                                container = %container_id,
1015                                service = %svc_name_for_log,
1016                                healthy = is_healthy,
1017                                has_proxy_backend = proxy_backend.is_some(),
1018                                "health status changed"
1019                            );
1020
1021                            // Always bridge health state back to ServiceManager's
1022                            // health_states map (unconditional — no proxy/IP required).
1023                            if let Some(ref health_states) = health_states_opt {
1024                                let states = Arc::clone(health_states);
1025                                let svc = svc_name_for_states.clone();
1026                                tokio::spawn(async move {
1027                                    let state = if is_healthy {
1028                                        HealthState::Healthy
1029                                    } else {
1030                                        HealthState::Unhealthy {
1031                                            failures: 0,
1032                                            reason: "health check failed".into(),
1033                                        }
1034                                    };
1035                                    states.write().await.insert(svc, state);
1036                                });
1037                            }
1038
1039                            // Update proxy backend health only when a proxy backend was
1040                            // registered (proxy manager + reachable overlay IP present).
1041                            if let Some((proxy, backend_addr)) = proxy_backend.clone() {
1042                                let svc = svc_name_for_proxy.clone();
1043                                tokio::spawn(async move {
1044                                    proxy
1045                                        .update_backend_health(&svc, backend_addr, is_healthy)
1046                                        .await;
1047                                });
1048                            }
1049                        });
1050
1051                    monitor = monitor.with_callback(health_callback);
1052
1053                    monitor.start()
1054                };
1055
1056                // Update state (short write lock)
1057                {
1058                    let mut containers = self.containers.write().await;
1059                    containers.insert(
1060                        id.clone(),
1061                        Container {
1062                            id: id.clone(),
1063                            image: self.spec.image.name.to_string(),
1064                            state: ContainerState::Running,
1065                            pid: None,
1066                            task: None,
1067                            overlay_ip: effective_ip,
1068                            health_monitor: Some(health_monitor_handle),
1069                            port_override,
1070                        },
1071                    );
1072                } // Lock released here
1073            }
1074        }
1075
1076        // Phase 3: Scale down - remove containers (short write lock per removal)
1077        //
1078        // Containers were created with `with_role_and_node(role, local_node_id)`
1079        // on scale-up, so we must reconstruct the same identity on scale-down
1080        // — the role is derived from `replica_groups` via `role_for_replica`
1081        // and the node id is the local cluster node. Mismatched ids would miss
1082        // the live entry in `self.containers` and leak the container.
1083        if replicas < current_replicas {
1084            for i in replicas..current_replicas {
1085                let replica_idx = i + 1;
1086                let id = ContainerId::with_role_and_node(
1087                    self.service_name.clone(),
1088                    replica_idx,
1089                    self.role_for_replica(replica_idx),
1090                    local_node_id,
1091                );
1092
1093                // Remove from state first and get the container to abort health monitor (short write lock)
1094                let removed_container = {
1095                    let mut containers = self.containers.write().await;
1096                    containers.remove(&id)
1097                }; // Lock released here
1098
1099                // Then perform cleanup (no lock held - I/O operations)
1100                if let Some(container) = removed_container {
1101                    // Abort the health monitor task if it exists
1102                    if let Some(handle) = container.health_monitor {
1103                        handle.abort();
1104                    }
1105
1106                    // Unpublish this container's node-loopback ports (mirror of
1107                    // the publish in the start path above). Recomputes the same
1108                    // backend from the container's stored runtime-resolved IP and
1109                    // port override; the last replica's removal frees the
1110                    // loopback listener. Gated identically to publish.
1111                    if self.spec.publish_to_node_loopback() {
1112                        if let (Some(proxy), Some(ip)) = (&self.proxy_manager, container.overlay_ip)
1113                        {
1114                            proxy
1115                                .unpublish_loopback_for_container(
1116                                    &self.spec,
1117                                    ip,
1118                                    container.port_override,
1119                                )
1120                                .await;
1121                        }
1122                    }
1123
1124                    // Remove DNS records for this container
1125                    if let Some(dns) = &self.dns_server {
1126                        // Remove replica-specific DNS entry
1127                        let replica_hostname =
1128                            format!("{}.{}.service.local", id.replica, self.service_name);
1129                        if let Err(e) = dns.remove_record(&replica_hostname).await {
1130                            tracing::warn!(
1131                                hostname = %replica_hostname,
1132                                error = %e,
1133                                "failed to remove replica DNS record"
1134                            );
1135                        } else {
1136                            tracing::debug!(
1137                                hostname = %replica_hostname,
1138                                "removed replica DNS record"
1139                            );
1140                        }
1141
1142                        // Remove per-role DNS entry if this was a non-default group.
1143                        // Note: this is best-effort and removes the record even if
1144                        // other replicas in the same role still need it — the DNS
1145                        // server's add/remove API is single-record so we can't keep
1146                        // it alive for siblings. P2.3-bis (round-robin per-role)
1147                        // can fix this later via a per-role refcount; for now the
1148                        // service-level hostname keeps cluster-internal clients
1149                        // working even when the role-specific record briefly
1150                        // disappears.
1151                        if id.role != "default" {
1152                            let role_hostname =
1153                                format!("{}.{}.service.local", id.role, self.service_name);
1154                            if let Err(e) = dns.remove_record(&role_hostname).await {
1155                                tracing::warn!(
1156                                    hostname = %role_hostname,
1157                                    error = %e,
1158                                    "failed to remove role DNS record"
1159                                );
1160                            } else {
1161                                tracing::debug!(
1162                                    hostname = %role_hostname,
1163                                    "removed role DNS record"
1164                                );
1165                            }
1166                        }
1167
1168                        // Note: We don't remove the service-level hostname here because
1169                        // other replicas may still be using it. The service-level record
1170                        // should be cleaned up when the entire service is removed.
1171                    }
1172
1173                    // Detach from overlay network if manager available.
1174                    //
1175                    // Done BEFORE stop_container because:
1176                    //   - The container init process must still be in
1177                    //     /proc to look up its PID via `get_container_pid`.
1178                    //   - `OverlayManager::detach_container` deletes host-side
1179                    //     veth interfaces by name (`veth-<pid>-*`) and
1180                    //     releases the allocated overlay IPs back to the
1181                    //     per-node slice. Without this the IPs leak across
1182                    //     container churn and the slice exhausts.
1183                    //
1184                    // Best-effort: failures are logged but never abort the
1185                    // scale-down. The periodic orphan sweep
1186                    // (`start_periodic_orphan_sweep`) catches anything we
1187                    // missed.
1188                    if let Some(overlay) = &self.overlay_manager {
1189                        // VM guests have no host veth/PID — release the overlayd
1190                        // allocation (IP + registered mesh peer) by container id
1191                        // instead of by PID.
1192                        if self.runtime.overlay_attach_kind()
1193                            == crate::runtime::OverlayAttachKind::InGuestVsock
1194                        {
1195                            let overlay_guard = overlay.read().await;
1196                            if let Err(e) =
1197                                overlay_guard.detach_container_guest(&id.to_string()).await
1198                            {
1199                                tracing::warn!(
1200                                    container = %id,
1201                                    error = %e,
1202                                    "overlay detach_container_guest failed; relying on orphan sweep"
1203                                );
1204                            }
1205                        } else {
1206                            match self.runtime.get_container_pid(&id).await {
1207                                Ok(Some(pid)) => {
1208                                    let overlay_guard = overlay.read().await;
1209                                    if let Err(e) = overlay_guard.detach_container(pid).await {
1210                                        tracing::warn!(
1211                                            container = %id,
1212                                            pid,
1213                                            error = %e,
1214                                            "overlay detach_container failed; relying on orphan sweep"
1215                                        );
1216                                    }
1217                                }
1218                                Ok(None) => {
1219                                    tracing::debug!(
1220                                        container = %id,
1221                                        "no PID available for overlay detach (already exited or non-Linux runtime)"
1222                                    );
1223                                }
1224                                Err(e) => {
1225                                    tracing::warn!(
1226                                        container = %id,
1227                                        error = %e,
1228                                        "failed to query container PID for overlay detach"
1229                                    );
1230                                }
1231                            }
1232                        }
1233                    }
1234
1235                    // Stop container
1236                    self.runtime
1237                        .stop_container(&id, Duration::from_secs(30))
1238                        .await?;
1239
1240                    // Sync volumes to S3 before removal (no-op if not configured)
1241                    if let Err(e) = self.runtime.sync_container_volumes(&id).await {
1242                        tracing::warn!(
1243                            container = %id,
1244                            error = %e,
1245                            "failed to sync volumes before removal"
1246                        );
1247                    }
1248
1249                    // Remove container
1250                    self.runtime.remove_container(&id).await?;
1251                }
1252            }
1253        }
1254
1255        Ok(())
1256    }
1257
1258    /// Get current number of replicas
1259    pub async fn replica_count(&self) -> usize {
1260        self.containers.read().await.len()
1261    }
1262
1263    /// Get all container IDs
1264    pub async fn container_ids(&self) -> Vec<ContainerId> {
1265        self.containers.read().await.keys().cloned().collect()
1266    }
1267
1268    /// Get per-container info (id, image, state, pid, overlay IP) for every
1269    /// live container in this instance.
1270    ///
1271    /// Surfaces the REAL image reference each container was created from and its
1272    /// REAL lifecycle state (lowercased via [`ContainerState::as_str`]) so the
1273    /// API/`ps` no longer reports a hardcoded `"running"` with no image.
1274    pub async fn container_infos(&self) -> Vec<ContainerInfo> {
1275        self.containers
1276            .read()
1277            .await
1278            .values()
1279            .map(|c| ContainerInfo {
1280                id: c.id.clone(),
1281                image: c.image.clone(),
1282                state: c.state.as_str().to_string(),
1283                pid: c.pid,
1284                overlay_ip: c.overlay_ip.map(|ip| ip.to_string()),
1285            })
1286            .collect()
1287    }
1288
1289    /// Get read access to the containers map
1290    ///
1291    /// This allows callers to access container overlay IPs and other metadata
1292    /// without copying the entire map.
1293    pub fn containers(
1294        &self,
1295    ) -> &tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>> {
1296        &self.containers
1297    }
1298
1299    /// Check if this service instance has an overlay manager configured
1300    pub fn has_overlay_manager(&self) -> bool {
1301        self.overlay_manager.is_some()
1302    }
1303
1304    /// Check if this service instance has a proxy manager configured
1305    pub fn has_proxy_manager(&self) -> bool {
1306        self.proxy_manager.is_some()
1307    }
1308
1309    /// Get the proxy manager for this instance, if configured.
1310    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
1311        self.proxy_manager.as_ref()
1312    }
1313
1314    /// Check if this service instance has a DNS server configured
1315    pub fn has_dns_server(&self) -> bool {
1316        self.dns_server.is_some()
1317    }
1318}
1319
1320/// Per-container summary surfaced to callers (API / `ps`).
1321///
1322/// Carries the REAL image reference and lifecycle state of a single live
1323/// container, replacing the previous id-only view that forced the API to
1324/// fabricate a hardcoded `"running"` state with no image.
1325#[derive(Debug, Clone)]
1326pub struct ContainerInfo {
1327    /// Container identity.
1328    pub id: ContainerId,
1329    /// Image reference the container was created from (canonical form).
1330    pub image: String,
1331    /// Lowercased lifecycle state (e.g. `"running"`, `"exited"`).
1332    pub state: String,
1333    /// Process ID, when the container is running.
1334    pub pid: Option<u32>,
1335    /// Overlay IP rendered as a string, when assigned.
1336    pub overlay_ip: Option<String>,
1337}
1338
/// A live deployment container enriched for Docker-compat `ps` rows and for
/// name resolution. Produced by [`ServiceManager::list_container_views`].
///
/// Combines per-container data (id, image, state, pid) with service-level
/// data (deployment, service name, `container_name`, ports).
#[derive(Debug, Clone)]
pub struct DeploymentContainerView {
    /// Deployment (compose project) name, when known.
    pub deployment: Option<String>,
    /// Service name within the deployment.
    pub service: String,
    /// Concrete container identity.
    pub container_id: ContainerId,
    /// Compose `container_name:` (the user-facing Docker name), when set.
    pub container_name: Option<String>,
    /// Image reference the container was created from.
    pub image: String,
    /// Lowercased lifecycle state (e.g. `"running"`).
    pub state: String,
    /// Process id when running.
    pub pid: Option<u32>,
    /// The service's published port mappings. These are service-level, so
    /// presumably identical for every replica of the service (verify in the
    /// producer).
    pub ports: Vec<zlayer_spec::PortMapping>,
}
1360
/// Service manager for multiple services
///
/// Holds the [`ServiceInstance`] registry of this node together with the
/// optional subsystems (overlay, proxy, DNS, jobs, cron, supervisor, cluster)
/// that service lifecycle operations use. Construct via
/// [`ServiceManager::builder`].
pub struct ServiceManager {
    /// Container runtime shared by every managed service.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Registered service instances; presumably keyed by service name
    /// (confirm against `upsert_service`).
    services: tokio::sync::RwLock<std::collections::HashMap<String, ServiceInstance>>,
    /// Bounds the number of concurrent scaling operations (all constructors
    /// size it to 10 permits).
    scale_semaphore: Arc<Semaphore>,
    /// Overlay network manager for container networking
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Stream registry for L4 proxy route registration (TCP/UDP)
    stream_registry: Option<Arc<StreamRegistry>>,
    /// Proxy manager for health-aware load balancing (hyper-based proxy)
    proxy_manager: Option<Arc<ProxyManager>>,
    /// DNS server for service discovery
    dns_server: Option<Arc<DnsServer>>,
    /// Container-injectable overlay resolver IP. When set, new service
    /// instances inject `<ip>` into their `spec.dns` so containers resolve
    /// through the overlay DNS server (bound on `<ip>:53`) rather than the
    /// hijacked host resolv.conf.
    container_dns: Option<IpAddr>,
    /// Deployment name (used for generating hostnames)
    deployment_name: Option<String>,
    /// Per-service health states, keyed by service name. Written through
    /// `update_health_state` and read by the dependency condition checker
    /// when deciding whether a `depends` entry is satisfied.
    health_states: Arc<RwLock<HashMap<String, HealthState>>>,
    /// Job executor for run-to-completion workloads
    job_executor: Option<Arc<JobExecutor>>,
    /// Cron scheduler for time-based job triggers
    cron_scheduler: Option<Arc<CronScheduler>>,
    /// Container supervisor for crash/panic policy enforcement
    container_supervisor: Option<Arc<ContainerSupervisor>>,
    /// Cluster membership + dispatch handle. When `None`, scale operations
    /// run purely local (single-node mode). When `Some`, `scale_service`
    /// routes through the cluster (leader dispatches to peers; followers
    /// forward to the leader).
    cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
    /// Whether THIS node holds the standing HTTP/HTTPS ingress (mirrors
    /// `NodeConfig.ingress`). Threaded onto each `ServiceInstance` so external
    /// service domains resolve to this node's overlay IP when it is the funnel.
    /// Defaults to `false`; set by the daemon from `NodeConfig.ingress`.
    ingress: bool,
}
1400
1401// ---------------------------------------------------------------------------
1402// ServiceManagerBuilder
1403// ---------------------------------------------------------------------------
1404
1405/// Builder for constructing a [`ServiceManager`] with optional subsystems.
1406///
1407/// Prefer using `ServiceManager::builder(runtime)` to start building.
1408///
1409/// # Example
1410///
1411/// ```ignore
1412/// let manager = ServiceManager::builder(runtime)
1413///     .overlay_manager(om)
1414///     .proxy_manager(proxy)
1415///     .deployment_name("prod")
1416///     .build();
1417/// ```
1418pub struct ServiceManagerBuilder {
1419    runtime: Arc<dyn Runtime + Send + Sync>,
1420    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
1421    proxy_manager: Option<Arc<ProxyManager>>,
1422    stream_registry: Option<Arc<StreamRegistry>>,
1423    dns_server: Option<Arc<DnsServer>>,
1424    container_dns: Option<IpAddr>,
1425    deployment_name: Option<String>,
1426    job_executor: Option<Arc<JobExecutor>>,
1427    cron_scheduler: Option<Arc<CronScheduler>>,
1428    container_supervisor: Option<Arc<ContainerSupervisor>>,
1429    cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
1430}
1431
1432impl ServiceManagerBuilder {
1433    /// Create a new builder with the required runtime.
1434    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
1435        Self {
1436            runtime,
1437            overlay_manager: None,
1438            proxy_manager: None,
1439            stream_registry: None,
1440            dns_server: None,
1441            container_dns: None,
1442            deployment_name: None,
1443            job_executor: None,
1444            cron_scheduler: None,
1445            container_supervisor: None,
1446            cluster: None,
1447        }
1448    }
1449
1450    /// Set the overlay network manager for container networking.
1451    #[must_use]
1452    pub fn overlay_manager(mut self, om: Arc<RwLock<OverlayManager>>) -> Self {
1453        self.overlay_manager = Some(om);
1454        self
1455    }
1456
1457    /// Set the proxy manager for health-aware load balancing.
1458    #[must_use]
1459    pub fn proxy_manager(mut self, pm: Arc<ProxyManager>) -> Self {
1460        self.proxy_manager = Some(pm);
1461        self
1462    }
1463
1464    /// Set the stream registry for TCP/UDP L4 proxy route registration.
1465    #[must_use]
1466    pub fn stream_registry(mut self, sr: Arc<StreamRegistry>) -> Self {
1467        self.stream_registry = Some(sr);
1468        self
1469    }
1470
1471    /// Set the DNS server for service discovery.
1472    #[must_use]
1473    pub fn dns_server(mut self, dns: Arc<DnsServer>) -> Self {
1474        self.dns_server = Some(dns);
1475        self
1476    }
1477
1478    /// Set the container-injectable overlay resolver IP.
1479    ///
1480    /// The daemon passes the IP it bound the overlay DNS server on at port 53
1481    /// (see `daemon.rs` Phase 4). New service instances inject it into
1482    /// `spec.dns` so containers resolve through the overlay instead of the
1483    /// hijacked host resolv.conf. The port is implicitly 53 (resolv.conf has no
1484    /// port syntax), which is why only the bare IP is threaded here.
1485    #[must_use]
1486    pub fn container_dns(mut self, ip: IpAddr) -> Self {
1487        self.container_dns = Some(ip);
1488        self
1489    }
1490
1491    /// Set the deployment name (used for hostname generation).
1492    #[must_use]
1493    pub fn deployment_name(mut self, name: impl Into<String>) -> Self {
1494        self.deployment_name = Some(name.into());
1495        self
1496    }
1497
1498    /// Set the job executor for run-to-completion workloads.
1499    #[must_use]
1500    pub fn job_executor(mut self, je: Arc<JobExecutor>) -> Self {
1501        self.job_executor = Some(je);
1502        self
1503    }
1504
1505    /// Set the cron scheduler for time-based job triggers.
1506    #[must_use]
1507    pub fn cron_scheduler(mut self, cs: Arc<CronScheduler>) -> Self {
1508        self.cron_scheduler = Some(cs);
1509        self
1510    }
1511
1512    /// Set the container supervisor for crash/panic policy enforcement.
1513    #[must_use]
1514    pub fn container_supervisor(mut self, cs: Arc<ContainerSupervisor>) -> Self {
1515        self.container_supervisor = Some(cs);
1516        self
1517    }
1518
1519    /// Set the cluster membership + dispatch handle. When set,
1520    /// [`ServiceManager::scale_service`] will route through the cluster
1521    /// (leader dispatches to peers; followers forward to the leader).
1522    /// When unset (the default), scale operations remain local-only.
1523    #[must_use]
1524    pub fn cluster(mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) -> Self {
1525        self.cluster = Some(cluster);
1526        self
1527    }
1528
1529    /// Consume the builder and produce a fully-wired [`ServiceManager`].
1530    ///
1531    /// Logs warnings for missing recommended subsystems (proxy,
1532    /// `stream_registry`, `container_supervisor`, `deployment_name`).
1533    pub fn build(self) -> ServiceManager {
1534        if self.proxy_manager.is_none() {
1535            tracing::warn!("ServiceManager built without proxy_manager");
1536        }
1537        if self.stream_registry.is_none() {
1538            tracing::warn!("ServiceManager built without stream_registry");
1539        }
1540        if self.container_supervisor.is_none() {
1541            tracing::warn!("ServiceManager built without container_supervisor");
1542        }
1543        if self.deployment_name.is_none() {
1544            tracing::warn!("ServiceManager built without deployment_name");
1545        }
1546
1547        ServiceManager {
1548            runtime: self.runtime,
1549            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1550            scale_semaphore: Arc::new(Semaphore::new(10)),
1551            overlay_manager: self.overlay_manager,
1552            stream_registry: self.stream_registry,
1553            proxy_manager: self.proxy_manager,
1554            dns_server: self.dns_server,
1555            container_dns: self.container_dns,
1556            deployment_name: self.deployment_name,
1557            health_states: Arc::new(RwLock::new(HashMap::new())),
1558            job_executor: self.job_executor,
1559            cron_scheduler: self.cron_scheduler,
1560            container_supervisor: self.container_supervisor,
1561            cluster: self.cluster,
1562            ingress: false,
1563        }
1564    }
1565}
1566
1567impl ServiceManager {
1568    /// Create a [`ServiceManagerBuilder`] for constructing a `ServiceManager`.
1569    ///
1570    /// This is the preferred way to construct a `ServiceManager` since v0.2.0.
1571    ///
1572    /// # Example
1573    ///
1574    /// ```ignore
1575    /// let manager = ServiceManager::builder(runtime)
1576    ///     .overlay_manager(om)
1577    ///     .proxy_manager(proxy)
1578    ///     .build();
1579    /// ```
1580    pub fn builder(runtime: Arc<dyn Runtime + Send + Sync>) -> ServiceManagerBuilder {
1581        ServiceManagerBuilder::new(runtime)
1582    }
1583
1584    /// Create a new service manager
1585    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1586    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
1587        Self {
1588            runtime,
1589            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1590            scale_semaphore: Arc::new(Semaphore::new(10)), // Max 10 concurrent scaling operations
1591            overlay_manager: None,
1592            stream_registry: None,
1593            proxy_manager: None,
1594            dns_server: None,
1595            container_dns: None,
1596            deployment_name: None,
1597            health_states: Arc::new(RwLock::new(HashMap::new())),
1598            job_executor: None,
1599            cron_scheduler: None,
1600            container_supervisor: None,
1601            cluster: None,
1602            ingress: false,
1603        }
1604    }
1605
1606    /// Create a service manager with overlay network support
1607    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1608    pub fn with_overlay(
1609        runtime: Arc<dyn Runtime + Send + Sync>,
1610        overlay_manager: Arc<RwLock<OverlayManager>>,
1611    ) -> Self {
1612        Self {
1613            runtime,
1614            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1615            scale_semaphore: Arc::new(Semaphore::new(10)),
1616            overlay_manager: Some(overlay_manager),
1617            stream_registry: None,
1618            proxy_manager: None,
1619            dns_server: None,
1620            container_dns: None,
1621            deployment_name: None,
1622            health_states: Arc::new(RwLock::new(HashMap::new())),
1623            job_executor: None,
1624            cron_scheduler: None,
1625            container_supervisor: None,
1626            cluster: None,
1627            ingress: false,
1628        }
1629    }
1630
1631    /// Create a fully-configured service manager with overlay and proxy support
1632    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1633    pub fn with_full_config(
1634        runtime: Arc<dyn Runtime + Send + Sync>,
1635        overlay_manager: Arc<RwLock<OverlayManager>>,
1636        deployment_name: String,
1637    ) -> Self {
1638        Self {
1639            runtime,
1640            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1641            scale_semaphore: Arc::new(Semaphore::new(10)),
1642            overlay_manager: Some(overlay_manager),
1643            stream_registry: None,
1644            proxy_manager: None,
1645            dns_server: None,
1646            container_dns: None,
1647            deployment_name: Some(deployment_name),
1648            health_states: Arc::new(RwLock::new(HashMap::new())),
1649            job_executor: None,
1650            cron_scheduler: None,
1651            container_supervisor: None,
1652            cluster: None,
1653            ingress: false,
1654        }
1655    }
1656
1657    /// Get the health states map for external monitoring
1658    pub fn health_states(&self) -> Arc<RwLock<HashMap<String, HealthState>>> {
1659        Arc::clone(&self.health_states)
1660    }
1661
1662    /// Update health state for a service
1663    pub async fn update_health_state(&self, service_name: &str, state: HealthState) {
1664        let mut states = self.health_states.write().await;
1665        states.insert(service_name.to_string(), state);
1666    }
1667
1668    /// Set the deployment name (used for generating hostnames)
1669    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1670    pub fn set_deployment_name(&mut self, name: String) {
1671        self.deployment_name = Some(name);
1672    }
1673
1674    /// Set the stream registry for L4 proxy integration (TCP/UDP)
1675    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1676    pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
1677        self.stream_registry = Some(registry);
1678    }
1679
1680    /// Builder pattern: add stream registry for L4 proxy integration
1681    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1682    #[must_use]
1683    pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
1684        self.stream_registry = Some(registry);
1685        self
1686    }
1687
1688    /// Get the stream registry (if configured)
1689    pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
1690        self.stream_registry.as_ref()
1691    }
1692
1693    /// Set the overlay manager for container networking
1694    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1695    pub fn set_overlay_manager(&mut self, manager: Arc<RwLock<OverlayManager>>) {
1696        self.overlay_manager = Some(manager);
1697    }
1698
1699    /// Set the proxy manager for health-aware load balancing
1700    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1701    pub fn set_proxy_manager(&mut self, proxy: Arc<ProxyManager>) {
1702        self.proxy_manager = Some(proxy);
1703    }
1704
1705    /// Builder pattern: add proxy manager for health-aware load balancing
1706    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1707    #[must_use]
1708    pub fn with_proxy_manager(mut self, proxy: Arc<ProxyManager>) -> Self {
1709        self.proxy_manager = Some(proxy);
1710        self
1711    }
1712
1713    /// Get the proxy manager (if configured)
1714    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
1715        self.proxy_manager.as_ref()
1716    }
1717
1718    /// Set the DNS server for service discovery
1719    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1720    pub fn set_dns_server(&mut self, dns: Arc<DnsServer>) {
1721        self.dns_server = Some(dns);
1722    }
1723
1724    /// Builder pattern: add DNS server for service discovery
1725    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1726    #[must_use]
1727    pub fn with_dns_server(mut self, dns: Arc<DnsServer>) -> Self {
1728        self.dns_server = Some(dns);
1729        self
1730    }
1731
1732    /// Get the DNS server (if configured)
1733    pub fn dns_server(&self) -> Option<&Arc<DnsServer>> {
1734        self.dns_server.as_ref()
1735    }
1736
1737    /// Set the job executor for run-to-completion workloads
1738    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1739    pub fn set_job_executor(&mut self, executor: Arc<JobExecutor>) {
1740        self.job_executor = Some(executor);
1741    }
1742
1743    /// Set the cron scheduler for time-based job triggers
1744    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1745    pub fn set_cron_scheduler(&mut self, scheduler: Arc<CronScheduler>) {
1746        self.cron_scheduler = Some(scheduler);
1747    }
1748
1749    /// Builder pattern: add job executor
1750    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1751    #[must_use]
1752    pub fn with_job_executor(mut self, executor: Arc<JobExecutor>) -> Self {
1753        self.job_executor = Some(executor);
1754        self
1755    }
1756
1757    /// Builder pattern: add cron scheduler
1758    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1759    #[must_use]
1760    pub fn with_cron_scheduler(mut self, scheduler: Arc<CronScheduler>) -> Self {
1761        self.cron_scheduler = Some(scheduler);
1762        self
1763    }
1764
1765    /// Set the cluster handle for cluster-aware scaling.
1766    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1767    pub fn set_cluster(&mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) {
1768        self.cluster = Some(cluster);
1769    }
1770
1771    /// Builder pattern: add a cluster handle for cluster-aware scaling.
1772    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1773    #[must_use]
1774    pub fn with_cluster(mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) -> Self {
1775        self.cluster = Some(cluster);
1776        self
1777    }
1778
1779    /// Get the cluster handle (if configured).
1780    pub fn cluster(&self) -> Option<&Arc<dyn zlayer_scheduler::cluster::Cluster>> {
1781        self.cluster.as_ref()
1782    }
1783
1784    /// Set whether THIS node holds the standing HTTP/HTTPS ingress (mirrors
1785    /// `NodeConfig.ingress`). Threaded onto each new `ServiceInstance` so its
1786    /// external service domains resolve to this node's overlay IP when it is
1787    /// the funnel. Defaults to `false`.
1788    pub fn set_ingress(&mut self, enabled: bool) {
1789        self.ingress = enabled;
1790    }
1791
1792    /// Whether THIS node holds the standing HTTP/HTTPS ingress.
1793    #[must_use]
1794    pub fn ingress(&self) -> bool {
1795        self.ingress
1796    }
1797
1798    /// Get the job executor (if configured)
1799    pub fn job_executor(&self) -> Option<&Arc<JobExecutor>> {
1800        self.job_executor.as_ref()
1801    }
1802
1803    /// Get the cron scheduler (if configured)
1804    pub fn cron_scheduler(&self) -> Option<&Arc<CronScheduler>> {
1805        self.cron_scheduler.as_ref()
1806    }
1807
1808    /// Set the container supervisor for crash/panic policy enforcement
1809    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1810    pub fn set_container_supervisor(&mut self, supervisor: Arc<ContainerSupervisor>) {
1811        self.container_supervisor = Some(supervisor);
1812    }
1813
1814    /// Builder pattern: add container supervisor
1815    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1816    #[must_use]
1817    pub fn with_container_supervisor(mut self, supervisor: Arc<ContainerSupervisor>) -> Self {
1818        self.container_supervisor = Some(supervisor);
1819        self
1820    }
1821
1822    /// Get the container supervisor (if configured)
1823    pub fn container_supervisor(&self) -> Option<&Arc<ContainerSupervisor>> {
1824        self.container_supervisor.as_ref()
1825    }
1826
1827    /// Start the container supervisor background task
1828    ///
1829    /// This spawns a background task that monitors containers for crashes
1830    /// and enforces the `on_panic` error policy.
1831    ///
1832    /// # Errors
1833    /// Returns an error if no container supervisor is configured.
1834    ///
1835    /// # Returns
1836    /// A `JoinHandle` for the supervisor task.
1837    pub fn start_container_supervisor(&self) -> Result<tokio::task::JoinHandle<()>> {
1838        let supervisor = self.container_supervisor.as_ref().ok_or_else(|| {
1839            AgentError::Configuration("Container supervisor not configured".to_string())
1840        })?;
1841
1842        let supervisor = Arc::clone(supervisor);
1843        Ok(tokio::spawn(async move {
1844            supervisor.run_loop().await;
1845        }))
1846    }
1847
1848    /// Shutdown the container supervisor
1849    pub fn shutdown_container_supervisor(&self) {
1850        if let Some(supervisor) = &self.container_supervisor {
1851            supervisor.shutdown();
1852        }
1853    }
1854
1855    /// Get the supervised state of a container
1856    pub async fn get_container_supervised_state(
1857        &self,
1858        container_id: &ContainerId,
1859    ) -> Option<SupervisedState> {
1860        if let Some(supervisor) = &self.container_supervisor {
1861            supervisor.get_state(container_id).await
1862        } else {
1863            None
1864        }
1865    }
1866
1867    /// Get supervisor events receiver
1868    ///
1869    /// Note: This can only be called once; the receiver is moved to the caller.
1870    pub async fn take_supervisor_events(
1871        &self,
1872    ) -> Option<tokio::sync::mpsc::Receiver<SupervisorEvent>> {
1873        if let Some(supervisor) = &self.container_supervisor {
1874            supervisor.take_event_receiver().await
1875        } else {
1876            None
1877        }
1878    }
1879
1880    // ==================== Dependency Orchestration ====================
1881
1882    /// Deploy multiple services respecting their dependency order
1883    ///
1884    /// This method:
1885    /// 1. Builds a dependency graph from the services
1886    /// 2. Validates no cycles exist
1887    /// 3. Computes topological order (services with no deps first)
1888    /// 4. For each service in order, waits for dependencies then starts the service
1889    ///
1890    /// # Arguments
1891    /// * `services` - Map of service name to service specification
1892    ///
1893    /// # Errors
1894    /// - Returns `AgentError::InvalidSpec` if there are cyclic dependencies
1895    /// - Returns `AgentError::DependencyTimeout` if a dependency times out with `on_timeout`: fail
1896    pub async fn deploy_with_dependencies(
1897        &self,
1898        services: HashMap<String, ServiceSpec>,
1899    ) -> Result<()> {
1900        if services.is_empty() {
1901            return Ok(());
1902        }
1903
1904        // Build dependency graph
1905        let graph = DependencyGraph::build(&services)?;
1906
1907        tracing::info!(
1908            service_count = services.len(),
1909            "Starting deployment with dependency ordering"
1910        );
1911
1912        // Get startup order
1913        let order = graph.startup_order();
1914        tracing::debug!(order = ?order, "Computed startup order");
1915
1916        // Start services in dependency order
1917        for service_name in order {
1918            let service_spec = services
1919                .get(service_name)
1920                .ok_or_else(|| AgentError::Internal(format!("Service {service_name} not found")))?;
1921
1922            // Wait for dependencies first
1923            if !service_spec.depends.is_empty() {
1924                tracing::info!(
1925                    service = %service_name,
1926                    dependency_count = service_spec.depends.len(),
1927                    "Waiting for dependencies"
1928                );
1929                self.wait_for_dependencies(service_name, &service_spec.depends)
1930                    .await?;
1931            }
1932
1933            // Register and start service
1934            tracing::info!(service = %service_name, "Starting service");
1935            Box::pin(self.upsert_service(service_name.clone(), service_spec.clone())).await?;
1936
1937            // Get the desired replica count from scale config
1938            let replicas = match &service_spec.scale {
1939                zlayer_spec::ScaleSpec::Fixed { replicas } => *replicas,
1940                zlayer_spec::ScaleSpec::Adaptive { min, .. } => *min, // Start with min replicas
1941                zlayer_spec::ScaleSpec::Manual => 1, // Default to 1 for manual scaling
1942            };
1943            self.scale_service(service_name, replicas).await?;
1944
1945            // Mark service as started in health states (Unknown until health check runs)
1946            self.update_health_state(service_name, HealthState::Unknown)
1947                .await;
1948
1949            tracing::info!(
1950                service = %service_name,
1951                replicas = replicas,
1952                "Service started"
1953            );
1954        }
1955
1956        tracing::info!(service_count = services.len(), "Deployment complete");
1957
1958        Ok(())
1959    }
1960
1961    /// Wait for all dependencies of a service to be satisfied
1962    ///
1963    /// # Arguments
1964    /// * `service` - Name of the service waiting for dependencies
1965    /// * `deps` - Slice of dependency specifications
1966    ///
1967    /// # Errors
1968    /// Returns `AgentError::DependencyTimeout` if any dependency with `on_timeout`: fail times out
1969    async fn wait_for_dependencies(&self, service: &str, deps: &[DependsSpec]) -> Result<()> {
1970        let condition_checker = DependencyConditionChecker::new(
1971            Arc::clone(&self.runtime),
1972            Arc::clone(&self.health_states),
1973            None,
1974        );
1975
1976        let waiter = DependencyWaiter::new(condition_checker);
1977        let results = waiter.wait_for_all(deps).await?;
1978
1979        // Check results for failures
1980        for result in results {
1981            match result {
1982                WaitResult::TimedOutFail {
1983                    service: dep_service,
1984                    condition,
1985                    timeout,
1986                } => {
1987                    return Err(AgentError::DependencyTimeout {
1988                        service: service.to_string(),
1989                        dependency: dep_service,
1990                        condition: format!("{condition:?}"),
1991                        timeout,
1992                    });
1993                }
1994                WaitResult::TimedOutWarn {
1995                    service: dep_service,
1996                    condition,
1997                } => {
1998                    tracing::warn!(
1999                        service = %service,
2000                        dependency = %dep_service,
2001                        condition = ?condition,
2002                        "Dependency timed out but continuing"
2003                    );
2004                }
2005                WaitResult::TimedOutContinue | WaitResult::Satisfied => {
2006                    // Continue silently
2007                }
2008            }
2009        }
2010
2011        Ok(())
2012    }
2013
2014    /// Check if all dependencies for a service are currently satisfied
2015    ///
2016    /// This is a one-shot check (no waiting). Useful for pre-flight validation.
2017    ///
2018    /// # Errors
2019    /// Returns an error if a dependency check fails unexpectedly.
2020    pub async fn check_dependencies(&self, deps: &[DependsSpec]) -> Result<bool> {
2021        let condition_checker = DependencyConditionChecker::new(
2022            Arc::clone(&self.runtime),
2023            Arc::clone(&self.health_states),
2024            None,
2025        );
2026
2027        for dep in deps {
2028            if !condition_checker.check(dep).await? {
2029                return Ok(false);
2030            }
2031        }
2032
2033        Ok(true)
2034    }
2035
2036    /// Add or update a workload (service, job, or cron)
2037    ///
2038    /// This method handles different resource types appropriately:
2039    /// - **Service**: Traditional long-running containers with scaling and health checks
2040    /// - **Job**: Run-to-completion workloads triggered on-demand (stores spec for later)
2041    /// - **Cron**: Scheduled run-to-completion workloads (registers with cron scheduler)
2042    ///
2043    /// # Errors
2044    /// Returns an error if service creation, scaling, or cron registration fails.
2045    #[allow(clippy::too_many_lines)]
2046    pub async fn upsert_service(&self, name: String, spec: ServiceSpec) -> Result<()> {
2047        match spec.rtype {
2048            ResourceType::Service => {
2049                // Long-running service: create/update instance
2050                let mut services = self.services.write().await;
2051
2052                if let Some(instance) = services.get_mut(&name) {
2053                    // Update existing service. We need to:
2054                    //   1. Update the in-memory spec (so future scale-ups use the new image).
2055                    //   2. Recreate the local replicas when the image actually changed —
2056                    //      either a different image *reference* (e.g. tag bump
2057                    //      1.28 -> 1.29), which is a new image regardless of pull
2058                    //      policy, or, under Always/Newer, observed *digest* drift on
2059                    //      the same reference.
2060                    // The recreate is LOCAL (`scale_service_local`): `upsert_service`
2061                    // runs on whichever node owns the replicas (the leader for its
2062                    // own share, each worker via the `/internal/scale` handler). Using
2063                    // the cluster-routed `scale_service` here would bounce a worker's
2064                    // recreate back to the leader and re-enter dispatch. Cluster-wide
2065                    // distribution is the caller's job (orchestrate_deployment + the
2066                    // scale dispatch that carries this spec to every node).
2067                    let image_changed = instance.spec.image.name != spec.image.name;
2068                    instance.spec = spec.clone();
2069                    if let Some(dns) = &self.dns_server {
2070                        instance.set_dns_server(Arc::clone(dns));
2071                    }
2072                    // Re-apply overlay resolver injection: the spec was just
2073                    // replaced wholesale, so any prior injection on the old
2074                    // spec is gone. Honors host_network / user-supplied dns.
2075                    if let Some(ip) = self.container_dns {
2076                        instance.set_container_dns(ip);
2077                    }
2078
2079                    let effective = spec.image.pull_policy;
2080                    let old_digest = instance.last_pulled_digest().await;
2081                    let current_replicas =
2082                        u32::try_from(instance.replica_count().await).unwrap_or(u32::MAX);
2083                    drop(services); // Release write lock before pull / scale (which take their own locks).
2084
2085                    // A changed image reference always recreates. Same-reference
2086                    // refreshes are governed by pull policy + digest drift.
2087                    let mut should_recreate = image_changed;
2088                    let mut new_digest = old_digest.clone();
2089
2090                    match effective {
2091                        PullPolicy::Never | PullPolicy::IfNotPresent => {
2092                            // No proactive pull. If the reference changed we still
2093                            // recreate below; the scale-up path pulls the (absent) new
2094                            // image per IfNotPresent. A same-reference redeploy under
2095                            // these policies is a genuine no-op.
2096                            tracing::debug!(
2097                                service = %name,
2098                                policy = ?effective,
2099                                image_changed,
2100                                "re-deploy under no-refresh pull policy"
2101                            );
2102                        }
2103                        PullPolicy::Always | PullPolicy::Newer => {
2104                            // Pull (this updates the cached digest as a side-effect).
2105                            // We need a read guard to keep the instance alive while
2106                            // calling its &self method.
2107                            let services_ro = self.services.read().await;
2108                            new_digest = if let Some(inst) = services_ro.get(&name) {
2109                                inst.pull_and_refresh_digest().await?
2110                            } else {
2111                                // The service vanished between our write-lock release
2112                                // and read-lock acquisition (race with remove_service).
2113                                // Treat this as a no-op; the caller will see the removal.
2114                                tracing::warn!(
2115                                    service = %name,
2116                                    "service removed during upsert; skipping drift recreate"
2117                                );
2118                                return Ok(());
2119                            };
2120                            drop(services_ro);
2121
2122                            // Always forces a recreate. Newer recreates on digest
2123                            // drift. When digests are unknown (runtime doesn't expose
2124                            // them), we can't observe drift safely under Newer, so the
2125                            // reference check above is the only trigger.
2126                            should_recreate = should_recreate
2127                                || match effective {
2128                                    PullPolicy::Always => true,
2129                                    PullPolicy::Newer => match (&old_digest, &new_digest) {
2130                                        (Some(old), Some(new)) => old != new,
2131                                        _ => false,
2132                                    },
2133                                    _ => false,
2134                                };
2135                        }
2136                    }
2137
2138                    if should_recreate && current_replicas > 0 {
2139                        tracing::info!(
2140                            service = %name,
2141                            policy = ?effective,
2142                            image_changed,
2143                            old_digest = ?old_digest,
2144                            new_digest = ?new_digest,
2145                            replicas = current_replicas,
2146                            "image changed; performing local rolling recreate"
2147                        );
2148                        self.scale_service_local(&name, 0).await?;
2149                        self.scale_service_local(&name, current_replicas).await?;
2150                        tracing::info!(
2151                            service = %name,
2152                            new_digest = ?new_digest,
2153                            "service recreated with refreshed image"
2154                        );
2155                    } else {
2156                        tracing::debug!(
2157                            service = %name,
2158                            policy = ?effective,
2159                            old_digest = ?old_digest,
2160                            new_digest = ?new_digest,
2161                            "service up to date; no recreate required"
2162                        );
2163                    }
2164                    return Ok(());
2165                }
2166                // Create new service with proxy manager for health-aware load balancing
2167                let overlay = self.overlay_manager.as_ref().map(Arc::clone);
2168                let mut instance = if let Some(proxy) = &self.proxy_manager {
2169                    ServiceInstance::with_proxy(
2170                        name.clone(),
2171                        spec,
2172                        self.runtime.clone(),
2173                        overlay,
2174                        Arc::clone(proxy),
2175                    )
2176                } else {
2177                    ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
2178                };
2179                // Thread the local cluster node id so new `ContainerId`s carry
2180                // owning-node identity. Defaults to `0` in single-node mode.
2181                instance.set_node_id(self.cluster.as_ref().map_or(0, |c| c.node_id()));
2182                // Thread ingress capability + cluster handle so external service
2183                // domains resolve to an ingress-capable node's overlay IP.
2184                instance.set_ingress_enabled(self.ingress);
2185                if let Some(cluster) = &self.cluster {
2186                    instance.set_cluster(Arc::clone(cluster));
2187                }
2188                // Set DNS server if configured
2189                if let Some(dns) = &self.dns_server {
2190                    instance.set_dns_server(Arc::clone(dns));
2191                }
2192                // Inject the overlay resolver into the spec so containers use it
2193                // instead of the hijacked host resolv.conf (no-op for
2194                // host_network / user-supplied dns).
2195                if let Some(ip) = self.container_dns {
2196                    instance.set_container_dns(ip);
2197                }
2198                // Wire shared health states so callbacks bridge back to ServiceManager
2199                instance.set_health_states(Arc::clone(&self.health_states));
2200                // Register HTTP routes via proxy manager
2201                if let Some(proxy) = &self.proxy_manager {
2202                    proxy.add_service(&name, &instance.spec).await;
2203                }
2204                // Register TCP/UDP endpoints in stream registry
2205                if let Some(stream_registry) = &self.stream_registry {
2206                    for endpoint in &instance.spec.endpoints {
2207                        let svc = StreamService::new(
2208                            name.clone(),
2209                            Vec::new(), // No backends yet; added on scale-up
2210                        );
2211                        match endpoint.protocol {
2212                            Protocol::Tcp => {
2213                                stream_registry.register_tcp(endpoint.port, svc);
2214                                tracing::debug!(
2215                                    service = %name,
2216                                    port = endpoint.port,
2217                                    "Registered TCP stream route"
2218                                );
2219                            }
2220                            Protocol::Udp => {
2221                                stream_registry.register_udp(endpoint.port, svc);
2222                                tracing::debug!(
2223                                    service = %name,
2224                                    port = endpoint.port,
2225                                    "Registered UDP stream route"
2226                                );
2227                            }
2228                            _ => {} // HTTP routes handled by proxy manager
2229                        }
2230                    }
2231                }
2232                services.insert(name, instance);
2233            }
2234            ResourceType::Job => {
2235                // Job: Just store the spec for later triggering
2236                // Jobs don't start containers immediately; they're triggered on-demand
2237                if let Some(executor) = &self.job_executor {
2238                    executor.register_job(&name, spec).await;
2239                    tracing::info!(job = %name, "Registered job spec");
2240                } else {
2241                    tracing::warn!(
2242                        job = %name,
2243                        "Job executor not configured, storing as service for reference"
2244                    );
2245                    // Fallback: store as service instance for reference
2246                    let mut services = self.services.write().await;
2247                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
2248                    let mut instance = if let Some(proxy) = &self.proxy_manager {
2249                        ServiceInstance::with_proxy(
2250                            name.clone(),
2251                            spec,
2252                            self.runtime.clone(),
2253                            overlay,
2254                            Arc::clone(proxy),
2255                        )
2256                    } else {
2257                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
2258                    };
2259                    // Thread the local cluster node id (same as the Service
2260                    // branch above) so the fallback-as-service Job entry also
2261                    // carries owning-node identity.
2262                    instance.set_node_id(self.cluster.as_ref().map_or(0, |c| c.node_id()));
2263                    // Thread ingress capability + cluster handle (same as the
2264                    // Service branch).
2265                    instance.set_ingress_enabled(self.ingress);
2266                    if let Some(cluster) = &self.cluster {
2267                        instance.set_cluster(Arc::clone(cluster));
2268                    }
2269                    // Set DNS server if configured
2270                    if let Some(dns) = &self.dns_server {
2271                        instance.set_dns_server(Arc::clone(dns));
2272                    }
2273                    // Inject the overlay resolver (no-op for host_network /
2274                    // user-supplied dns).
2275                    if let Some(ip) = self.container_dns {
2276                        instance.set_container_dns(ip);
2277                    }
2278                    services.insert(name, instance);
2279                }
2280            }
2281            ResourceType::Cron => {
2282                // Cron: Register with the cron scheduler
2283                if let Some(scheduler) = &self.cron_scheduler {
2284                    scheduler.register(&name, &spec).await?;
2285                    tracing::info!(cron = %name, "Registered cron job with scheduler");
2286                } else {
2287                    return Err(AgentError::Configuration(format!(
2288                        "Cron scheduler not configured for cron job '{name}'"
2289                    )));
2290                }
2291            }
2292        }
2293
2294        Ok(())
2295    }
2296
2297    /// Update backend addresses via `ProxyManager` after scaling, applying
2298    /// per-endpoint `target_role` filtering.
2299    ///
2300    /// For each L7 endpoint of the service, this collects the subset of
2301    /// containers whose `ContainerId.role` matches `endpoint.target_role`
2302    /// (or all containers when `target_role` is `None`) and updates the
2303    /// proxy's backend pool for that specific endpoint via
2304    /// [`ProxyManager::update_endpoint_backends`].
2305    async fn update_proxy_backends(&self, instance: &ServiceInstance) {
2306        let Some(proxy) = &self.proxy_manager else {
2307            return;
2308        };
2309        for endpoint in &instance.spec.endpoints {
2310            // Only L7 endpoints flow through the proxy (HTTP/HTTPS/WS).
2311            if !matches!(
2312                endpoint.protocol,
2313                Protocol::Http | Protocol::Https | Protocol::Websocket
2314            ) {
2315                continue;
2316            }
2317            let addrs = self.collect_endpoint_backends(instance, endpoint).await;
2318            proxy
2319                .update_endpoint_backends(&instance.service_name, &endpoint.name, addrs)
2320                .await;
2321        }
2322    }
2323
2324    /// Update backend addresses in the `StreamRegistry` for TCP/UDP endpoints after scaling
2325    ///
2326    /// For containers with a port override (macOS sandbox), the addresses already
2327    /// carry the runtime-assigned port. In that case, the container listens on the
2328    /// override port for all traffic, so we use the address port directly. For
2329    /// containers without a port override (Linux, VMs), we reconstruct addresses
2330    /// using the endpoint's declared port, since each container has its own IP
2331    /// and can bind any port independently.
2332    async fn update_stream_backends(&self, instance: &ServiceInstance) {
2333        let Some(stream_registry) = &self.stream_registry else {
2334            return;
2335        };
2336
2337        for endpoint in &instance.spec.endpoints {
2338            match endpoint.protocol {
2339                Protocol::Tcp => {
2340                    let tcp_backends = self.collect_endpoint_backends(instance, endpoint).await;
2341                    let backend_count = tcp_backends.len();
2342                    stream_registry.update_tcp_backends(endpoint.port, tcp_backends);
2343                    tracing::debug!(
2344                        endpoint = %endpoint.name,
2345                        port = endpoint.port,
2346                        backend_count = backend_count,
2347                        target_role = ?endpoint.target_role,
2348                        "Updated TCP stream backends"
2349                    );
2350                }
2351                Protocol::Udp => {
2352                    let udp_backends = self.collect_endpoint_backends(instance, endpoint).await;
2353                    let backend_count = udp_backends.len();
2354                    stream_registry.update_udp_backends(endpoint.port, udp_backends);
2355                    tracing::debug!(
2356                        endpoint = %endpoint.name,
2357                        port = endpoint.port,
2358                        backend_count = backend_count,
2359                        target_role = ?endpoint.target_role,
2360                        "Updated UDP stream backends"
2361                    );
2362                }
2363                _ => {} // HTTP endpoints handled by update_proxy_backends
2364            }
2365        }
2366    }
2367
2368    /// Scale a service. Cluster-aware: if this node has a `Cluster` handle
2369    /// and we're not the leader, forward to the leader; if leader, compute
2370    /// affinity-aware placement and dispatch each node its share via
2371    /// `dispatch_scale_distributed`; else (single-node) just scale locally.
2372    ///
2373    /// # Errors
2374    /// Returns an error if scaling fails on any participating node.
2375    #[allow(clippy::cast_possible_truncation)]
2376    pub async fn scale_service(&self, name: &str, replicas: u32) -> Result<()> {
2377        use zlayer_scheduler::cluster::InternalScaleRequest;
2378
2379        tracing::info!(
2380            target: "zlayer::scale_distribute",
2381            service = name,
2382            replicas,
2383            has_cluster = self.cluster.is_some(),
2384            "scale_service ENTER"
2385        );
2386
2387        // Attach the current spec so every receiving node can register/update
2388        // the service before scaling. This is what propagates an image change
2389        // to worker containers and lets a fresh worker run a replica it has
2390        // never seen. `None` if the service isn't registered locally (the
2391        // receiver then falls back to its own cached spec).
2392        let spec = self
2393            .services
2394            .read()
2395            .await
2396            .get(name)
2397            .map(|inst| inst.spec.clone());
2398        let build_req = |replicas: u32| {
2399            let req = InternalScaleRequest::new(name, replicas);
2400            match spec.clone() {
2401                Some(s) => req.with_spec(s),
2402                None => req,
2403            }
2404        };
2405
2406        if let Some(cluster) = &self.cluster {
2407            let is_leader = cluster.is_leader().await;
2408            tracing::info!(
2409                target: "zlayer::scale_distribute",
2410                service = name,
2411                replicas,
2412                is_leader,
2413                spec_affinity = ?spec.as_ref().and_then(|s| s.affinity.clone()),
2414                "scale_service: cluster path"
2415            );
2416            if !is_leader {
2417                // Follower: forward to the leader and let it dispatch.
2418                return cluster
2419                    .forward_scale(build_req(replicas))
2420                    .await
2421                    .map_err(|e| AgentError::CreateFailed {
2422                        id: name.to_string(),
2423                        reason: format!("cluster forward: {e}"),
2424                    });
2425            }
2426
2427            // Leader path. Compute affinity-aware placement across the Ready
2428            // node set and dispatch each node its share. `dispatch_scale_distributed`
2429            // reuses the same placement machinery as one-off container placement
2430            // (`cluster_nodes_to_node_states` + `place_service_replicas`), honoring
2431            // `ServiceSpec.affinity` (`spread`/`pack`/`pin`). The leader's own
2432            // share short-circuits to a local call (no localhost HTTP round-trip),
2433            // and the attached spec lets fresh workers register the service before
2434            // scaling. Single-node clusters fall through the default impl, which
2435            // dispatches everything to this node (unchanged behavior).
2436            return cluster
2437                .dispatch_scale_distributed(build_req(replicas))
2438                .await
2439                .map_err(|e| AgentError::CreateFailed {
2440                    id: name.to_string(),
2441                    reason: format!("cluster dispatch: {e}"),
2442                });
2443        }
2444
2445        // No cluster handle — single-node mode.
2446        self.scale_service_local(name, replicas).await
2447    }
2448
2449    /// Local (single-node) scale: directly creates/destroys containers on
2450    /// this node only. Called by:
2451    ///   - `scale_service` in single-node mode (when `self.cluster` is None).
2452    ///   - The `/api/v1/internal/scale` handler (which the leader's
2453    ///     `Cluster::dispatch_scale` HTTP-POSTs to, bottoming out the
2454    ///     recursive loop on each receiving node).
2455    ///   - The cluster impls' `local_dispatch` closure (for the leader's own
2456    ///     share — short-circuited to avoid a localhost round-trip).
2457    ///
2458    /// # Errors
2459    /// Returns an error if the service is not found or scaling fails.
2460    #[allow(clippy::cast_possible_truncation)]
2461    pub async fn scale_service_local(&self, name: &str, replicas: u32) -> Result<()> {
2462        tracing::info!(
2463            target: "zlayer::scale_distribute",
2464            service = name,
2465            replicas,
2466            "scale_service_local ENTER"
2467        );
2468        let _permit = self.scale_semaphore.acquire().await;
2469
2470        let services = self.services.read().await;
2471        let Some(instance) = services.get(name) else {
2472            // Draining a service this node never hosted is a no-op (e.g. the
2473            // leader fans out `count=0` to a node to drain it during a
2474            // scale-down, but that node never ran the service).
2475            if replicas == 0 {
2476                return Ok(());
2477            }
2478            return Err(AgentError::NotFound {
2479                container: name.to_string(),
2480                reason: "service not found".to_string(),
2481            });
2482        };
2483
2484        // Get current replica count before scaling
2485        let current_replicas = instance.replica_count().await as u32;
2486
2487        // Perform the scaling operation
2488        instance.scale_to(replicas).await?;
2489
2490        // After scaling, update proxy and stream backends for each endpoint.
2491        // Per-endpoint collection (rather than a single service-wide list)
2492        // is what makes `EndpointSpec.target_role` filtering possible:
2493        // each endpoint receives only the containers whose
2494        // `ContainerId.role` matches its declared role.
2495        if self.proxy_manager.is_some() {
2496            self.update_proxy_backends(instance).await;
2497        }
2498        if self.stream_registry.is_some() {
2499            self.update_stream_backends(instance).await;
2500        }
2501
2502        // Register new containers with supervisor for crash monitoring.
2503        //
2504        // Container ids here must match what `ServiceInstance::scale_to`
2505        // constructed — same role (derived from `replica_groups`) and same
2506        // local node id. Otherwise supervise/unsupervise miss the live entry
2507        // and crash-restart bookkeeping leaks across scale events.
2508        let local_node_id = self.cluster.as_ref().map_or(0, |c| c.node_id());
2509        if let Some(supervisor) = &self.container_supervisor {
2510            // For scale-up, register new containers
2511            if replicas > current_replicas {
2512                for i in current_replicas..replicas {
2513                    let replica_idx = i + 1;
2514                    let container_id = ContainerId::with_role_and_node(
2515                        name.to_string(),
2516                        replica_idx,
2517                        instance.role_for_replica(replica_idx),
2518                        local_node_id,
2519                    );
2520                    supervisor.supervise(&container_id, &instance.spec).await;
2521                }
2522            }
2523            // For scale-down, unregister removed containers
2524            if replicas < current_replicas {
2525                for i in replicas..current_replicas {
2526                    let replica_idx = i + 1;
2527                    let container_id = ContainerId::with_role_and_node(
2528                        name.to_string(),
2529                        replica_idx,
2530                        instance.role_for_replica(replica_idx),
2531                        local_node_id,
2532                    );
2533                    supervisor.unsupervise(&container_id).await;
2534                }
2535            }
2536        }
2537
2538        Ok(())
2539    }
2540
2541    /// Collect backend addresses for a single endpoint of a service.
2542    ///
2543    /// This queries the service instance's containers for their overlay
2544    /// network IP addresses and constructs backend addresses using the
2545    /// endpoint's container target port.
2546    ///
2547    /// Containers are filtered by `endpoint.target_role`:
2548    /// - `None` (default): all containers of the service are eligible
2549    ///   (legacy behavior).
2550    /// - `Some(role)`: only containers whose `ContainerId.role` equals
2551    ///   `role` are included. Implements
2552    ///   [`zlayer_spec::EndpointSpec::target_role`].
2553    ///
2554    /// If a container has a `port_override` (e.g., macOS sandbox where all
2555    /// containers share the host network), that port is used instead of
2556    /// the spec-declared endpoint port. This allows multiple replicas on
2557    /// the same IP (`127.0.0.1`) to be distinguished by port.
2558    async fn collect_endpoint_backends(
2559        &self,
2560        instance: &ServiceInstance,
2561        endpoint: &zlayer_spec::EndpointSpec,
2562    ) -> Vec<SocketAddr> {
2563        let mut addrs = Vec::new();
2564        let endpoint_port = endpoint.target_port();
2565        let containers = instance.containers().read().await;
2566
2567        for (container_id, container) in containers.iter() {
2568            // target_role filter: skip containers whose role doesn't match.
2569            if let Some(required_role) = endpoint.target_role.as_ref() {
2570                if container_id.role != *required_role {
2571                    continue;
2572                }
2573            }
2574            let Some(ip) = container.overlay_ip else {
2575                continue;
2576            };
2577            // Use the runtime-assigned port override if present (macOS
2578            // sandbox), otherwise fall back to the endpoint's declared
2579            // target port.
2580            let port = container.port_override.unwrap_or(endpoint_port);
2581            addrs.push(SocketAddr::new(ip, port));
2582        }
2583
2584        // If we expected backends but found none, log a hint so operators
2585        // can debug. Distinguish "no containers" from "role filter
2586        // excluded everything" from "no overlay IPs".
2587        if addrs.is_empty() && !containers.is_empty() {
2588            tracing::warn!(
2589                service = %instance.service_name,
2590                endpoint = %endpoint.name,
2591                target_role = ?endpoint.target_role,
2592                container_count = containers.len(),
2593                "no backends collected for endpoint - either no matching role, no overlay IPs, or filtering excluded all"
2594            );
2595        }
2596
2597        addrs
2598    }
2599
2600    /// Get service replica count
2601    ///
2602    /// # Errors
2603    /// Returns an error if the service is not found.
2604    pub async fn service_replica_count(&self, name: &str) -> Result<usize> {
2605        let services = self.services.read().await;
2606        let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
2607            container: name.to_string(),
2608            reason: "service not found".to_string(),
2609        })?;
2610
2611        Ok(instance.replica_count().await)
2612    }
2613
    /// Remove a workload (service, job, or cron) from this manager.
    ///
    /// `name` may refer to any workload type, so every cleanup step is
    /// attempted:
    /// - **Cron**: unregistered from the cron scheduler (if configured).
    /// - **Job**: unregistered from the job executor (if configured).
    /// - **Service**: TCP/UDP stream routes are unregistered from the stream
    ///   registry, node-loopback ports are unpublished for each live replica
    ///   (when the spec opts in), containers are unsupervised, DNS records
    ///   are removed, and the instance is dropped from the service map.
    ///
    /// NOTE(review): this method does not itself stop/scale down the
    /// service's containers, and it does not visibly remove HTTP proxy
    /// routes; presumably callers handle that beforehand — confirm.
    ///
    /// Each step acquires and releases its own lock on `self.services`
    /// instead of holding one guard for the whole method.
    ///
    /// # Errors
    /// Currently always returns `Ok(())`: every cleanup step is best-effort.
    pub async fn remove_service(&self, name: &str) -> Result<()> {
        // Cron: unregister `name` from the scheduler, if one is configured.
        if let Some(scheduler) = &self.cron_scheduler {
            scheduler.unregister(name).await;
        }

        // Job: unregister `name` from the executor, if one is configured.
        if let Some(executor) = &self.job_executor {
            executor.unregister_job(name).await;
        }

        // Unregister stream routes (TCP/UDP) from the stream registry,
        // keyed by each endpoint's `port`.
        if let Some(stream_registry) = &self.stream_registry {
            // The service spec's endpoints tell us which ports to unregister.
            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                for endpoint in &instance.spec.endpoints {
                    match endpoint.protocol {
                        Protocol::Tcp => {
                            // Result intentionally ignored: best-effort cleanup.
                            let _ = stream_registry.unregister_tcp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered TCP stream route"
                            );
                        }
                        Protocol::Udp => {
                            // Result intentionally ignored: best-effort cleanup.
                            let _ = stream_registry.unregister_udp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered UDP stream route"
                            );
                        }
                        _ => {} // Only TCP/UDP endpoints are unregistered from the stream registry here.
                    }
                }
            }
            drop(services); // Release read lock before the next step takes its own.
        }

        // Unpublish node-loopback ports for every live replica of this
        // service so the loopback listeners are freed (mirror of the
        // per-replica unpublish in `ServiceInstance::scale_to`). Gated on the
        // spec's `publish_to_node_loopback()` policy; each backend is
        // recomputed from the container's stored overlay IP and port
        // override. Containers without an overlay IP are skipped.
        {
            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                if instance.spec.publish_to_node_loopback() {
                    if let Some(proxy) = instance.proxy_manager() {
                        let containers = instance.containers().read().await;
                        for container in containers.values() {
                            if let Some(ip) = container.overlay_ip {
                                proxy
                                    .unpublish_loopback_for_container(
                                        &instance.spec,
                                        ip,
                                        container.port_override,
                                    )
                                    .await;
                            }
                        }
                    }
                }
            }
            drop(services); // Release read lock before the next step takes its own.
        }

        // Stop supervising each of the service's containers.
        // `get_service_containers` takes (and releases) its own read lock, so
        // no `services` guard is held at this point.
        if let Some(supervisor) = &self.container_supervisor {
            let containers = self.get_service_containers(name).await;
            for container_id in containers {
                supervisor.unsupervise(&container_id).await;
            }
            tracing::debug!(service = %name, "Unregistered containers from supervisor");
        }

        // Clean up DNS records for the service (bare name + FQDNs). This must
        // run before the map removal below because it reads the instance's
        // containers to find replica hostnames.
        self.cleanup_service_dns(name).await;

        // Finally drop the instance from the services map. Cron/job
        // workloads may have no entry here, which is not treated as an error.
        let mut services = self.services.write().await;
        if services.remove(name).is_some() {
            tracing::debug!(service = %name, "Removed service from manager");
        }

        Ok(())
    }
2712
2713    /// Remove every DNS record this service registered on attach: the bare
2714    /// compose service name (`{service}`), the service-level FQDN
2715    /// (`{service}.service.local`), and each replica's FQDN
2716    /// (`{replica}.{service}.service.local`). Best-effort; failures are logged.
2717    async fn cleanup_service_dns(&self, name: &str) {
2718        let Some(dns) = &self.dns_server else {
2719            return;
2720        };
2721
2722        // Bare compose service-name record (compose discovery).
2723        if let Err(e) = dns.remove_record(name).await {
2724            tracing::warn!(
2725                hostname = %name,
2726                error = %e,
2727                "failed to remove bare service-name DNS record"
2728            );
2729        }
2730
2731        // Service-level FQDN.
2732        let service_hostname = format!("{name}.service.local");
2733        if let Err(e) = dns.remove_record(&service_hostname).await {
2734            tracing::warn!(
2735                hostname = %service_hostname,
2736                error = %e,
2737                "failed to remove service DNS record"
2738            );
2739        } else {
2740            tracing::debug!(hostname = %service_hostname, "removed service DNS record");
2741        }
2742
2743        // Any remaining replica-specific FQDNs.
2744        let services = self.services.read().await;
2745        if let Some(instance) = services.get(name) {
2746            let containers = instance.containers().read().await;
2747            for (id, _) in containers.iter() {
2748                let replica_hostname = format!("{}.{}.service.local", id.replica, name);
2749                if let Err(e) = dns.remove_record(&replica_hostname).await {
2750                    tracing::warn!(
2751                        hostname = %replica_hostname,
2752                        error = %e,
2753                        "failed to remove replica DNS record during service removal"
2754                    );
2755                }
2756            }
2757        }
2758    }
2759
2760    /// Introspect service infrastructure wiring.
2761    /// Returns (`has_overlay`, `has_proxy`, `has_dns`), or None if service not found.
2762    pub async fn service_infrastructure(&self, name: &str) -> Option<(bool, bool, bool)> {
2763        let services = self.services.read().await;
2764        services.get(name).map(|i| {
2765            (
2766                i.has_overlay_manager(),
2767                i.has_proxy_manager(),
2768                i.has_dns_server(),
2769            )
2770        })
2771    }
2772
2773    /// List all services
2774    pub async fn list_services(&self) -> Vec<String> {
2775        self.services.read().await.keys().cloned().collect()
2776    }
2777
2778    /// Get logs for a service, aggregated from all container replicas.
2779    ///
2780    /// # Arguments
2781    /// * `service_name` - Name of the service to fetch logs for
2782    /// * `tail` - Number of lines to return per container (0 = all)
2783    /// * `instance` - Optional specific instance (container ID suffix like "1", "2")
2784    ///
2785    /// # Errors
2786    /// Returns an error if the service or instance is not found.
2787    ///
2788    /// # Returns
2789    /// Structured log entries from all (or specific) container replicas. Each
2790    /// entry has its `service` and `deployment` fields populated when available.
2791    pub async fn get_service_logs(
2792        &self,
2793        service_name: &str,
2794        tail: usize,
2795        instance: Option<&str>,
2796    ) -> Result<Vec<LogEntry>> {
2797        let container_ids = self.get_service_containers(service_name).await;
2798
2799        if container_ids.is_empty() {
2800            return Err(AgentError::NotFound {
2801                container: service_name.to_string(),
2802                reason: "no containers found for service".to_string(),
2803            });
2804        }
2805
2806        // If a specific instance is requested, filter to just that one
2807        let target_ids: Vec<&ContainerId> = if let Some(inst) = instance {
2808            if let Ok(replica_num) = inst.parse::<u32>() {
2809                container_ids
2810                    .iter()
2811                    .filter(|id| id.replica == replica_num)
2812                    .collect()
2813            } else {
2814                // Try matching by full container ID string suffix
2815                container_ids
2816                    .iter()
2817                    .filter(|id| id.to_string().contains(inst))
2818                    .collect()
2819            }
2820        } else {
2821            container_ids.iter().collect()
2822        };
2823
2824        if target_ids.is_empty() {
2825            return Err(AgentError::NotFound {
2826                container: format!("{}/{}", service_name, instance.unwrap_or("?")),
2827                reason: "instance not found".to_string(),
2828            });
2829        }
2830
2831        let mut all_entries: Vec<LogEntry> = Vec::new();
2832
2833        for id in &target_ids {
2834            match self.runtime.container_logs(id, tail).await {
2835                Ok(mut entries) => {
2836                    // Populate service and deployment metadata on each entry
2837                    for entry in &mut entries {
2838                        if entry.service.is_none() {
2839                            entry.service = Some(service_name.to_string());
2840                        }
2841                        if entry.deployment.is_none() {
2842                            entry.deployment.clone_from(&self.deployment_name);
2843                        }
2844                    }
2845                    all_entries.extend(entries);
2846                }
2847                Err(e) => {
2848                    tracing::warn!(
2849                        service = service_name,
2850                        container = %id,
2851                        error = %e,
2852                        "Failed to read container logs"
2853                    );
2854                }
2855            }
2856        }
2857
2858        Ok(all_entries)
2859    }
2860
2861    /// Get all container IDs for a specific service
2862    ///
2863    /// Returns an empty vector if the service doesn't exist.
2864    ///
2865    /// # Arguments
2866    /// * `service_name` - Name of the service to query
2867    ///
2868    /// # Returns
2869    /// Vector of `ContainerIds` for all replicas of the service
2870    pub async fn get_service_containers(&self, service_name: &str) -> Vec<ContainerId> {
2871        let services = self.services.read().await;
2872        if let Some(instance) = services.get(service_name) {
2873            instance.container_ids().await
2874        } else {
2875            Vec::new()
2876        }
2877    }
2878
2879    /// Get per-container info (id, image, real state, pid, overlay IP) for a
2880    /// specific service.
2881    ///
2882    /// Unlike [`get_service_containers`](Self::get_service_containers) (which
2883    /// returns ids only), this surfaces the REAL image reference and lifecycle
2884    /// state of each live container so the API/`ps` can report them accurately.
2885    ///
2886    /// Returns an empty vector if the service doesn't exist.
2887    pub async fn get_service_container_infos(&self, service_name: &str) -> Vec<ContainerInfo> {
2888        let services = self.services.read().await;
2889        if let Some(instance) = services.get(service_name) {
2890            instance.container_infos().await
2891        } else {
2892            Vec::new()
2893        }
2894    }
2895
2896    /// This node's **local** view of `service` (running replica count, health,
2897    /// containers), used for cluster-wide aggregation. Served by the internal
2898    /// `/api/v1/internal/services/{svc}/state` endpoint and used as the local
2899    /// part of [`Self::cluster_service_states`].
2900    pub async fn local_service_state(
2901        &self,
2902        service: &str,
2903    ) -> zlayer_types::cluster::NodeServiceState {
2904        use zlayer_types::cluster::{ClusterContainerSummary, NodeServiceState};
2905        let node_id = self.cluster.as_ref().map_or(0, |c| c.node_id());
2906        let infos = self.get_service_container_infos(service).await;
2907        #[allow(clippy::cast_possible_truncation)]
2908        let running = infos
2909            .iter()
2910            .filter(|i| i.state.eq_ignore_ascii_case("running"))
2911            .count() as u32;
2912        // A node running 0 replicas is trivially healthy (it can't drag the
2913        // cluster-wide aggregate). Otherwise require a Healthy health state.
2914        let healthy = if running == 0 {
2915            true
2916        } else {
2917            let states = self.health_states();
2918            let guard = states.read().await;
2919            matches!(guard.get(service), Some(HealthState::Healthy))
2920        };
2921        let containers = infos
2922            .into_iter()
2923            .map(|i| ClusterContainerSummary {
2924                node_id,
2925                id: i.id.to_string(),
2926                service: i.id.service.clone(),
2927                replica: i.id.replica,
2928                image: i.image,
2929                state: i.state,
2930                pid: i.pid,
2931                overlay_ip: i.overlay_ip,
2932            })
2933            .collect();
2934        NodeServiceState {
2935            node_id,
2936            running,
2937            healthy,
2938            containers,
2939        }
2940    }
2941
2942    /// Cluster-wide per-node states for `service`: this node's local view plus
2943    /// every other node's (fetched via the cluster handle's
2944    /// `fetch_remote_service_states`). When not clustered, returns just the
2945    /// local view. This is the source of truth for distributed-service replica
2946    /// counts, health, and the `ps` container listing on the leader.
2947    pub async fn cluster_service_states(
2948        &self,
2949        service: &str,
2950    ) -> Vec<zlayer_types::cluster::NodeServiceState> {
2951        let mut states = vec![self.local_service_state(service).await];
2952        if let Some(cluster) = &self.cluster {
2953            states.extend(cluster.fetch_remote_service_states(service).await);
2954        }
2955        states
2956    }
2957
2958    /// Execute a command inside a running container for a service
2959    ///
2960    /// Picks a specific replica if provided, otherwise uses the first available container.
2961    ///
2962    /// # Arguments
2963    /// * `service_name` - Name of the service
2964    /// * `replica` - Optional replica number to target
2965    /// * `cmd` - Command and arguments to execute
2966    ///
2967    /// # Errors
2968    /// Returns an error if the service or replica is not found, or if exec fails.
2969    ///
2970    /// # Panics
2971    /// Panics if no replica is specified and the container list is unexpectedly empty
2972    /// after the emptiness check (should not happen in practice).
2973    ///
2974    /// # Returns
2975    /// Tuple of (`exit_code`, stdout, stderr)
2976    pub async fn exec_in_container(
2977        &self,
2978        service_name: &str,
2979        replica: Option<u32>,
2980        cmd: &[String],
2981    ) -> Result<(i32, String, String)> {
2982        let container_ids = self.get_service_containers(service_name).await;
2983
2984        if container_ids.is_empty() {
2985            return Err(AgentError::NotFound {
2986                container: service_name.to_string(),
2987                reason: "no containers found for service".to_string(),
2988            });
2989        }
2990
2991        // Pick the target container
2992        let target = if let Some(rep) = replica {
2993            container_ids
2994                .into_iter()
2995                .find(|cid| cid.replica == rep)
2996                .ok_or_else(|| AgentError::NotFound {
2997                    container: format!("{service_name}-rep-{rep}"),
2998                    reason: format!("replica {rep} not found for service"),
2999                })?
3000        } else {
3001            // Use the first container (lowest replica number)
3002            container_ids.into_iter().next().unwrap()
3003        };
3004
3005        self.runtime.exec(&target, cmd).await
3006    }
3007
3008    /// List every live container across all services, enriched with the data a
3009    /// Docker `ps` row needs and the data the Docker-name resolver needs.
3010    ///
3011    /// For each running container this surfaces the deployment name, the service
3012    /// name, the concrete [`ContainerId`], the compose `container_name:` label
3013    /// (when set, the user-facing Docker name), the real image, the lifecycle
3014    /// state, and the service's published port mappings. Used by the unified
3015    /// container-name resolver and by `docker ps` so compose deployments show up
3016    /// and resolve by their `container_name`.
3017    pub async fn list_container_views(&self) -> Vec<DeploymentContainerView> {
3018        let deployment = self.deployment_name.clone();
3019        let services = self.services.read().await;
3020        let mut out = Vec::new();
3021        for (service_name, instance) in services.iter() {
3022            let container_name = instance
3023                .spec
3024                .labels
3025                .get("com.docker.compose.container_name")
3026                .cloned();
3027            let ports = instance.spec.port_mappings.clone();
3028            for info in instance.container_infos().await {
3029                out.push(DeploymentContainerView {
3030                    deployment: deployment.clone(),
3031                    service: service_name.clone(),
3032                    container_id: info.id,
3033                    container_name: container_name.clone(),
3034                    image: info.image,
3035                    state: info.state,
3036                    pid: info.pid,
3037                    ports: ports.clone(),
3038                });
3039            }
3040        }
3041        out
3042    }
3043
3044    /// Resolve a Docker-style container name/id to a live deployment
3045    /// [`ContainerId`].
3046    ///
3047    /// Matching precedence (first hit wins):
3048    /// 1. The compose `container_name:` label (e.g. `forgejo-e2e`).
3049    /// 2. The conventional compose names `{deployment}-{service}-{replica}` and
3050    ///    `{deployment}_{service}_{replica}` (replica is 1-based, mirroring
3051    ///    Docker Compose; `ContainerId.replica` is 0-based so we add 1).
3052    /// 3. The bare service name (`{service}`), targeting its first replica.
3053    /// 4. The [`ContainerId`] `Display` form.
3054    ///
3055    /// Returns `None` when nothing matches a *running* container.
3056    pub async fn resolve_container_name(&self, name: &str) -> Option<ContainerId> {
3057        let views = self.list_container_views().await;
3058        // 1. explicit container_name label.
3059        if let Some(v) = views
3060            .iter()
3061            .find(|v| v.container_name.as_deref() == Some(name))
3062        {
3063            return Some(v.container_id.clone());
3064        }
3065        // 2 & 3. conventional names + bare service name.
3066        for v in &views {
3067            let dep = v.deployment.as_deref().unwrap_or("");
3068            let svc = &v.service;
3069            let rep1 = v.container_id.replica + 1;
3070            let candidates = [
3071                format!("{dep}-{svc}-{rep1}"),
3072                format!("{dep}_{svc}_{rep1}"),
3073                svc.clone(),
3074            ];
3075            if candidates.iter().any(|c| c == name) {
3076                return Some(v.container_id.clone());
3077            }
3078        }
3079        // 4. ContainerId Display form.
3080        for v in &views {
3081            if v.container_id.to_string() == name {
3082                return Some(v.container_id.clone());
3083            }
3084        }
3085        None
3086    }
3087
3088    /// Execute a command in a specific deployment container (by its concrete
3089    /// [`ContainerId`]) honouring Docker `exec` options (`--user`, `-w`, `-e`).
3090    ///
3091    /// Routes through [`Runtime::exec_with_opts`] so runtimes that support
3092    /// dropping to a uid/gid + chdir + env injection (macOS VZ-Linux) apply
3093    /// them; others fall back to a plain buffered exec.
3094    ///
3095    /// # Errors
3096    /// Propagates the runtime's exec error.
3097    pub async fn exec_in_container_id_with_opts(
3098        &self,
3099        id: &ContainerId,
3100        opts: &crate::runtime::ExecOptions,
3101    ) -> Result<(i32, String, String)> {
3102        self.runtime.exec_with_opts(id, opts).await
3103    }
3104
3105    // ==================== Job Management ====================
3106
3107    /// Trigger a job execution
3108    ///
3109    /// # Arguments
3110    /// * `name` - Name of the registered job
3111    /// * `trigger` - How the job was triggered (endpoint, cli, etc.)
3112    ///
3113    /// # Returns
3114    /// The execution ID for tracking the job
3115    ///
3116    /// # Errors
3117    /// - Returns error if job executor is not configured
3118    /// - Returns error if the job is not registered
3119    pub async fn trigger_job(&self, name: &str, trigger: JobTrigger) -> Result<JobExecutionId> {
3120        let executor = self
3121            .job_executor
3122            .as_ref()
3123            .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
3124
3125        let spec = executor
3126            .get_job_spec(name)
3127            .await
3128            .ok_or_else(|| AgentError::NotFound {
3129                container: name.to_string(),
3130                reason: "job not registered".to_string(),
3131            })?;
3132
3133        executor.trigger(name, &spec, trigger).await
3134    }
3135
3136    /// Get the status of a job execution
3137    ///
3138    /// # Arguments
3139    /// * `id` - The execution ID returned from `trigger_job`
3140    ///
3141    /// # Returns
3142    /// The job execution details, or None if not found
3143    pub async fn get_job_execution(&self, id: &JobExecutionId) -> Option<JobExecution> {
3144        if let Some(executor) = &self.job_executor {
3145            executor.get_execution(id).await
3146        } else {
3147            None
3148        }
3149    }
3150
3151    /// List all executions for a specific job
3152    ///
3153    /// # Arguments
3154    /// * `name` - Name of the job
3155    ///
3156    /// # Returns
3157    /// Vector of job executions for the specified job
3158    pub async fn list_job_executions(&self, name: &str) -> Vec<JobExecution> {
3159        if let Some(executor) = &self.job_executor {
3160            executor.list_executions(name).await
3161        } else {
3162            Vec::new()
3163        }
3164    }
3165
3166    /// Cancel a running job execution
3167    ///
3168    /// # Arguments
3169    /// * `id` - The execution ID to cancel
3170    ///
3171    /// # Errors
3172    /// Returns error if job executor is not configured or if cancellation fails
3173    pub async fn cancel_job(&self, id: &JobExecutionId) -> Result<()> {
3174        let executor = self
3175            .job_executor
3176            .as_ref()
3177            .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
3178
3179        executor.cancel(id).await
3180    }
3181
3182    // ==================== Cron Management ====================
3183
3184    /// Manually trigger a cron job (outside of its schedule)
3185    ///
3186    /// # Arguments
3187    /// * `name` - Name of the cron job
3188    ///
3189    /// # Returns
3190    /// The execution ID for tracking the triggered job
3191    ///
3192    /// # Errors
3193    /// Returns error if cron scheduler is not configured or job not found
3194    pub async fn trigger_cron(&self, name: &str) -> Result<JobExecutionId> {
3195        let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
3196            AgentError::Configuration("Cron scheduler not configured".to_string())
3197        })?;
3198
3199        scheduler.trigger_now(name).await
3200    }
3201
3202    /// Enable or disable a cron job
3203    ///
3204    /// # Arguments
3205    /// * `name` - Name of the cron job
3206    /// * `enabled` - Whether to enable or disable the job
3207    pub async fn set_cron_enabled(&self, name: &str, enabled: bool) {
3208        if let Some(scheduler) = &self.cron_scheduler {
3209            scheduler.set_enabled(name, enabled).await;
3210        }
3211    }
3212
3213    /// List all registered cron jobs
3214    pub async fn list_cron_jobs(&self) -> Vec<crate::cron_scheduler::CronJobInfo> {
3215        if let Some(scheduler) = &self.cron_scheduler {
3216            scheduler.list_jobs().await
3217        } else {
3218            Vec::new()
3219        }
3220    }
3221
3222    /// Start the cron scheduler background task
3223    ///
3224    /// This spawns a background task that checks for due cron jobs every second.
3225    /// Returns a `JoinHandle` that can be used to wait for the scheduler to stop.
3226    ///
3227    /// # Errors
3228    /// Returns error if cron scheduler is not configured
3229    pub fn start_cron_scheduler(&self) -> Result<tokio::task::JoinHandle<()>> {
3230        let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
3231            AgentError::Configuration("Cron scheduler not configured".to_string())
3232        })?;
3233
3234        let scheduler: Arc<CronScheduler> = Arc::clone(scheduler);
3235        Ok(tokio::spawn(async move {
3236            scheduler.run_loop().await;
3237        }))
3238    }
3239
3240    /// Shutdown the cron scheduler
3241    pub fn shutdown_cron(&self) {
3242        if let Some(scheduler) = &self.cron_scheduler {
3243            scheduler.shutdown();
3244        }
3245    }
3246}
3247
3248#[cfg(test)]
3249#[allow(deprecated)]
3250mod tests {
3251    use super::*;
3252    use crate::runtime::MockRuntime;
3253
3254    #[tokio::test]
3255    async fn test_service_manager() {
3256        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3257        let manager = ServiceManager::new(runtime);
3258
3259        // Add service
3260        let spec = mock_spec();
3261        Box::pin(manager.upsert_service("test".to_string(), spec))
3262            .await
3263            .unwrap();
3264
3265        // Scale up
3266        manager.scale_service("test", 3).await.unwrap();
3267
3268        // Check count
3269        let count = manager.service_replica_count("test").await.unwrap();
3270        assert_eq!(count, 3);
3271
3272        // List services
3273        let services = manager.list_services().await;
3274        assert_eq!(services, vec!["test".to_string()]);
3275    }
3276
3277    #[tokio::test]
3278    async fn test_service_manager_basic_lifecycle() {
3279        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3280        let manager = ServiceManager::new(runtime);
3281
3282        // Add service with HTTP endpoint
3283        let spec = mock_spec();
3284        Box::pin(manager.upsert_service("api".to_string(), spec))
3285            .await
3286            .unwrap();
3287
3288        // Scale up
3289        manager.scale_service("api", 2).await.unwrap();
3290
3291        // Check count
3292        let count = manager.service_replica_count("api").await.unwrap();
3293        assert_eq!(count, 2);
3294
3295        // Remove service
3296        manager.remove_service("api").await.unwrap();
3297
3298        // Verify service is gone
3299        let services = manager.list_services().await;
3300        assert!(!services.contains(&"api".to_string()));
3301    }
3302
3303    #[tokio::test]
3304    async fn test_service_manager_with_full_config() {
3305        use tokio::sync::RwLock;
3306
3307        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3308
3309        // Create a mock overlay manager (skip actual network setup)
3310        let overlay_manager = Arc::new(RwLock::new(
3311            OverlayManager::new("test-deployment".to_string(), "test".to_string())
3312                .await
3313                .unwrap(),
3314        ));
3315
3316        let manager =
3317            ServiceManager::with_full_config(runtime, overlay_manager, "prod".to_string());
3318
3319        // Add service
3320        let spec = mock_spec();
3321        Box::pin(manager.upsert_service("web".to_string(), spec))
3322            .await
3323            .unwrap();
3324
3325        // Verify service is registered
3326        let services = manager.list_services().await;
3327        assert!(services.contains(&"web".to_string()));
3328    }
3329
3330    #[test]
3331    fn test_container_state_as_str() {
3332        use crate::runtime::ContainerState;
3333        assert_eq!(ContainerState::Pending.as_str(), "pending");
3334        assert_eq!(ContainerState::Initializing.as_str(), "initializing");
3335        assert_eq!(ContainerState::Running.as_str(), "running");
3336        assert_eq!(ContainerState::Stopping.as_str(), "stopping");
3337        assert_eq!(ContainerState::Exited { code: 0 }.as_str(), "exited");
3338        assert_eq!(
3339            ContainerState::Failed {
3340                reason: "boom".to_string()
3341            }
3342            .as_str(),
3343            "failed"
3344        );
3345        // Display delegates to as_str.
3346        assert_eq!(ContainerState::Running.to_string(), "running");
3347    }
3348
3349    /// A container created from image X must report image X and its real
3350    /// lifecycle state through the new `container_infos` accessor, replacing
3351    /// the previously hardcoded `"running"` / empty-image behavior.
3352    #[tokio::test]
3353    async fn test_container_infos_surfaces_image_and_state() {
3354        use crate::runtime::{Container, ContainerState};
3355
3356        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3357        let manager = ServiceManager::new(runtime);
3358
3359        let spec = mock_spec(); // image name = "test:latest"
3360        let image = spec.image.name.to_string();
3361        Box::pin(manager.upsert_service("web".to_string(), spec))
3362            .await
3363            .unwrap();
3364
3365        // Inject containers directly with distinct states.
3366        {
3367            let services = manager.services.read().await;
3368            let instance = services.get("web").unwrap();
3369            let mut containers = instance.containers().write().await;
3370
3371            let running_id = ContainerId::new("web", 0);
3372            containers.insert(
3373                running_id.clone(),
3374                Container {
3375                    id: running_id,
3376                    image: image.clone(),
3377                    state: ContainerState::Running,
3378                    pid: Some(4242),
3379                    task: None,
3380                    overlay_ip: None,
3381                    health_monitor: None,
3382                    port_override: None,
3383                },
3384            );
3385
3386            let exited_id = ContainerId::new("web", 1);
3387            containers.insert(
3388                exited_id.clone(),
3389                Container {
3390                    id: exited_id,
3391                    image: image.clone(),
3392                    state: ContainerState::Exited { code: 1 },
3393                    pid: None,
3394                    task: None,
3395                    overlay_ip: None,
3396                    health_monitor: None,
3397                    port_override: None,
3398                },
3399            );
3400        }
3401
3402        let mut infos = manager.get_service_container_infos("web").await;
3403        infos.sort_by_key(|i| i.id.replica);
3404        assert_eq!(infos.len(), 2);
3405
3406        // Every container reports the real image it was created from.
3407        assert!(infos.iter().all(|i| i.image == image));
3408        assert!(infos.iter().all(|i| i.image == "test:latest"));
3409
3410        // Real per-container state is surfaced (not a hardcoded "running").
3411        assert_eq!(infos[0].state, "running");
3412        assert_eq!(infos[0].pid, Some(4242));
3413        assert_eq!(infos[1].state, "exited");
3414        assert_eq!(infos[1].pid, None);
3415
3416        // Unknown service yields an empty list.
3417        assert!(manager
3418            .get_service_container_infos("missing")
3419            .await
3420            .is_empty());
3421    }
3422
3423    /// Bug 2 (`cluster_upgrade`): a changed image *reference* (tag bump) under
3424    /// `if_not_present` must still recreate the local replicas. Previously the
3425    /// recreate only fired on digest drift under `Always`/`Newer`, so a tag
3426    /// change was silently ignored and containers stayed on the old image.
3427    #[tokio::test]
3428    async fn upsert_recreates_local_replicas_on_image_reference_change() {
3429        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3430        let manager = ServiceManager::new(runtime);
3431
3432        // Deploy v1 with the e2e's pull policy (if_not_present) and scale up.
3433        let mut spec = mock_spec();
3434        spec.image.name = "docker.io/library/nginx:1.28-alpine".parse().unwrap();
3435        spec.image.pull_policy = zlayer_spec::PullPolicy::IfNotPresent;
3436        Box::pin(manager.upsert_service("web".to_string(), spec.clone()))
3437            .await
3438            .unwrap();
3439        manager.scale_service_local("web", 2).await.unwrap();
3440
3441        let v1: Vec<String> = manager
3442            .get_service_container_infos("web")
3443            .await
3444            .into_iter()
3445            .map(|i| i.image)
3446            .collect();
3447        assert_eq!(v1.len(), 2);
3448        assert!(
3449            v1.iter().all(|img| img.contains("1.28")),
3450            "expected v1 images, got {v1:?}"
3451        );
3452
3453        // Upgrade to v2 under the SAME if_not_present policy.
3454        let mut spec_v2 = spec;
3455        spec_v2.image.name = "docker.io/library/nginx:1.29-alpine".parse().unwrap();
3456        Box::pin(manager.upsert_service("web".to_string(), spec_v2))
3457            .await
3458            .unwrap();
3459
3460        let v2: Vec<String> = manager
3461            .get_service_container_infos("web")
3462            .await
3463            .into_iter()
3464            .map(|i| i.image)
3465            .collect();
3466        assert_eq!(v2.len(), 2, "replica count preserved across upgrade");
3467        assert!(
3468            v2.iter().all(|img| img.contains("1.29")),
3469            "containers must be recreated on the new image, got {v2:?}"
3470        );
3471    }
3472
3473    fn mock_spec() -> ServiceSpec {
3474        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3475            r"
3476version: v1
3477deployment: test
3478services:
3479  test:
3480    rtype: service
3481    image:
3482      name: test:latest
3483    endpoints:
3484      - name: http
3485        protocol: http
3486        port: 8080
3487    scale:
3488      mode: fixed
3489      replicas: 1
3490",
3491        )
3492        .unwrap()
3493        .services
3494        .remove("test")
3495        .unwrap()
3496    }
3497
3498    #[test]
3499    fn test_set_container_dns_injects_when_empty() {
3500        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3501        let spec = mock_spec(); // spec.dns defaults to empty, host_network false
3502        let mut instance =
3503            ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
3504        instance.set_container_dns("10.42.0.1".parse().unwrap());
3505        assert_eq!(instance.spec.dns, vec!["10.42.0.1".to_string()]);
3506    }
3507
3508    #[test]
3509    fn test_set_container_dns_skips_host_network() {
3510        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3511        let mut spec = mock_spec();
3512        spec.host_network = true;
3513        let mut instance =
3514            ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
3515        instance.set_container_dns("10.42.0.1".parse().unwrap());
3516        assert!(
3517            instance.spec.dns.is_empty(),
3518            "host_network containers must inherit the host resolv.conf"
3519        );
3520    }
3521
3522    #[test]
3523    fn test_set_container_dns_preserves_user_dns() {
3524        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3525        let mut spec = mock_spec();
3526        spec.dns = vec!["1.1.1.1".to_string()];
3527        let mut instance =
3528            ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
3529        instance.set_container_dns("10.42.0.1".parse().unwrap());
3530        assert_eq!(
3531            instance.spec.dns,
3532            vec!["1.1.1.1".to_string()],
3533            "user-supplied spec.dns must win over the overlay resolver"
3534        );
3535    }
3536
3537    /// Helper to create a `ServiceSpec` with dependencies
3538    fn mock_spec_with_deps(deps: Vec<DependsSpec>) -> ServiceSpec {
3539        let mut spec = mock_spec();
3540        spec.depends = deps;
3541        spec
3542    }
3543
3544    /// Helper to create a `DependsSpec`
3545    fn dep(
3546        service: &str,
3547        condition: zlayer_spec::DependencyCondition,
3548        timeout_ms: u64,
3549        on_timeout: zlayer_spec::TimeoutAction,
3550    ) -> DependsSpec {
3551        DependsSpec {
3552            service: service.to_string(),
3553            condition,
3554            timeout: Some(Duration::from_millis(timeout_ms)),
3555            on_timeout,
3556        }
3557    }
3558
3559    #[tokio::test]
3560    async fn test_deploy_with_dependencies_no_deps() {
3561        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3562        let manager = ServiceManager::new(runtime);
3563
3564        // Services with no dependencies
3565        let mut services = HashMap::new();
3566        services.insert("a".to_string(), mock_spec());
3567        services.insert("b".to_string(), mock_spec());
3568
3569        // Should deploy both without issue
3570        Box::pin(manager.deploy_with_dependencies(services))
3571            .await
3572            .unwrap();
3573
3574        // Both services should be registered
3575        let service_list = manager.list_services().await;
3576        assert_eq!(service_list.len(), 2);
3577    }
3578
3579    #[tokio::test]
3580    async fn test_deploy_with_dependencies_linear() {
3581        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3582        let manager = ServiceManager::new(runtime);
3583
3584        // A -> B -> C (A depends on B, B depends on C)
3585        // All use "started" condition which is satisfied when container is running
3586        let mut services = HashMap::new();
3587        services.insert("c".to_string(), mock_spec());
3588        services.insert(
3589            "b".to_string(),
3590            mock_spec_with_deps(vec![dep(
3591                "c",
3592                zlayer_spec::DependencyCondition::Started,
3593                5000,
3594                zlayer_spec::TimeoutAction::Fail,
3595            )]),
3596        );
3597        services.insert(
3598            "a".to_string(),
3599            mock_spec_with_deps(vec![dep(
3600                "b",
3601                zlayer_spec::DependencyCondition::Started,
3602                5000,
3603                zlayer_spec::TimeoutAction::Fail,
3604            )]),
3605        );
3606
3607        // Should deploy in order: c, b, a
3608        Box::pin(manager.deploy_with_dependencies(services))
3609            .await
3610            .unwrap();
3611
3612        // All services should be registered
3613        let service_list = manager.list_services().await;
3614        assert_eq!(service_list.len(), 3);
3615    }
3616
3617    #[tokio::test]
3618    async fn test_deploy_with_dependencies_cycle_detection() {
3619        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3620        let manager = ServiceManager::new(runtime);
3621
3622        // A -> B -> A (cycle)
3623        let mut services = HashMap::new();
3624        services.insert(
3625            "a".to_string(),
3626            mock_spec_with_deps(vec![dep(
3627                "b",
3628                zlayer_spec::DependencyCondition::Started,
3629                5000,
3630                zlayer_spec::TimeoutAction::Fail,
3631            )]),
3632        );
3633        services.insert(
3634            "b".to_string(),
3635            mock_spec_with_deps(vec![dep(
3636                "a",
3637                zlayer_spec::DependencyCondition::Started,
3638                5000,
3639                zlayer_spec::TimeoutAction::Fail,
3640            )]),
3641        );
3642
3643        // Should fail with cycle detection
3644        let result = Box::pin(manager.deploy_with_dependencies(services)).await;
3645        assert!(result.is_err());
3646        let err = result.unwrap_err().to_string();
3647        assert!(err.contains("Cyclic dependency"));
3648    }
3649
3650    #[tokio::test]
3651    async fn test_deploy_with_dependencies_timeout_continue() {
3652        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3653        let manager = ServiceManager::new(runtime);
3654
3655        // A depends on B (healthy), but B never becomes healthy
3656        // Using continue action, so it should proceed anyway
3657        let mut services = HashMap::new();
3658        services.insert("b".to_string(), mock_spec());
3659        services.insert(
3660            "a".to_string(),
3661            mock_spec_with_deps(vec![dep(
3662                "b",
3663                zlayer_spec::DependencyCondition::Healthy, // B won't pass healthy check
3664                100,                                       // Short timeout
3665                zlayer_spec::TimeoutAction::Continue,      // But continue anyway
3666            )]),
3667        );
3668
3669        // Should deploy both despite timeout
3670        Box::pin(manager.deploy_with_dependencies(services))
3671            .await
3672            .unwrap();
3673
3674        let service_list = manager.list_services().await;
3675        assert_eq!(service_list.len(), 2);
3676    }
3677
3678    #[tokio::test]
3679    async fn test_deploy_with_dependencies_timeout_warn() {
3680        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3681        let manager = ServiceManager::new(runtime);
3682
3683        // A depends on B (healthy), but B never becomes healthy
3684        // Using warn action, so it should proceed with a warning
3685        let mut services = HashMap::new();
3686        services.insert("b".to_string(), mock_spec());
3687        services.insert(
3688            "a".to_string(),
3689            mock_spec_with_deps(vec![dep(
3690                "b",
3691                zlayer_spec::DependencyCondition::Healthy,
3692                100,
3693                zlayer_spec::TimeoutAction::Warn,
3694            )]),
3695        );
3696
3697        // Should deploy both despite timeout (with warning)
3698        Box::pin(manager.deploy_with_dependencies(services))
3699            .await
3700            .unwrap();
3701
3702        let service_list = manager.list_services().await;
3703        assert_eq!(service_list.len(), 2);
3704    }
3705
3706    #[tokio::test]
3707    async fn test_deploy_with_dependencies_timeout_fail() {
3708        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3709        let manager = ServiceManager::new(runtime);
3710
3711        // A depends on B (healthy), but B never becomes healthy
3712        // Using fail action, so deployment should fail
3713        let mut services = HashMap::new();
3714        services.insert("b".to_string(), mock_spec());
3715        services.insert(
3716            "a".to_string(),
3717            mock_spec_with_deps(vec![dep(
3718                "b",
3719                zlayer_spec::DependencyCondition::Healthy,
3720                100,
3721                zlayer_spec::TimeoutAction::Fail,
3722            )]),
3723        );
3724
3725        // Should fail after B is started but doesn't become healthy
3726        let result = Box::pin(manager.deploy_with_dependencies(services)).await;
3727        assert!(result.is_err());
3728
3729        // B should be started (it has no deps), but A should fail
3730        let err = result.unwrap_err().to_string();
3731        assert!(err.contains("Dependency timeout"));
3732    }
3733
3734    #[tokio::test]
3735    async fn test_check_dependencies_all_satisfied() {
3736        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3737        let manager = ServiceManager::new(runtime);
3738
3739        // Mark a service as healthy
3740        manager
3741            .update_health_state("db", HealthState::Healthy)
3742            .await;
3743
3744        let deps = vec![DependsSpec {
3745            service: "db".to_string(),
3746            condition: zlayer_spec::DependencyCondition::Healthy,
3747            timeout: Some(Duration::from_secs(60)),
3748            on_timeout: zlayer_spec::TimeoutAction::Fail,
3749        }];
3750
3751        let satisfied = manager.check_dependencies(&deps).await.unwrap();
3752        assert!(satisfied);
3753    }
3754
3755    #[tokio::test]
3756    async fn test_check_dependencies_not_satisfied() {
3757        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3758        let manager = ServiceManager::new(runtime);
3759
3760        // Service not healthy (no state set = Unknown)
3761        let deps = vec![DependsSpec {
3762            service: "db".to_string(),
3763            condition: zlayer_spec::DependencyCondition::Healthy,
3764            timeout: Some(Duration::from_secs(60)),
3765            on_timeout: zlayer_spec::TimeoutAction::Fail,
3766        }];
3767
3768        let satisfied = manager.check_dependencies(&deps).await.unwrap();
3769        assert!(!satisfied);
3770    }
3771
3772    #[tokio::test]
3773    async fn test_health_state_tracking() {
3774        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3775        let manager = ServiceManager::new(runtime);
3776
3777        // Update health states
3778        manager
3779            .update_health_state("db", HealthState::Healthy)
3780            .await;
3781        manager
3782            .update_health_state("cache", HealthState::Unknown)
3783            .await;
3784
3785        // Verify states
3786        let states = manager.health_states();
3787        let states_read = states.read().await;
3788
3789        assert!(matches!(states_read.get("db"), Some(HealthState::Healthy)));
3790        assert!(matches!(
3791            states_read.get("cache"),
3792            Some(HealthState::Unknown)
3793        ));
3794    }
3795
3796    /// Regression test for the stabilization timeout that blocked the raft-e2e
3797    /// `cluster_scaling` / `cluster_upgrade` suites.
3798    ///
3799    /// Previously the callback that bridges a container's health result into the
3800    /// `ServiceManager` `health_states` map was only attached when BOTH a proxy
3801    /// manager AND a reachable overlay IP existed. In degraded-overlay / no-proxy
3802    /// deployments that `if let` was false, so `health_states` was never written,
3803    /// the service stayed `healthy=false` forever, and stabilization timed out
3804    /// even though the container was running and its health check passing.
3805    ///
3806    /// This test drives the real `scale_to` create path with:
3807    ///   * NO `proxy_manager` (so `proxy_backend` resolves to None), and
3808    ///   * a `Command { command: "true" }` health check (always passes host-side),
3809    /// then asserts the shared `health_states` map receives `Healthy` for the
3810    /// service — proving the bridge fires unconditionally.
3811    ///
3812    /// Gated to `#[cfg(unix)]` because `HealthCheck::Command` is executed via
3813    /// `sh -c <command>` in `crate::health::HealthChecker::check_command`. On
3814    /// Windows hosts without `sh` on PATH (the default Windows CI image), no
3815    /// Command-based health check can ever pass, so the test would fail for
3816    /// reasons unrelated to the bridge it is regression-testing. The bridge
3817    /// behavior under test is platform-agnostic; only the test fixture's
3818    /// "always-passes command" needs a Unix shell.
3819    #[cfg(unix)]
3820    #[tokio::test]
3821    async fn test_health_states_bridge_fires_without_proxy() {
3822        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3823
3824        // Service spec with a host-side command health check that always passes.
3825        // Zero start-grace + a short interval keep the test fast.
3826        let mut spec = mock_spec();
3827        spec.health = zlayer_spec::HealthSpec {
3828            start_grace: Some(Duration::from_millis(0)),
3829            interval: Some(Duration::from_millis(50)),
3830            timeout: Some(Duration::from_secs(5)),
3831            retries: 1,
3832            check: HealthCheck::Command {
3833                command: "true".to_string(),
3834            },
3835        };
3836
3837        // Build a ServiceInstance with NO proxy_manager and NO overlay_manager,
3838        // then wire in the shared health_states map exactly as ServiceManager does.
3839        let mut instance =
3840            ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
3841        let health_states: Arc<RwLock<HashMap<String, HealthState>>> =
3842            Arc::new(RwLock::new(HashMap::new()));
3843        instance.set_health_states(Arc::clone(&health_states));
3844
3845        // Drive the real create path (no proxy, MockRuntime IP present but proxy
3846        // absent => proxy_backend is None, hitting the previously-broken branch).
3847        instance.scale_to(1).await.unwrap();
3848
3849        // Poll for the bridged Healthy state (the monitor checks asynchronously
3850        // after its start grace). Bounded so a regression fails fast.
3851        let mut bridged = false;
3852        for _ in 0..100 {
3853            if matches!(
3854                health_states.read().await.get("web"),
3855                Some(HealthState::Healthy)
3856            ) {
3857                bridged = true;
3858                break;
3859            }
3860            tokio::time::sleep(Duration::from_millis(50)).await;
3861        }
3862
3863        assert!(
3864            bridged,
3865            "health_states must receive Healthy for the service even without a \
3866             proxy or overlay IP; the bridge regressed and stabilization would time out"
3867        );
3868    }
3869
3870    // ==================== Job/Cron Integration Tests ====================
3871
3872    fn mock_job_spec() -> ServiceSpec {
3873        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3874            r"
3875version: v1
3876deployment: test
3877services:
3878  backup:
3879    rtype: job
3880    image:
3881      name: backup:latest
3882",
3883        )
3884        .unwrap()
3885        .services
3886        .remove("backup")
3887        .unwrap()
3888    }
3889
3890    fn mock_cron_spec() -> ServiceSpec {
3891        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3892            r#"
3893version: v1
3894deployment: test
3895services:
3896  cleanup:
3897    rtype: cron
3898    schedule: "0 0 * * * * *"
3899    image:
3900      name: cleanup:latest
3901"#,
3902        )
3903        .unwrap()
3904        .services
3905        .remove("cleanup")
3906        .unwrap()
3907    }
3908
3909    #[tokio::test]
3910    async fn test_service_manager_with_job_executor() {
3911        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3912        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3913
3914        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
3915
3916        // Register job
3917        let job_spec = mock_job_spec();
3918        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
3919            .await
3920            .unwrap();
3921
3922        // Trigger job
3923        let exec_id = manager
3924            .trigger_job("backup", JobTrigger::Cli)
3925            .await
3926            .unwrap();
3927
3928        // Give job time to start
3929        tokio::time::sleep(Duration::from_millis(50)).await;
3930
3931        // Check execution exists
3932        let execution = manager.get_job_execution(&exec_id).await;
3933        assert!(execution.is_some());
3934        assert_eq!(execution.unwrap().job_name, "backup");
3935    }
3936
3937    #[tokio::test]
3938    async fn test_service_manager_with_cron_scheduler() {
3939        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3940        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3941        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
3942
3943        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
3944
3945        // Register cron job
3946        let cron_spec = mock_cron_spec();
3947        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3948            .await
3949            .unwrap();
3950
3951        // List cron jobs
3952        let cron_jobs = manager.list_cron_jobs().await;
3953        assert_eq!(cron_jobs.len(), 1);
3954        assert_eq!(cron_jobs[0].name, "cleanup");
3955        assert!(cron_jobs[0].enabled);
3956    }
3957
3958    #[tokio::test]
3959    async fn test_service_manager_trigger_cron() {
3960        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3961        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3962        let cron_scheduler = Arc::new(CronScheduler::new(job_executor.clone()));
3963
3964        let manager = ServiceManager::new(runtime)
3965            .with_job_executor(job_executor)
3966            .with_cron_scheduler(cron_scheduler);
3967
3968        // Register cron job
3969        let cron_spec = mock_cron_spec();
3970        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3971            .await
3972            .unwrap();
3973
3974        // Manually trigger the cron job
3975        let exec_id = manager.trigger_cron("cleanup").await.unwrap();
3976        assert!(!exec_id.0.is_empty());
3977    }
3978
3979    #[tokio::test]
3980    async fn test_service_manager_enable_disable_cron() {
3981        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3982        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3983        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
3984
3985        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
3986
3987        // Register cron job
3988        let cron_spec = mock_cron_spec();
3989        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3990            .await
3991            .unwrap();
3992
3993        // Initially enabled
3994        let cron_jobs = manager.list_cron_jobs().await;
3995        assert!(cron_jobs[0].enabled);
3996
3997        // Disable
3998        manager.set_cron_enabled("cleanup", false).await;
3999        let cron_jobs = manager.list_cron_jobs().await;
4000        assert!(!cron_jobs[0].enabled);
4001
4002        // Re-enable
4003        manager.set_cron_enabled("cleanup", true).await;
4004        let cron_jobs = manager.list_cron_jobs().await;
4005        assert!(cron_jobs[0].enabled);
4006    }
4007
4008    #[tokio::test]
4009    async fn test_service_manager_remove_cleans_up_job() {
4010        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4011        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
4012
4013        let manager = ServiceManager::new(runtime).with_job_executor(job_executor.clone());
4014
4015        // Register job
4016        let job_spec = mock_job_spec();
4017        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
4018            .await
4019            .unwrap();
4020
4021        // Verify job is registered
4022        let spec = job_executor.get_job_spec("backup").await;
4023        assert!(spec.is_some());
4024
4025        // Remove job
4026        manager.remove_service("backup").await.unwrap();
4027
4028        // Verify job is unregistered
4029        let spec = job_executor.get_job_spec("backup").await;
4030        assert!(spec.is_none());
4031    }
4032
4033    #[tokio::test]
4034    async fn test_service_manager_remove_cleans_up_cron() {
4035        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4036        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
4037        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
4038
4039        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler.clone());
4040
4041        // Register cron job
4042        let cron_spec = mock_cron_spec();
4043        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
4044            .await
4045            .unwrap();
4046
4047        // Verify cron job is registered
4048        assert_eq!(cron_scheduler.job_count().await, 1);
4049
4050        // Remove cron job
4051        manager.remove_service("cleanup").await.unwrap();
4052
4053        // Verify cron job is unregistered
4054        assert_eq!(cron_scheduler.job_count().await, 0);
4055    }
4056
4057    #[tokio::test]
4058    async fn test_service_manager_job_without_executor() {
4059        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4060        let manager = ServiceManager::new(runtime);
4061
4062        // Try to trigger job without executor configured
4063        let result = manager.trigger_job("nonexistent", JobTrigger::Cli).await;
4064        assert!(result.is_err());
4065        assert!(result.unwrap_err().to_string().contains("not configured"));
4066    }
4067
4068    #[tokio::test]
4069    async fn test_service_manager_cron_without_scheduler() {
4070        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4071        let manager = ServiceManager::new(runtime);
4072
4073        // Try to register cron job without scheduler configured
4074        let cron_spec = mock_cron_spec();
4075        let result = Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec)).await;
4076        assert!(result.is_err());
4077        assert!(result.unwrap_err().to_string().contains("not configured"));
4078    }
4079
4080    #[tokio::test]
4081    async fn test_service_manager_list_job_executions() {
4082        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4083        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
4084
4085        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
4086
4087        // Register job
4088        let job_spec = mock_job_spec();
4089        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
4090            .await
4091            .unwrap();
4092
4093        // Trigger job twice
4094        manager
4095            .trigger_job("backup", JobTrigger::Cli)
4096            .await
4097            .unwrap();
4098        manager
4099            .trigger_job("backup", JobTrigger::Scheduler)
4100            .await
4101            .unwrap();
4102
4103        // Give jobs time to start
4104        tokio::time::sleep(Duration::from_millis(50)).await;
4105
4106        // List executions
4107        let executions = manager.list_job_executions("backup").await;
4108        assert_eq!(executions.len(), 2);
4109    }
4110
4111    // ==================== Container Supervisor Integration Tests ====================
4112
4113    #[tokio::test]
4114    async fn test_service_manager_with_supervisor() {
4115        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4116        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
4117
4118        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
4119
4120        // Add service
4121        let spec = mock_spec();
4122        Box::pin(manager.upsert_service("api".to_string(), spec))
4123            .await
4124            .unwrap();
4125
4126        // Scale up - containers should be registered with supervisor
4127        manager.scale_service("api", 2).await.unwrap();
4128
4129        // Verify containers are supervised
4130        assert_eq!(supervisor.supervised_count().await, 2);
4131
4132        // Scale down - containers should be unregistered
4133        manager.scale_service("api", 1).await.unwrap();
4134        assert_eq!(supervisor.supervised_count().await, 1);
4135
4136        // Remove service - remaining containers should be unregistered
4137        manager.remove_service("api").await.unwrap();
4138        assert_eq!(supervisor.supervised_count().await, 0);
4139    }
4140
4141    #[tokio::test]
4142    async fn test_service_manager_supervisor_state() {
4143        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4144        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
4145
4146        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor);
4147
4148        // Add and scale service
4149        let spec = mock_spec();
4150        Box::pin(manager.upsert_service("web".to_string(), spec))
4151            .await
4152            .unwrap();
4153        manager.scale_service("web", 1).await.unwrap();
4154
4155        // Check supervised state
4156        let container_id = ContainerId::new("web".to_string(), 1);
4157        let state = manager.get_container_supervised_state(&container_id).await;
4158        assert_eq!(state, Some(SupervisedState::Running));
4159    }
4160
4161    #[tokio::test]
4162    async fn test_service_manager_start_supervisor() {
4163        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4164        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
4165
4166        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
4167
4168        // Start the supervisor
4169        let handle = manager.start_container_supervisor().unwrap();
4170
4171        // Give it time to start
4172        tokio::time::sleep(Duration::from_millis(50)).await;
4173        assert!(supervisor.is_running());
4174
4175        // Shutdown
4176        manager.shutdown_container_supervisor();
4177
4178        // Wait for it to stop
4179        tokio::time::timeout(Duration::from_secs(1), handle)
4180            .await
4181            .unwrap()
4182            .unwrap();
4183
4184        assert!(!supervisor.is_running());
4185    }
4186
4187    #[tokio::test]
4188    async fn test_service_manager_supervisor_not_configured() {
4189        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4190        let manager = ServiceManager::new(runtime);
4191
4192        // Try to start supervisor without configuring it
4193        let result = manager.start_container_supervisor();
4194        assert!(result.is_err());
4195        assert!(result.unwrap_err().to_string().contains("not configured"));
4196    }
4197
4198    // ==================== Stream Registry Integration Tests ====================
4199
4200    fn mock_tcp_spec() -> ServiceSpec {
4201        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
4202            r"
4203version: v1
4204deployment: test
4205services:
4206  database:
4207    rtype: service
4208    image:
4209      name: postgres:latest
4210    endpoints:
4211      - name: postgresql
4212        protocol: tcp
4213        port: 5432
4214    scale:
4215      mode: fixed
4216      replicas: 1
4217",
4218        )
4219        .unwrap()
4220        .services
4221        .remove("database")
4222        .unwrap()
4223    }
4224
4225    fn mock_udp_spec() -> ServiceSpec {
4226        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
4227            r"
4228version: v1
4229deployment: test
4230services:
4231  dns:
4232    rtype: service
4233    image:
4234      name: dns:latest
4235    endpoints:
4236      - name: dns
4237        protocol: udp
4238        port: 53
4239    scale:
4240      mode: fixed
4241      replicas: 1
4242",
4243        )
4244        .unwrap()
4245        .services
4246        .remove("dns")
4247        .unwrap()
4248    }
4249
4250    fn mock_mixed_spec() -> ServiceSpec {
4251        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
4252            r"
4253version: v1
4254deployment: test
4255services:
4256  mixed:
4257    rtype: service
4258    image:
4259      name: mixed:latest
4260    endpoints:
4261      - name: http
4262        protocol: http
4263        port: 8080
4264      - name: grpc
4265        protocol: tcp
4266        port: 9000
4267      - name: metrics
4268        protocol: udp
4269        port: 8125
4270    scale:
4271      mode: fixed
4272      replicas: 1
4273",
4274        )
4275        .unwrap()
4276        .services
4277        .remove("mixed")
4278        .unwrap()
4279    }
4280
4281    #[tokio::test]
4282    async fn test_service_manager_with_stream_registry_tcp() {
4283        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4284        let stream_registry = Arc::new(StreamRegistry::new());
4285
4286        let mut manager = ServiceManager::new(runtime);
4287        manager.set_stream_registry(stream_registry.clone());
4288        manager.set_deployment_name("test".to_string());
4289
4290        // Add TCP-only service
4291        let spec = mock_tcp_spec();
4292        Box::pin(manager.upsert_service("database".to_string(), spec))
4293            .await
4294            .unwrap();
4295
4296        // Verify TCP route was registered
4297        assert_eq!(stream_registry.tcp_count(), 1);
4298        assert!(stream_registry.tcp_ports().contains(&5432));
4299
4300        // Remove service and verify cleanup
4301        manager.remove_service("database").await.unwrap();
4302        assert_eq!(stream_registry.tcp_count(), 0);
4303    }
4304
4305    #[tokio::test]
4306    async fn test_service_manager_with_stream_registry_udp() {
4307        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4308        let stream_registry = Arc::new(StreamRegistry::new());
4309
4310        let mut manager = ServiceManager::new(runtime);
4311        manager.set_stream_registry(stream_registry.clone());
4312        manager.set_deployment_name("test".to_string());
4313
4314        // Add UDP-only service
4315        let spec = mock_udp_spec();
4316        Box::pin(manager.upsert_service("dns".to_string(), spec))
4317            .await
4318            .unwrap();
4319
4320        // Verify UDP route was registered
4321        assert_eq!(stream_registry.udp_count(), 1);
4322        assert!(stream_registry.udp_ports().contains(&53));
4323
4324        // Remove service and verify cleanup
4325        manager.remove_service("dns").await.unwrap();
4326        assert_eq!(stream_registry.udp_count(), 0);
4327    }
4328
4329    #[tokio::test]
4330    async fn test_service_manager_with_stream_registry_mixed() {
4331        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4332        let stream_registry = Arc::new(StreamRegistry::new());
4333
4334        let mut manager = ServiceManager::new(runtime);
4335        manager.set_stream_registry(stream_registry.clone());
4336        manager.set_deployment_name("test".to_string());
4337
4338        // Add mixed service (HTTP + TCP + UDP)
4339        let spec = mock_mixed_spec();
4340        Box::pin(manager.upsert_service("mixed".to_string(), spec))
4341            .await
4342            .unwrap();
4343
4344        // Verify stream routes were registered
4345        assert_eq!(stream_registry.tcp_count(), 1); // TCP: 9000
4346        assert_eq!(stream_registry.udp_count(), 1); // UDP: 8125
4347
4348        assert!(stream_registry.tcp_ports().contains(&9000));
4349        assert!(stream_registry.udp_ports().contains(&8125));
4350
4351        // Remove service and verify stream cleanup
4352        manager.remove_service("mixed").await.unwrap();
4353        assert_eq!(stream_registry.tcp_count(), 0);
4354        assert_eq!(stream_registry.udp_count(), 0);
4355    }
4356
4357    #[tokio::test]
4358    async fn test_service_manager_stream_registry_builder() {
4359        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4360        let stream_registry = Arc::new(StreamRegistry::new());
4361
4362        // Test builder pattern
4363        let manager = ServiceManager::new(runtime).with_stream_registry(stream_registry.clone());
4364
4365        // Verify stream registry is accessible
4366        assert!(manager.stream_registry().is_some());
4367    }
4368
4369    #[tokio::test]
4370    async fn test_tcp_service_without_stream_registry() {
4371        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4372
4373        // Manager without stream registry
4374        let mut manager = ServiceManager::new(runtime);
4375        manager.set_deployment_name("test".to_string());
4376
4377        // Add TCP service - should log warning but not fail
4378        let spec = mock_tcp_spec();
4379        Box::pin(manager.upsert_service("database".to_string(), spec))
4380            .await
4381            .unwrap();
4382
4383        // No stream registry to check, but service should be tracked
4384        let services = manager.list_services().await;
4385        assert!(services.contains(&"database".to_string()));
4386    }
4387
4388    /// Verify `collect_endpoint_backends` filters containers by
4389    /// `EndpointSpec.target_role`.
4390    ///
4391    /// Given two replica groups (`primary` × 1, `read` × 2) and two
4392    /// endpoints — one with `target_role: primary` and one with
4393    /// `target_role: read` — each endpoint should receive only the
4394    /// matching containers' overlay addresses. The legacy no-filter
4395    /// endpoint (`target_role: None`) should receive all of them.
4396    #[tokio::test]
4397    #[allow(clippy::too_many_lines)]
4398    async fn test_collect_endpoint_backends_respects_target_role() {
4399        use crate::runtime::Container;
4400        use std::collections::HashMap as StdHashMap;
4401        use std::net::{IpAddr, Ipv4Addr};
4402        use zlayer_spec::{
4403            EndpointSpec, ExposeType, GroupAffinity, Protocol, ReplicaGroup, ScaleSpec,
4404        };
4405
4406        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4407        let manager = ServiceManager::new(runtime.clone());
4408
4409        // Build a spec with replica_groups and three endpoints:
4410        // - "write" targets role "primary"
4411        // - "read" targets role "read"
4412        // - "any" has no target_role (legacy)
4413        let mut spec = mock_spec();
4414        spec.replica_groups = Some(vec![
4415            ReplicaGroup {
4416                role: "primary".to_string(),
4417                count: 1,
4418                image: None,
4419                env: StdHashMap::new(),
4420                command: None,
4421                resources: None,
4422                affinity: GroupAffinity::default(),
4423            },
4424            ReplicaGroup {
4425                role: "read".to_string(),
4426                count: 2,
4427                image: None,
4428                env: StdHashMap::new(),
4429                command: None,
4430                resources: None,
4431                affinity: GroupAffinity::default(),
4432            },
4433        ]);
4434        spec.scale = ScaleSpec::Fixed { replicas: 3 };
4435        spec.endpoints = vec![
4436            EndpointSpec {
4437                name: "write".to_string(),
4438                protocol: Protocol::Tcp,
4439                port: 5432,
4440                target_port: Some(5432),
4441                path: None,
4442                host: None,
4443                expose: ExposeType::Internal,
4444                stream: None,
4445                tunnel: None,
4446                target_role: Some("primary".to_string()),
4447            },
4448            EndpointSpec {
4449                name: "read".to_string(),
4450                protocol: Protocol::Tcp,
4451                port: 5433,
4452                target_port: Some(5432),
4453                path: None,
4454                host: None,
4455                expose: ExposeType::Internal,
4456                stream: None,
4457                tunnel: None,
4458                target_role: Some("read".to_string()),
4459            },
4460            EndpointSpec {
4461                name: "any".to_string(),
4462                protocol: Protocol::Tcp,
4463                port: 5434,
4464                target_port: Some(5432),
4465                path: None,
4466                host: None,
4467                expose: ExposeType::Internal,
4468                stream: None,
4469                tunnel: None,
4470                target_role: None,
4471            },
4472        ];
4473
4474        let instance = ServiceInstance::new(
4475            "postgres".to_string(),
4476            spec.clone(),
4477            runtime,
4478            None, // overlay_manager — not exercised by this test
4479        );
4480
4481        // Inject three containers directly: one primary, two read replicas.
4482        let cid_primary = ContainerId::with_role_and_node("postgres", 1, "primary", 0);
4483        let cid_first_read = ContainerId::with_role_and_node("postgres", 2, "read", 0);
4484        let cid_second_read = ContainerId::with_role_and_node("postgres", 3, "read", 0);
4485
4486        let ip_primary = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 1));
4487        let ip_first_read = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 2));
4488        let ip_second_read = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 3));
4489
4490        {
4491            let mut containers = instance.containers().write().await;
4492            containers.insert(
4493                cid_primary.clone(),
4494                Container {
4495                    id: cid_primary,
4496                    image: spec.image.name.to_string(),
4497                    state: crate::runtime::ContainerState::Running,
4498                    pid: None,
4499                    task: None,
4500                    overlay_ip: Some(ip_primary),
4501                    health_monitor: None,
4502                    port_override: None,
4503                },
4504            );
4505            containers.insert(
4506                cid_first_read.clone(),
4507                Container {
4508                    id: cid_first_read,
4509                    image: spec.image.name.to_string(),
4510                    state: crate::runtime::ContainerState::Running,
4511                    pid: None,
4512                    task: None,
4513                    overlay_ip: Some(ip_first_read),
4514                    health_monitor: None,
4515                    port_override: None,
4516                },
4517            );
4518            containers.insert(
4519                cid_second_read.clone(),
4520                Container {
4521                    id: cid_second_read,
4522                    image: spec.image.name.to_string(),
4523                    state: crate::runtime::ContainerState::Running,
4524                    pid: None,
4525                    task: None,
4526                    overlay_ip: Some(ip_second_read),
4527                    health_monitor: None,
4528                    port_override: None,
4529                },
4530            );
4531        }
4532
4533        let write_ep = &spec.endpoints[0];
4534        let read_ep = &spec.endpoints[1];
4535        let any_ep = &spec.endpoints[2];
4536
4537        let write_backends = manager.collect_endpoint_backends(&instance, write_ep).await;
4538        let read_backends = manager.collect_endpoint_backends(&instance, read_ep).await;
4539        let any_backends = manager.collect_endpoint_backends(&instance, any_ep).await;
4540
4541        // write endpoint -> only the primary container
4542        assert_eq!(write_backends.len(), 1, "write should match only primary");
4543        assert!(
4544            write_backends.iter().any(|a| a.ip() == ip_primary),
4545            "write backends missing primary IP: {write_backends:?}"
4546        );
4547
4548        // read endpoint -> both read containers, no primary
4549        assert_eq!(
4550            read_backends.len(),
4551            2,
4552            "read should match both read replicas"
4553        );
4554        assert!(read_backends.iter().any(|a| a.ip() == ip_first_read));
4555        assert!(read_backends.iter().any(|a| a.ip() == ip_second_read));
4556        assert!(
4557            !read_backends.iter().any(|a| a.ip() == ip_primary),
4558            "read backends must not contain primary: {read_backends:?}"
4559        );
4560
4561        // legacy endpoint (target_role = None) -> every container
4562        assert_eq!(
4563            any_backends.len(),
4564            3,
4565            "any-role endpoint should see all containers"
4566        );
4567    }
4568
4569    /// Build a `ServiceInstance` whose spec carries a deployment, so the
4570    /// instance's deployment-scoping helpers (`dns_hostnames` /
4571    /// `dns_search_domain`) are exercised through the real construction path
4572    /// (the constructors capture `spec.deployment`).
4573    fn instance_in_deployment(service: &str, deployment: &str) -> ServiceInstance {
4574        let mut spec = ServiceSpec::minimal(service, "postgres:16-alpine");
4575        spec.deployment = Some(deployment.to_string());
4576        ServiceInstance::new(
4577            service.to_string(),
4578            spec,
4579            Arc::new(MockRuntime::new()),
4580            None,
4581        )
4582    }
4583
4584    /// Register every hostname an instance would register into a real
4585    /// [`DnsServer`]'s authority, then resolve a name through that authority and
4586    /// assert the answer. Uses the DNS handle's direct authority lookup (no UDP
4587    /// roundtrip — a blocking sync DNS client inside a tokio runtime would
4588    /// deadlock the current-thread executor), so the test exercises the actual
4589    /// record store deterministically.
4590    async fn resolve_through_authority(
4591        handle: &zlayer_overlay::DnsHandle,
4592        fqdn: &str,
4593    ) -> Option<IpAddr> {
4594        handle.lookup_a(fqdn).await
4595    }
4596
4597    /// Bug 6 (the CRITICAL leak): two DIFFERENT deployments each with a service
4598    /// named `postgres`, registered into ONE daemon-global DNS authority, must
4599    /// resolve to their OWN instance's IP — no last-writer-wins clobber across
4600    /// deployments. The deployment-scoped FQDNs are what a guest's
4601    /// `search <D>.zlayer.local` expands the bare `postgres` / `postgres.service`
4602    /// queries into, so distinct scoped keys are exactly what prevents the leak.
4603    #[tokio::test]
4604    async fn deployment_scoped_dns_no_cross_deployment_clobber() {
4605        use zlayer_overlay::DnsServer;
4606
4607        let bind = SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
4608        let server = DnsServer::new(bind, "zlayer.local.").expect("build DNS server");
4609        let handle = server.handle();
4610
4611        let inst_a = instance_in_deployment("postgres", "deploy-a");
4612        let inst_b = instance_in_deployment("postgres", "deploy-b");
4613
4614        let cid = ContainerId::with_role_and_node("postgres", 1, "default", 0);
4615        let ip_a = IpAddr::V4(std::net::Ipv4Addr::new(10, 200, 0, 11));
4616        let ip_b = IpAddr::V4(std::net::Ipv4Addr::new(10, 200, 0, 22));
4617
4618        // Register every name each instance would register for this container,
4619        // in deployment order, into the SAME authority.
4620        for name in inst_a.dns_hostnames(&cid) {
4621            handle.add_record(&name, ip_a).await.expect("add a record");
4622        }
4623        for name in inst_b.dns_hostnames(&cid) {
4624            handle.add_record(&name, ip_b).await.expect("add b record");
4625        }
4626
4627        // The deployment-scoped FQDNs stay distinct and resolve to their OWN IP.
4628        assert_eq!(
4629            resolve_through_authority(&handle, "postgres.deploy-a.zlayer.local").await,
4630            Some(ip_a),
4631            "deploy-a's postgres must resolve to deploy-a's IP"
4632        );
4633        assert_eq!(
4634            resolve_through_authority(&handle, "postgres.deploy-b.zlayer.local").await,
4635            Some(ip_b),
4636            "deploy-b's postgres must resolve to deploy-b's IP (no clobber)"
4637        );
4638        // `<svc>.service` form, scoped per deployment.
4639        assert_eq!(
4640            resolve_through_authority(&handle, "postgres.service.deploy-a.zlayer.local").await,
4641            Some(ip_a),
4642        );
4643        assert_eq!(
4644            resolve_through_authority(&handle, "postgres.service.deploy-b.zlayer.local").await,
4645            Some(ip_b),
4646        );
4647    }
4648
4649    /// Bug 6 part (a): two services in the SAME deployment resolve each other by
4650    /// their deployment-scoped names.
4651    #[tokio::test]
4652    async fn deployment_scoped_dns_same_deployment_siblings_resolve() {
4653        use zlayer_overlay::DnsServer;
4654
4655        let bind = SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
4656        let server = DnsServer::new(bind, "zlayer.local.").expect("build DNS server");
4657        let handle = server.handle();
4658
4659        let db = instance_in_deployment("db", "myapp");
4660        let cache = instance_in_deployment("cache", "myapp");
4661        let cid = ContainerId::with_role_and_node("x", 1, "default", 0);
4662        let ip_db = IpAddr::V4(std::net::Ipv4Addr::new(10, 200, 1, 1));
4663        let ip_cache = IpAddr::V4(std::net::Ipv4Addr::new(10, 200, 1, 2));
4664
4665        for name in db.dns_hostnames(&cid) {
4666            handle.add_record(&name, ip_db).await.expect("add db");
4667        }
4668        for name in cache.dns_hostnames(&cid) {
4669            handle.add_record(&name, ip_cache).await.expect("add cache");
4670        }
4671
4672        // Within deployment `myapp`, the search-expanded sibling FQDNs resolve
4673        // to the same deployment's instances.
4674        assert_eq!(
4675            resolve_through_authority(&handle, "db.myapp.zlayer.local").await,
4676            Some(ip_db)
4677        );
4678        assert_eq!(
4679            resolve_through_authority(&handle, "cache.myapp.zlayer.local").await,
4680            Some(ip_cache)
4681        );
4682    }
4683
4684    /// Bug 6 part (b): the per-deployment resolv.conf `search` value is emitted
4685    /// correctly (deployment scope first, bare zone last for cross-deployment
4686    /// FQDN + global names), and is absent for a non-deployment instance.
4687    #[test]
4688    fn dns_search_domain_is_deployment_scoped() {
4689        let scoped = instance_in_deployment("api", "deploy-a");
4690        assert_eq!(
4691            scoped.dns_search_domain("zlayer.local"),
4692            Some("deploy-a.zlayer.local zlayer.local".to_string()),
4693        );
4694        // Trailing dot on the zone is normalized away.
4695        assert_eq!(
4696            scoped.dns_search_domain("zlayer.local."),
4697            Some("deploy-a.zlayer.local zlayer.local".to_string()),
4698        );
4699
4700        // No deployment => no override (caller falls back to the global zone).
4701        let unscoped = ServiceInstance::new(
4702            "api".to_string(),
4703            ServiceSpec::minimal("api", "nginx:latest"),
4704            Arc::new(MockRuntime::new()),
4705            None,
4706        );
4707        assert_eq!(unscoped.dns_search_domain("zlayer.local"), None);
4708    }
4709
4710    /// The deployment-scoped hostnames include both the bare-name scope
4711    /// (`<svc>.<D>`) and the `<svc>.service.<D>` form, and the legacy unscoped
4712    /// names are still emitted for native / compose back-compat.
4713    #[test]
4714    fn dns_hostnames_emit_scoped_and_legacy_families() {
4715        let inst = instance_in_deployment("postgres", "myapp");
4716        let cid = ContainerId::with_role_and_node("postgres", 2, "default", 0);
4717        let names = inst.dns_hostnames(&cid);
4718
4719        // Deployment-scoped family.
4720        assert!(names.contains(&"postgres.myapp".to_string()));
4721        assert!(names.contains(&"postgres.service.myapp".to_string()));
4722        assert!(names.contains(&"postgres.myapp.service".to_string()));
4723        assert!(names.contains(&"2.postgres.service.myapp".to_string()));
4724
4725        // Legacy / unscoped family (back-compat).
4726        assert!(names.contains(&"postgres".to_string()));
4727        assert!(names.contains(&"postgres.service.local".to_string()));
4728        assert!(names.contains(&"2.postgres.service.local".to_string()));
4729    }
4730
4731    /// Minimal [`Cluster`] stub for the external-domain DNS tests: only
4732    /// `select_ingress_overlay_ip` carries behavior (returns the configured
4733    /// peer overlay IP); every other method is an unreachable stub since these
4734    /// tests never exercise scaling/placement.
4735    struct IngressPeerCluster {
4736        ingress_overlay_ip: Option<String>,
4737    }
4738
4739    #[async_trait::async_trait]
4740    impl zlayer_scheduler::cluster::Cluster for IngressPeerCluster {
4741        fn node_id(&self) -> u64 {
4742            2
4743        }
4744        async fn select_ingress_overlay_ip(&self) -> Option<String> {
4745            self.ingress_overlay_ip.clone()
4746        }
4747        async fn is_leader(&self) -> bool {
4748            false
4749        }
4750        async fn leader_addr(&self) -> Option<std::net::SocketAddr> {
4751            None
4752        }
4753        async fn list_nodes(&self) -> Vec<zlayer_scheduler::cluster::NodeRecord> {
4754            Vec::new()
4755        }
4756        async fn dispatch_scale(
4757            &self,
4758            _target: u64,
4759            _req: zlayer_scheduler::cluster::InternalScaleRequest,
4760        ) -> Result<(), zlayer_scheduler::cluster::ClusterError> {
4761            unreachable!("not exercised by external-domain DNS tests")
4762        }
4763        async fn forward_scale(
4764            &self,
4765            _req: zlayer_scheduler::cluster::InternalScaleRequest,
4766        ) -> Result<(), zlayer_scheduler::cluster::ClusterError> {
4767            unreachable!("not exercised by external-domain DNS tests")
4768        }
4769        async fn place_container(
4770            &self,
4771            _spec: &zlayer_spec::ServiceSpec,
4772        ) -> Result<
4773            Option<zlayer_scheduler::cluster::ContainerPlacement>,
4774            zlayer_scheduler::cluster::ClusterError,
4775        > {
4776            unreachable!("not exercised by external-domain DNS tests")
4777        }
4778    }
4779
4780    /// A `ServiceSpec` whose single endpoint carries an external vhost domain.
4781    fn mock_spec_with_host(host: &str) -> ServiceSpec {
4782        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(&format!(
4783            r"
4784version: v1
4785deployment: test
4786services:
4787  web:
4788    rtype: service
4789    image:
4790      name: test:latest
4791    endpoints:
4792      - name: http
4793        protocol: http
4794        port: 8080
4795        host: {host}
4796    scale:
4797      mode: fixed
4798      replicas: 1
4799"
4800        ))
4801        .unwrap()
4802        .services
4803        .remove("web")
4804        .unwrap()
4805    }
4806
4807    #[tokio::test]
4808    async fn external_domain_registers_host_to_ingress_peer_ip() {
4809        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4810        let dns = Arc::new(
4811            zlayer_overlay::DnsServer::new("127.0.0.1:0".parse().unwrap(), "zlayer.local").unwrap(),
4812        );
4813        let spec = mock_spec_with_host("app.example.com");
4814        let mut instance = ServiceInstance::new("web".to_string(), spec, runtime, None);
4815        instance.set_dns_server(Arc::clone(&dns));
4816        // THIS node is not the ingress; the cluster names a peer's overlay IP.
4817        instance.set_ingress_enabled(false);
4818        instance.set_cluster(Arc::new(IngressPeerCluster {
4819            ingress_overlay_ip: Some("10.200.0.7".to_string()),
4820        }));
4821
4822        instance.register_external_domains(&dns).await;
4823
4824        // The external domain resolves to the ingress peer's overlay IP.
4825        let handle = dns.handle();
4826        let ip = handle.lookup_a("app.example.com.").await;
4827        assert_eq!(ip, Some("10.200.0.7".parse().unwrap()));
4828    }
4829
4830    #[tokio::test]
4831    async fn external_domain_skips_when_no_ingress_node() {
4832        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4833        let dns = Arc::new(
4834            zlayer_overlay::DnsServer::new("127.0.0.1:0".parse().unwrap(), "zlayer.local").unwrap(),
4835        );
4836        let spec = mock_spec_with_host("app.example.com");
4837        let mut instance = ServiceInstance::new("web".to_string(), spec, runtime, None);
4838        instance.set_dns_server(Arc::clone(&dns));
4839        // No ingress locally and the cluster has no ingress node → WARN + skip,
4840        // never error, and no record is registered.
4841        instance.set_ingress_enabled(false);
4842        instance.set_cluster(Arc::new(IngressPeerCluster {
4843            ingress_overlay_ip: None,
4844        }));
4845
4846        instance.register_external_domains(&dns).await;
4847
4848        let handle = dns.handle();
4849        assert_eq!(handle.lookup_a("app.example.com.").await, None);
4850    }
4851
4852    #[tokio::test]
4853    async fn external_domain_skips_wildcard_host_patterns() {
4854        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4855        let dns = Arc::new(
4856            zlayer_overlay::DnsServer::new("127.0.0.1:0".parse().unwrap(), "zlayer.local").unwrap(),
4857        );
4858        // A wildcard host is a routing matcher, not a resolvable name.
4859        let spec = mock_spec_with_host("'*.example.com'");
4860        let mut instance = ServiceInstance::new("web".to_string(), spec, runtime, None);
4861        instance.set_dns_server(Arc::clone(&dns));
4862        instance.set_ingress_enabled(false);
4863        instance.set_cluster(Arc::new(IngressPeerCluster {
4864            ingress_overlay_ip: Some("10.200.0.7".to_string()),
4865        }));
4866
4867        instance.register_external_domains(&dns).await;
4868
4869        let handle = dns.handle();
4870        // Wildcard skipped → nothing registered under the literal pattern.
4871        assert_eq!(handle.lookup_a("*.example.com.").await, None);
4872    }
4873}