// zlayer_agent/service.rs

1//! Service-level container lifecycle management
2
3use crate::container_supervisor::{ContainerSupervisor, SupervisedState, SupervisorEvent};
4use crate::cron_scheduler::CronScheduler;
5use crate::dependency::{
6    DependencyConditionChecker, DependencyGraph, DependencyWaiter, WaitResult,
7};
8use crate::error::{AgentError, Result};
9use crate::health::{HealthCallback, HealthChecker, HealthMonitor, HealthState};
10use crate::init::InitOrchestrator;
11use crate::job::{JobExecution, JobExecutionId, JobExecutor, JobTrigger};
12use crate::overlay_manager::OverlayManager;
13use crate::proxy_manager::ProxyManager;
14use crate::runtime::{Container, ContainerId, ContainerState, Runtime};
15use std::collections::HashMap;
16use std::net::{IpAddr, SocketAddr};
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{RwLock, Semaphore};
20use zlayer_observability::logs::LogEntry;
21use zlayer_overlay::DnsServer;
22use zlayer_proxy::{StreamRegistry, StreamService};
23use zlayer_spec::{DependsSpec, HealthCheck, Protocol, PullPolicy, ResourceType, ServiceSpec};
24
25/// Shared handle to the daemon's registry-credential store, used by the
26/// supervisor to resolve `zlayer login` credentials (and the
27/// `~/.docker/config.json` fallback) for a service's image when recreating a
28/// container outside the API create handler (restore / drift / scale). Matches
29/// the concrete type wired into the API state in `serve.rs`.
30pub type RegistryCredentialStoreHandle =
31    Arc<zlayer_secrets::RegistryCredentialStore<Arc<zlayer_secrets::PersistentSecretsStore>>>;
32
33/// Service instance manages a single service's containers
34pub struct ServiceInstance {
35    pub service_name: String,
36    pub spec: ServiceSpec,
37    runtime: Arc<dyn Runtime + Send + Sync>,
38    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
39    /// Overlay network manager for container networking (optional, not needed for Docker runtime)
40    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
41    /// Proxy manager for updating backend health (optional)
42    proxy_manager: Option<Arc<ProxyManager>>,
43    /// DNS server for service discovery (optional)
44    dns_server: Option<Arc<DnsServer>>,
45    /// Container-injectable overlay resolver IP (optional). When set, this
46    /// node's overlay DNS server is reachable on `<ip>:53` and we inject it
47    /// into the container's resolv.conf so workloads resolve through the
48    /// overlay instead of inheriting the host's resolv.conf.
49    container_dns: Option<IpAddr>,
50    /// Shared health states map so callbacks can update ServiceManager-level health
51    health_states: Option<Arc<RwLock<HashMap<String, HealthState>>>>,
52    /// Most recently observed image digest after a successful pull. Used by
53    /// `upsert_service` to detect drift on `:latest`/`Newer` redeploys without
54    /// requiring callers to track digest state externally. Wrapped in a
55    /// `RwLock` so `&self` methods (`scale_to`) can update it.
56    last_pulled_digest: tokio::sync::RwLock<Option<String>>,
57    /// Local cluster node id used when constructing new `ContainerId`s during
58    /// scale-up. `0` in single-node deployments or when the cluster handle is
59    /// not yet wired. Populated by `ServiceManager` from `Cluster::node_id()`
60    /// at instance construction time.
61    node_id: u64,
62    /// Owning deployment name (the `zlayer up` / deploy request's deployment),
63    /// when known. Threaded from `ServiceSpec.deployment` by `upsert_service`.
64    ///
65    /// Used to scope service-discovery DNS per-deployment: records are
66    /// registered as `{service}.{deployment}` / `{service}.service.{deployment}`
67    /// (within the daemon zone) and the container's resolv.conf gets a
68    /// per-deployment `search` domain so a bare `<svc>` / `<svc>.service` query
69    /// resolves to THIS deployment's instance and never clobbers another
70    /// deployment that happens to share a service name. `None` for standalone /
71    /// single-deployment callers (falls back to the daemon's global zone).
72    deployment: Option<String>,
73    /// Whether THIS node holds the standing HTTP/HTTPS ingress on
74    /// `0.0.0.0:80` / `0.0.0.0:443` (mirrors `NodeConfig.ingress`). When `true`
75    /// and the node has an overlay IP, external service domains
76    /// (`EndpointSpec.host`) are resolved to this node's own overlay IP;
77    /// otherwise the selected ingress peer's overlay IP is used. Defaults to
78    /// `false`. Threaded from `ServiceManager`.
79    ingress_enabled: bool,
80    /// Cluster handle used to select an ingress-capable peer's overlay IP when
81    /// THIS node is not itself the ingress. `None` in standalone / single-node
82    /// mode (the funnel is then THIS node when it is ingress-capable).
83    cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
84    /// Sink used to revoke a container's scoped access token on scale-down /
85    /// removal. Threaded from `ServiceManager`. `None` = no persistence backend
86    /// wired (the container token was minted without a revocable `jti`).
87    token_sink: Option<Arc<dyn crate::auth::ContainerTokenSink>>,
88    /// Sink that persists the resolved image digest into the deployment store
89    /// after a successful pull (see [`crate::auth::DeploymentDigestSink`]).
90    /// `None` disables digest persistence. Threaded from `ServiceManager`.
91    digest_sink: Option<Arc<dyn crate::auth::DeploymentDigestSink>>,
92    /// Restore pin: the image digest this service resolved to before the daemon
93    /// last went down (loaded from `StoredDeployment.resolved_image_digests` on
94    /// restore). When set, [`Self::pull_and_refresh_digest`] tries a strictly
95    /// LOCAL resolution (`SourcePolicy::LocalOnly`) FIRST — recreating the
96    /// container from the already-local image (local registry + local blob
97    /// layers) with zero remote/S3 traffic — and only falls back to the spec's
98    /// (remote-capable) source policy when the image is NOT present locally.
99    /// `None` for fresh deploys (the normal pull path runs unchanged).
100    restore_pin: tokio::sync::RwLock<Option<String>>,
101    /// Registry credential store used to resolve `zlayer login` credentials +
102    /// the `~/.docker/config.json` fallback for this service's image when the
103    /// supervisor recreates a container (restore / drift / scale). Threaded from
104    /// `ServiceManager`. `None` = no store wired → the pull runs anonymous /
105    /// docker-config as before. Without this,
106    /// [`Self::pull_and_refresh_digest`] pulled with `None` auth and a private
107    /// image failed to re-pull on supervised recreate — even though the initial
108    /// deploy (through the API create handler) had resolved auth correctly.
109    registry_store: Option<RegistryCredentialStoreHandle>,
110}
111
112impl ServiceInstance {
113    /// Create a new service instance
114    pub fn new(
115        service_name: String,
116        spec: ServiceSpec,
117        runtime: Arc<dyn Runtime + Send + Sync>,
118        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
119    ) -> Self {
120        let deployment = spec.deployment.clone();
121        Self {
122            service_name,
123            spec,
124            runtime,
125            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
126            overlay_manager,
127            proxy_manager: None,
128            dns_server: None,
129            container_dns: None,
130            health_states: None,
131            last_pulled_digest: tokio::sync::RwLock::new(None),
132            node_id: 0,
133            deployment,
134            ingress_enabled: false,
135            cluster: None,
136            token_sink: None,
137            digest_sink: None,
138            restore_pin: tokio::sync::RwLock::new(None),
139            registry_store: None,
140        }
141    }
142
143    /// Create a new service instance with proxy manager for health-aware load balancing
144    pub fn with_proxy(
145        service_name: String,
146        spec: ServiceSpec,
147        runtime: Arc<dyn Runtime + Send + Sync>,
148        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
149        proxy_manager: Arc<ProxyManager>,
150    ) -> Self {
151        let deployment = spec.deployment.clone();
152        Self {
153            service_name,
154            spec,
155            runtime,
156            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
157            overlay_manager,
158            proxy_manager: Some(proxy_manager),
159            dns_server: None,
160            container_dns: None,
161            health_states: None,
162            last_pulled_digest: tokio::sync::RwLock::new(None),
163            node_id: 0,
164            deployment,
165            ingress_enabled: false,
166            cluster: None,
167            token_sink: None,
168            digest_sink: None,
169            restore_pin: tokio::sync::RwLock::new(None),
170            registry_store: None,
171        }
172    }
173
174    /// Set the local cluster node id. Used by `ServiceManager` to thread
175    /// `Cluster::node_id()` down to container construction so new
176    /// `ContainerId`s carry the owning node identity. Defaults to `0` (the
177    /// single-node sentinel) when unset.
178    pub fn set_node_id(&mut self, node_id: u64) {
179        self.node_id = node_id;
180    }
181
182    /// Set the owning deployment name for service-discovery DNS scoping.
183    ///
184    /// Idempotent with construction: the constructors already capture
185    /// `spec.deployment`, but `ServiceManager` calls this so a deployment
186    /// stamped after the fact (or via a different code path) is honored.
187    pub fn set_deployment(&mut self, deployment: Option<String>) {
188        self.deployment = deployment;
189    }
190
191    /// Set whether THIS node holds the standing HTTP/HTTPS ingress (mirrors
192    /// `NodeConfig.ingress`). Threaded by `ServiceManager` so external service
193    /// domains can be resolved to this node's overlay IP when it is the funnel.
194    pub fn set_ingress_enabled(&mut self, enabled: bool) {
195        self.ingress_enabled = enabled;
196    }
197
198    /// Set the cluster handle used to select an ingress-capable peer's overlay
199    /// IP when THIS node is not itself the ingress.
200    pub fn set_cluster(&mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) {
201        self.cluster = Some(cluster);
202    }
203
204    /// Set the token sink used to revoke container scoped tokens on
205    /// scale-down / removal. Threaded from `ServiceManager`.
206    pub fn set_token_sink(&mut self, sink: Option<Arc<dyn crate::auth::ContainerTokenSink>>) {
207        self.token_sink = sink;
208    }
209
210    /// Set the sink that persists the resolved image digest into the deployment
211    /// store after a successful pull. Threaded from `ServiceManager`.
212    pub fn set_digest_sink(&mut self, sink: Option<Arc<dyn crate::auth::DeploymentDigestSink>>) {
213        self.digest_sink = sink;
214    }
215
216    /// Set the registry credential store used to resolve stored (`zlayer login`)
217    /// credentials + the `~/.docker/config.json` fallback for this service's
218    /// image on supervised recreate. Threaded from `ServiceManager`.
219    pub fn set_registry_store(&mut self, store: Option<RegistryCredentialStoreHandle>) {
220        self.registry_store = store;
221    }
222
223    /// Pin this service to a previously-resolved image `digest` for restore.
224    ///
225    /// On daemon restart the bin loads `StoredDeployment.resolved_image_digests`
226    /// and calls this for each service before scaling. With a pin set,
227    /// [`Self::pull_and_refresh_digest`] resolves the image STRICTLY locally
228    /// first (no remote/S3), so a service whose layers are already on disk comes
229    /// back even when the S3/origin tiers are down — breaking the boot-time
230    /// circular dependency where coming up requires re-pulling an already-local
231    /// image. `&self` so the restore path can pin without an exclusive borrow.
232    pub async fn set_restore_pin(&self, digest: Option<String>) {
233        *self.restore_pin.write().await = digest;
234    }
235
236    /// Resolve the overlay IP of the node that should serve external domains for
237    /// this service (the ingress funnel), as an `IpAddr`:
238    ///
239    /// 1. If THIS node is ingress-capable and has an overlay IP, use it.
240    /// 2. Otherwise ask the cluster for the deterministically-selected ingress
241    ///    peer's overlay IP.
242    /// 3. Otherwise (standalone, ingress-disabled local node, no peer) return
243    ///    `None` — the caller WARNs and skips registering the domain.
244    async fn resolve_ingress_ip(&self) -> Option<IpAddr> {
245        if self.ingress_enabled {
246            if let Some(om) = &self.overlay_manager {
247                if let Some(ip) = om.read().await.node_ip() {
248                    return Some(ip);
249                }
250            }
251        }
252        // Not the local funnel (or no local overlay IP): defer to the cluster's
253        // chosen ingress peer.
254        if let Some(cluster) = &self.cluster {
255            if let Some(ip_str) = cluster.select_ingress_overlay_ip().await {
256                match ip_str.parse::<IpAddr>() {
257                    Ok(ip) => return Some(ip),
258                    Err(e) => tracing::warn!(
259                        ingress_overlay_ip = %ip_str,
260                        error = %e,
261                        "selected ingress peer overlay IP is not a valid IP address; \
262                         skipping external-domain DNS registration"
263                    ),
264                }
265            }
266        }
267        None
268    }
269
270    /// Register an A/AAAA record for every endpoint's external vhost domain
271    /// (`EndpointSpec.host`) pointing at the ingress funnel's overlay IP, so a
272    /// client resolving `<host>` reaches an ingress-capable node whose 80/443
273    /// proxy fans out to this service's backends across the mesh.
274    ///
275    /// Wildcard host patterns (`*.example.com`) are routing matchers, not
276    /// resolvable names, so they are skipped. When no ingress node exists
277    /// anywhere in the mesh yet (no 80/443 entrypoint reachable), each host is
278    /// logged at WARN and skipped — never an error; it re-registers on the
279    /// next container attach once an entrypoint appears.
280    async fn register_external_domains(&self, dns: &Arc<DnsServer>) {
281        // Collect resolvable external domains for this service's endpoints.
282        let hosts: Vec<String> = self
283            .spec
284            .endpoints
285            .iter()
286            .filter_map(|ep| ep.host.as_ref())
287            .map(|h| h.trim().to_string())
288            .filter(|h| !h.is_empty() && !h.contains('*'))
289            .collect();
290        if hosts.is_empty() {
291            return;
292        }
293
294        let Some(ingress_ip) = self.resolve_ingress_ip().await else {
295            for host in &hosts {
296                tracing::warn!(
297                    service = %self.service_name,
298                    host = %host,
299                    "no 80/443 entrypoint reachable yet — open one to serve {host}; \
300                     skipping external-domain DNS registration (will retry on next attach)"
301                );
302            }
303            return;
304        };
305
306        for host in &hosts {
307            // `host` is already a fully-qualified external domain; pass it with
308            // a trailing dot so `DnsServer::add_record` treats it as an FQDN and
309            // does NOT append the daemon's internal zone origin.
310            let fqdn = if host.ends_with('.') {
311                host.clone()
312            } else {
313                format!("{host}.")
314            };
315            match dns.add_record(&fqdn, ingress_ip).await {
316                Ok(()) => tracing::info!(
317                    service = %self.service_name,
318                    host = %host,
319                    ingress_ip = %ingress_ip,
320                    "registered external-domain DNS record (host -> ingress overlay IP)"
321                ),
322                Err(e) => tracing::warn!(
323                    service = %self.service_name,
324                    host = %host,
325                    error = %e,
326                    "failed to register external-domain DNS record"
327                ),
328            }
329        }
330    }
331
332    /// The per-deployment resolv.conf `search` domain list for containers in
333    /// this service's deployment, given the daemon's global DNS `zone` (e.g.
334    /// `zlayer.local`).
335    ///
336    /// Returns a space-separated `search` value placing the deployment scope
337    /// FIRST so a guest's bare `<svc>` / `<svc>.service` query expands to THIS
338    /// deployment's record before anything else, with the bare zone last so
339    /// cross-deployment by-FQDN names (`<svc>.<otherdeployment>`) still resolve:
340    ///
341    /// ```text
342    /// search <deployment>.<zone> <zone>
343    /// ```
344    ///
345    /// When this instance has no deployment (standalone / single-deployment),
346    /// returns `None` so callers fall back to the daemon's global zone domain.
347    #[must_use]
348    pub fn dns_search_domain(&self, zone: &str) -> Option<String> {
349        let zone = zone.trim_end_matches('.');
350        self.deployment.as_deref().map(|d| {
351            // `<deployment>.<zone>` first (deployment scope wins), `<zone>` last
352            // (cross-deployment FQDN + global names still resolve).
353            format!("{d}.{zone} {zone}")
354        })
355    }
356
357    /// The set of service-discovery hostnames to register for one container,
358    /// relative to the daemon's DNS zone (each gets `<zone>` appended by
359    /// [`DnsServer::add_record`]).
360    ///
361    /// Two families are emitted:
362    ///
363    /// 1. **Deployment-scoped** (only when this instance carries a
364    ///    `deployment`): `<svc>.<D>`, `<svc>.service.<D>`,
365    ///    `<replica>.<svc>.service.<D>`, the documented example FQDN form
366    ///    `<svc>.<D>.service`, and (for non-default replica groups)
367    ///    `<role>.<svc>.service.<D>`. Paired with the per-deployment
368    ///    `search <D>.<zone> <zone>` resolv.conf domain (see
369    ///    [`Self::dns_search_domain`]), a guest's bare `<svc>` expands to
370    ///    `<svc>.<D>.<zone>` and `<svc>.service` to `<svc>.service.<D>.<zone>`,
371    ///    so both resolve to THIS deployment's instance and NEVER clobber a
372    ///    different deployment that happens to share a service name.
373    ///
374    /// 2. **Unscoped / legacy** (always): the bare `<svc>` name plus
375    ///    `<svc>.service.local`, `<replica>.<svc>.service.local`, and the role
376    ///    form. These preserve the historical compose-style discovery used by
377    ///    native containers (no per-deployment search domain) and existing
378    ///    deployments. NOTE: the bare `<svc>` is the last-writer-wins,
379    ///    cross-deployment-ambiguous key — it is kept for back-compat but the
380    ///    deployment-scoped names above are what make discovery correct.
381    #[must_use]
382    fn dns_hostnames(&self, id: &ContainerId) -> Vec<String> {
383        let svc = &self.service_name;
384        let mut names: Vec<String> = Vec::new();
385
386        // --- Deployment-scoped family (correct, no cross-deployment leak) ---
387        if let Some(d) = self.deployment.as_deref() {
388            // bare `<svc>` -> `<svc>.<D>` (resolves via `search <D>.<zone>`)
389            names.push(format!("{svc}.{d}"));
390            // `<svc>.service` -> `<svc>.service.<D>`
391            names.push(format!("{svc}.service.{d}"));
392            // documented example FQDN form `<svc>.<D>.service`
393            names.push(format!("{svc}.{d}.service"));
394            // replica-specific `<replica>.<svc>.service` -> `.<D>`
395            names.push(format!("{}.{svc}.service.{d}", id.replica));
396            // per-role group form for non-default replica groups
397            if id.role != "default" {
398                names.push(format!("{}.{svc}.service.{d}", id.role));
399            }
400        }
401
402        // --- Unscoped / legacy family (compose back-compat) ---
403        // Bare compose service name (e.g. `postgres`); multiple replicas upsert
404        // the same key and the in-memory authority keeps the most recent A.
405        names.push(svc.clone());
406        names.push(format!("{svc}.service.local"));
407        names.push(format!("{}.{svc}.service.local", id.replica));
408        if id.role != "default" {
409            names.push(format!("{}.{svc}.service.local", id.role));
410        }
411
412        names
413    }
414
415    /// Derive the replica group role for a 1-based `replica_idx`.
416    ///
417    /// When `spec.replica_groups` is unset, returns `"default"` (the implicit
418    /// single-group case). Otherwise walks groups in declaration order,
419    /// accumulating each group's `count` until `replica_idx` falls within the
420    /// current group's range, and returns that group's `role`.
421    ///
422    /// Replicas beyond the declared total fall back to `"default"`.
423    #[must_use]
424    pub fn role_for_replica(&self, replica_idx: u32) -> String {
425        let Some(groups) = self.spec.replica_groups.as_ref() else {
426            return "default".to_string();
427        };
428        let mut cumulative = 0u32;
429        for group in groups {
430            cumulative = cumulative.saturating_add(group.count);
431            if replica_idx <= cumulative {
432                return group.role.clone();
433            }
434        }
435        "default".to_string()
436    }
437
438    /// Builder method to add DNS server for service discovery
439    #[must_use]
440    pub fn with_dns(mut self, dns_server: Arc<DnsServer>) -> Self {
441        self.dns_server = Some(dns_server);
442        self
443    }
444
445    /// Set the DNS server for service discovery
446    pub fn set_dns_server(&mut self, dns_server: Arc<DnsServer>) {
447        // Stamp the per-deployment DNS `search` domain onto the spec so a Linux
448        // container's resolv.conf expands a bare `<svc>` / `<svc>.service`
449        // query to the registered deployment-scoped FQDN. The overlayd Linux
450        // attach path never applies the search domain itself, so without this
451        // `<svc>.service` resolution times out. Skip when the user supplied
452        // their own search list or this instance carries no deployment.
453        if self.spec.dns_search.is_empty() {
454            let zone = dns_server.zone_origin().to_string();
455            if let Some(search) = self.dns_search_domain(&zone) {
456                self.spec.dns_search = search.split_whitespace().map(str::to_string).collect();
457            }
458        }
459        self.dns_server = Some(dns_server);
460    }
461
462    /// Set the container-injectable overlay resolver IP and apply it to the
463    /// instance's spec.
464    ///
465    /// When `container_dns` is set and the spec is eligible (not host-network,
466    /// no user-supplied `dns`), this pre-populates `spec.dns` with the overlay
467    /// resolver so containers resolve through `<ip>:53` instead of inheriting
468    /// the host's `/etc/resolv.conf`.
469    ///
470    /// Why this exists: on overlay-enabled hosts the netbird `~.`
471    /// systemd-resolved hijack swallows the host resolver, so a container that
472    /// inherits the host resolv.conf cannot resolve anything. The overlay DNS
473    /// server forwards non-overlay queries upstream, so pointing the container
474    /// at it fixes resolution AND gives it service-name discovery.
475    ///
476    /// Port-53 constraint: `resolv.conf` `nameserver` lines (and Docker's
477    /// `--dns`) carry no port — they are always port 53. The injected IP is
478    /// therefore only useful because the daemon binds the overlay resolver on
479    /// `<ip>:53` (see `daemon.rs` Phase 4); the injected value is the bare IP,
480    /// not a `host:port`.
481    ///
482    /// User-supplied `spec.dns` is left untouched: an explicit resolver from
483    /// the deployment spec always wins.
484    pub fn set_container_dns(&mut self, container_dns: IpAddr) {
485        self.container_dns = Some(container_dns);
486        if !self.spec.host_network && self.spec.dns.is_empty() {
487            self.spec.dns = vec![container_dns.to_string()];
488        }
489    }
490
491    /// Whether this service's containers should be attached to the overlay
492    /// network at start.
493    ///
494    /// Host-network containers (`spec.host_network`) share the HOST's network
495    /// namespace — there is no private netns to plumb a veth into, and
496    /// attaching by the host-netns PID would wire a stray `zl-*` link into the
497    /// host stack (which systemd-networkd then tries to manage, hitting its
498    /// watchdog and SIGABRT'ing). Such containers must never be attached. This
499    /// mirrors the `host_network` guard in [`set_container_dns`].
500    fn should_attach_overlay(&self) -> bool {
501        !self.spec.host_network
502    }
503
504    /// Effective isolation-network name for this service's overlay attach.
505    /// Derived from the data-plane [`OverlayMode`] (Isolated => fenced to a
506    /// network named after the service) plus any explicit
507    /// `com.zlayer.isolation_network` label (which always wins). `None` => flat
508    /// cluster mesh.
509    fn isolation_network(&self) -> Option<String> {
510        let mode = self
511            .spec
512            .overlay
513            .as_ref()
514            .map(|o| o.mode)
515            .unwrap_or_default();
516        let explicit = self
517            .spec
518            .labels
519            .get(zlayer_types::overlay::ISOLATION_NETWORK_LABEL)
520            .cloned();
521        crate::overlay_manager::resolve_isolation_network(mode, &self.service_name, explicit)
522    }
523
524    /// Set the proxy manager for health-aware load balancing
525    pub fn set_proxy_manager(&mut self, proxy_manager: Arc<ProxyManager>) {
526        self.proxy_manager = Some(proxy_manager);
527    }
528
529    /// Set the shared health states map so health callbacks can bridge state back to `ServiceManager`
530    pub fn set_health_states(&mut self, states: Arc<RwLock<HashMap<String, HealthState>>>) {
531        self.health_states = Some(states);
532    }
533
534    /// Get the last observed image digest (after the most recent successful
535    /// pull). Returns `None` when no pull has happened yet, when the runtime
536    /// does not expose digests, or when no matching `ImageInfo` was found.
537    pub async fn last_pulled_digest(&self) -> Option<String> {
538        self.last_pulled_digest.read().await.clone()
539    }
540
541    /// Pull the service image using the spec's pull policy (literal Docker /
542    /// Kubernetes semantics — no silent auto-upgrade of `IfNotPresent` to
543    /// `Newer` for `:latest` tags) and refresh the cached digest from
544    /// `Runtime::list_images` when the runtime exposes it. Returns the digest
545    /// observed after the pull, when known.
546    ///
547    /// For `Never`, the runtime is still called so it can load the image
548    /// config from the local cache (without any remote round-trip); only the
549    /// remote digest refresh is skipped. Without this call the bundle builder
550    /// has no image entrypoint/cmd and falls back to `/bin/sh`.
551    async fn pull_and_refresh_digest(&self) -> Result<Option<String>> {
552        let image_str = self.spec.image.name.to_string();
553        let policy = self.spec.image.pull_policy;
554        let spec_source = self.spec.image.source_policy.unwrap_or_default();
555
556        // Restore-from-local: when this service carries a restore pin (a digest
557        // resolved before the daemon last went down) AND its image is present
558        // locally, recreate from the already-local image with ZERO remote/S3
559        // traffic. We do this by attempting the pull with
560        // `SourcePolicy::LocalOnly` FIRST — the registry chain consults only the
561        // local blob cache + local registry (`try_local_registry`) and errors
562        // cleanly on a local miss, never reaching S3/origin. On a local miss we
563        // fall back to the spec's (remote-capable) source policy so a genuinely
564        // absent image still pulls normally. This breaks the boot-time circular
565        // dependency where ZLayer uses its own (down) ZataStorage as the S3 blob
566        // backend: an already-local service comes back regardless.
567        let restore_pin = self.restore_pin.read().await.clone();
568        let mut resolved_locally = false;
569        if restore_pin.is_some() {
570            match self
571                .runtime
572                .pull_image_with_policy(
573                    &image_str,
574                    policy,
575                    None,
576                    zlayer_spec::SourcePolicy::LocalOnly,
577                )
578                .await
579            {
580                Ok(()) => {
581                    resolved_locally = true;
582                    tracing::info!(
583                        image = %image_str,
584                        pinned_digest = ?restore_pin,
585                        "restore: image present locally; recreated from local layers \
586                         (no remote/S3 traffic)"
587                    );
588                }
589                Err(e) => {
590                    tracing::info!(
591                        image = %image_str,
592                        error = %e,
593                        "restore: image not present locally under pinned digest; \
594                         falling back to the spec source policy (remote-capable)"
595                    );
596                }
597            }
598        }
599
600        // Normal (remote-capable) pull when there is no restore pin, or when the
601        // local-only restore probe missed.
602        if !resolved_locally {
603            // Resolve registry auth for the image's host from the daemon's
604            // credential store (a `zlayer login <host>` credential), falling
605            // back to `~/.docker/config.json`, then anonymous. The API create
606            // handler does this at first deploy; the supervisor's recreate path
607            // (restore / drift / scale) must too, or a private image that
608            // deployed fine fails to re-pull here. `None` store (standalone /
609            // not wired) preserves the old anonymous behaviour.
610            let resolved_auth = match &self.registry_store {
611                Some(store) => {
612                    zlayer_secrets::resolve_stored_registry_auth(&image_str, store).await
613                }
614                None => None,
615            };
616            self.runtime
617                .pull_image_with_policy(&image_str, policy, resolved_auth.as_ref(), spec_source)
618                .await
619                .map_err(|e| AgentError::PullFailed {
620                    image: self.spec.image.name.to_string(),
621                    reason: e.to_string(),
622                })?;
623        }
624
625        // Best-effort: try to discover the resolved digest via list_images.
626        // Runtimes that don't support introspection (Unsupported) leave the
627        // cached digest unchanged; drift detection then falls back to "always
628        // recreate on PullPolicy::Always, never recreate on PullPolicy::Newer
629        // when no digests are known".
630        let new_digest = match self.runtime.list_images().await {
631            Ok(images) => images
632                .into_iter()
633                .find(|info| info.reference == image_str)
634                .and_then(|info| info.digest),
635            Err(e) => {
636                tracing::debug!(
637                    image = %image_str,
638                    error = %e,
639                    "list_images unavailable; cannot record post-pull digest"
640                );
641                None
642            }
643        };
644
645        // When list_images doesn't expose a digest but we just resolved a pinned
646        // image strictly from local layers, the pin IS the resolved digest —
647        // re-use it so the recorded/cached value survives a restore where the
648        // runtime can't introspect the local store.
649        let effective_digest = new_digest.clone().or(if resolved_locally {
650            restore_pin.clone()
651        } else {
652            None
653        });
654
655        if let Some(ref digest) = effective_digest {
656            *self.last_pulled_digest.write().await = Some(digest.clone());
657            // Persist the resolved digest into the deployment store so a future
658            // restart can recreate this service from the already-local image.
659            // Best-effort: a failed store write is logged by the sink and never
660            // fails the deploy/scale. Requires the owning deployment name.
661            if let (Some(sink), Some(deployment)) = (&self.digest_sink, self.deployment.as_deref())
662            {
663                sink.record(deployment, &self.service_name, digest).await;
664            }
665        }
666
667        Ok(effective_digest)
668    }
669
670    /// Attach a HOST-process (Linux youki) replica to the overlay in the
671    /// "created" state — by PID, before `start_container`. Returns the assigned
672    /// overlay IP, or `None` when there is no overlay manager, the service must
673    /// not attach (host-network), the runtime is the VM-guest kind (attached
674    /// post-start over vsock instead), or no PID is available (WASM) — those
675    /// benign cases return `Ok(None)`. A genuine attach failure is a HARD error
676    /// (`Err`): we never silently downgrade a service that asked for the overlay
677    /// to host networking. Kept as a standalone, `Box::pin`ned `async fn` so the
678    /// overlayd RPC future does not inflate `scale_to`'s state machine past the
679    /// `large_futures` threshold.
680    #[cfg(not(target_os = "windows"))]
681    async fn attach_host_overlay(
682        &self,
683        id: &ContainerId,
684        container_pid: Option<u32>,
685    ) -> Result<Option<std::net::IpAddr>, AgentError> {
686        let Some(overlay) = self
687            .overlay_manager
688            .as_ref()
689            .filter(|_| self.should_attach_overlay())
690        else {
691            return Ok(None);
692        };
693        if matches!(
694            self.runtime.overlay_attach_kind_for(id).await,
695            crate::runtime::OverlayAttachKind::GuestManaged
696                | crate::runtime::OverlayAttachKind::HostProxy
697        ) {
698            // GuestManaged: attach handled post-start. HostProxy (Seatbelt /
699            // native-VZ): the node is already on the overlay via a host-level
700            // `utun` and the container shares the host stack, so there is no
701            // per-container created-state attach.
702            return Ok(None);
703        }
704        let Some(pid) = container_pid else {
705            // No PID available (e.g. WASM runtime) - skip overlay attachment.
706            tracing::debug!(
707                container = %id,
708                "skipping overlay attachment - no PID available"
709            );
710            return Ok(None);
711        };
712        let overlay_guard = overlay.read().await;
713        // Per-deployment resolv.conf search domain so the container's bare
714        // `<svc>`/`<svc>.service` resolves to THIS deployment (no
715        // cross-deployment clobber).
716        let dns_override = overlay_guard
717            .dns_domain()
718            .and_then(|zone| self.dns_search_domain(zone));
719        match overlay_guard
720            .attach_container(
721                pid,
722                &self.service_name,
723                true,
724                false,
725                self.isolation_network(),
726                dns_override,
727            )
728            .await
729        {
730            Ok(ip) => Ok(Some(ip)),
731            Err(e) => Err(AgentError::Network(format!(
732                "failed to attach container {id} to overlay network: {e}"
733            ))),
734        }
735    }
736
737    /// Reclaim a half-created replica after an init-action / start / liveness
738    /// failure in the scale-up path: detach the overlay (by the PID it was
739    /// attached with, when an attach happened) and remove the container, then
740    /// return the original error for the caller to propagate. Mirrors the job
741    /// path's detach + remove on its init/start early returns.
742    ///
743    /// `attached_pid` is `Some(pid)` only when a HOST (Linux youki) overlay
744    /// attach actually succeeded for this replica; it is always `None` on
745    /// Windows (HCN teardown happens at container-remove) and for the VM-guest
746    /// kind (released by id elsewhere). Kept as a standalone `async fn` (vs an
747    /// inline closure) so its future does not inflate `scale_to`'s state
748    /// machine past the `large_futures` threshold.
749    async fn cleanup_failed_start(
750        &self,
751        id: &ContainerId,
752        attached_pid: Option<u32>,
753        err: AgentError,
754    ) -> AgentError {
755        #[cfg(not(target_os = "windows"))]
756        if let (Some(overlay), Some(pid)) = (self.overlay_manager.as_ref(), attached_pid) {
757            let overlay_guard = overlay.read().await;
758            if let Err(de) = overlay_guard.detach_container(pid).await {
759                tracing::warn!(
760                    container = %id,
761                    pid,
762                    error = %de,
763                    "failed to detach overlay during startup cleanup (veth/IP may leak)"
764                );
765            }
766        }
767        #[cfg(target_os = "windows")]
768        let _ = attached_pid;
769        if let Err(re) = self.runtime.remove_container(id).await {
770            tracing::warn!(
771                container = %id,
772                error = %re,
773                "failed to remove container during startup cleanup"
774            );
775        }
776        err
777    }
778
779    /// Scale to the desired number of replicas
780    ///
781    /// This method uses short-lived locks to avoid blocking concurrent operations.
782    /// I/O operations (pull, create, start, stop, remove) are performed without
783    /// holding the containers lock to allow other operations to proceed.
784    ///
785    /// # Errors
786    /// Returns an error if image pull, container creation, or container lifecycle operations fail.
787    #[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
788    pub async fn scale_to(&self, replicas: u32) -> Result<()> {
789        // Phase 1: Determine current state (short read lock)
790        let current_replicas = { self.containers.read().await.len() as u32 }; // Lock released here
791
792        // Phase 1b: Pull image up front so a redeploy on `:latest` (which lands
793        // here with replicas == current_replicas in the steady state) actually
794        // refreshes the cached digest. We skip the call only when scaling
795        // strictly down (no new containers needed). For `Never` the runtime
796        // still needs to load the image config from the local cache so the
797        // bundle builder gets entrypoint/cmd/env — without it the container
798        // falls back to `/bin/sh` and exits instantly. `pull_and_refresh_digest`
799        // itself handles the Never case (no remote round-trip, cache-only).
800        if replicas >= current_replicas {
801            let _ = self.pull_and_refresh_digest().await?;
802        }
803
804        // Phase 2: Scale up - create new containers (no lock held during I/O)
805        //
806        // Compute (role, replica_index) tuples for each new replica. When
807        // `spec.replica_groups` is set, expand groups in declaration order so
808        // each created replica maps to its declared `(role, intra_group_index)`.
809        // Otherwise fall back to the implicit single "default" group. The
810        // `local_node_id` is captured once so every new `ContainerId` carries
811        // the owning node identity for cross-node disambiguation.
812        let local_node_id = self.node_id;
813        if replicas > current_replicas {
814            let replica_specs: Vec<(String, u32)> =
815                if let Some(groups) = self.spec.replica_groups.as_ref() {
816                    let mut specs: Vec<(String, u32)> = Vec::new();
817                    for group in groups {
818                        for idx in 0..group.count {
819                            specs.push((group.role.clone(), idx + 1));
820                        }
821                    }
822                    specs
823                        .into_iter()
824                        .skip(current_replicas as usize)
825                        .take((replicas - current_replicas) as usize)
826                        .collect()
827                } else {
828                    (current_replicas..replicas)
829                        .map(|i| ("default".to_string(), i + 1))
830                        .collect()
831                };
832
833            for (role, replica_idx) in replica_specs {
834                let id = ContainerId::with_role_and_node(
835                    self.service_name.clone(),
836                    replica_idx,
837                    role,
838                    local_node_id,
839                );
840
841                // Reap any leftover container occupying this id before
842                // creating it. After a daemon restart a `KillMode=process`
843                // survivor (or a crash-orphan) from the previous instance can
844                // still hold this id; `create_container` would then collide
845                // ("container already exists"). Policy is reap & recreate, so
846                // stop+remove the stale one first. `NotFound` is the normal,
847                // expected clean-slate case.
848                match self.runtime.container_state(&id).await {
849                    Err(AgentError::NotFound { .. }) => {}
850                    Ok(state) => {
851                        tracing::warn!(
852                            container = %id,
853                            ?state,
854                            "reaping leftover container before recreate (reap & recreate)"
855                        );
856                        let _ = self
857                            .runtime
858                            .stop_container(&id, std::time::Duration::from_secs(5))
859                            .await;
860                        if let Err(e) = self.runtime.remove_container(&id).await {
861                            tracing::warn!(
862                                container = %id,
863                                error = %e,
864                                "failed to remove leftover container before recreate; continuing"
865                            );
866                        }
867                    }
868                    Err(e) => {
869                        tracing::warn!(
870                            container = %id,
871                            error = %e,
872                            "container_state probe failed before create; attempting create anyway"
873                        );
874                    }
875                }
876
877                // Create container (no lock needed - I/O operation)
878                //
879                // RouteToPeer must propagate unchanged: the scheduler uses it
880                // to re-place the workload on a capable peer, and wrapping it
881                // in `CreateFailed` would hide the signal and mark the service
882                // dead instead of rescheduling it. All other errors are
883                // normalised to `CreateFailed` for upstream handling.
884                self.runtime
885                    .create_container(&id, &self.spec)
886                    .await
887                    .map_err(|e| match e {
888                        AgentError::RouteToPeer { .. } => e,
889                        other => AgentError::CreateFailed {
890                            id: id.to_string(),
891                            reason: other.to_string(),
892                        },
893                    })?;
894
895                // Get container PID with retries (may not be immediately
896                // available). youki writes the init pid-file at CREATE time
897                // (the init is paused on the start fifo), so the PID is known
898                // in the "created" state — BEFORE start_container. We capture
899                // it here so the host overlay attach below can run while the
900                // process is still pid-1, dumpable, and root-owned. Once the
901                // entrypoint execve's + drops to a non-root user the netns
902                // becomes non-dumpable (overlayd EACCES on /proc/<pid>/ns/net),
903                // and a one-shot that already exited would 404 (ENOENT). WASM
904                // runtimes have no PID and report None here, which the host
905                // attach arm skips.
906                let mut container_pid = None;
907                for attempt in 1..=5u32 {
908                    match self.runtime.get_container_pid(&id).await {
909                        Ok(Some(pid)) => {
910                            container_pid = Some(pid);
911                            break;
912                        }
913                        Ok(None) if attempt < 5 => {
914                            tracing::debug!(container = %id, attempt, "PID not available yet, retrying");
915                            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
916                        }
917                        Ok(None) => {
918                            tracing::warn!(container = %id, "Container PID unavailable after 5 attempts");
919                        }
920                        Err(e) => {
921                            tracing::warn!(container = %id, attempt, error = %e, "Failed to get PID");
922                            if attempt < 5 {
923                                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
924                            }
925                        }
926                    }
927                }
928
929                // Attach the HOST-process (Linux youki) overlay veth in the
930                // "created" state — between create_container and
931                // start_container — mirroring the job path. The guest (VZ
932                // vsock) and Windows (HCN) attaches stay POST-start below: the
933                // guest pushes config into a RUNNING guest over vsock, and
934                // Windows attaches its HCN endpoint at create time inside
935                // overlayd (here we only read the IP back, post-start).
936                //
937                // `host_overlay_ip` carries the host arm's result so the shared
938                // DNS registration + `overlay_ip` merge below run exactly once,
939                // after start, for whichever attach kind produced an IP.
940                #[cfg(not(target_os = "windows"))]
941                let host_overlay_ip: Option<std::net::IpAddr> =
942                    match Box::pin(self.attach_host_overlay(&id, container_pid)).await {
943                        Ok(ip) => ip,
944                        Err(e) => {
945                            return Err(Box::pin(self.cleanup_failed_start(&id, None, e)).await);
946                        }
947                    };
948
949                // From here on, an init-action or start failure must reclaim the
950                // overlay veth/IP we just attached AND remove the half-created
951                // container, or both leak (the attach moved BEFORE start, so the
952                // old `?`-early-returns would now strand a veth). Mirrors the job
953                // path's detach + remove on its init/start early returns. The
954                // cleanup lives in a separate `async fn` (and is `Box::pin`ned at
955                // the await sites) so its future does not inflate `scale_to`'s
956                // state machine past the `large_futures` threshold.
957                #[cfg(not(target_os = "windows"))]
958                let attached_pid = if host_overlay_ip.is_some() {
959                    container_pid
960                } else {
961                    None
962                };
963                #[cfg(target_os = "windows")]
964                let attached_pid: Option<u32> = None;
965
966                // Run init actions with error policy enforcement (no lock needed)
967                let init_orchestrator = InitOrchestrator::with_error_policy(
968                    id.clone(),
969                    self.spec.init.clone(),
970                    self.spec.errors.clone(),
971                );
972                if let Err(e) = init_orchestrator.run().await {
973                    return Err(Box::pin(self.cleanup_failed_start(&id, attached_pid, e)).await);
974                }
975
976                // Start container (no lock needed - I/O operation)
977                if let Err(e) =
978                    self.runtime
979                        .start_container(&id)
980                        .await
981                        .map_err(|e| AgentError::StartFailed {
982                            id: id.to_string(),
983                            reason: e.to_string(),
984                        })
985                {
986                    return Err(Box::pin(self.cleanup_failed_start(&id, attached_pid, e)).await);
987                }
988
989                // Verify the container is still running after start. If the
990                // init process crashed during start (bad image, missing libs,
991                // failed mount), surface the real cause from the container's
992                // log tail instead of a cryptic downstream cascade.
993                if container_pid.is_some() {
994                    let alive = match self.runtime.container_state(&id).await {
995                        Ok(
996                            ContainerState::Running
997                            | ContainerState::Pending
998                            | ContainerState::Initializing,
999                        ) => true,
1000                        Ok(state) => {
1001                            tracing::warn!(
1002                                container = %id,
1003                                ?state,
1004                                "container exited immediately after start"
1005                            );
1006                            false
1007                        }
1008                        Err(e) => {
1009                            // State query failed — don't block on it.
1010                            tracing::warn!(
1011                                container = %id,
1012                                error = %e,
1013                                "container state query failed after start, proceeding"
1014                            );
1015                            true
1016                        }
1017                    };
1018                    if !alive {
1019                        let log_tail = self.runtime.container_logs(&id, 40).await.ok().map_or_else(
1020                            || "  <log read failed>".to_string(),
1021                            |entries| {
1022                                if entries.is_empty() {
1023                                    "  <no log output>".to_string()
1024                                } else {
1025                                    entries
1026                                        .into_iter()
1027                                        .map(|e| format!("  {}", e.message))
1028                                        .collect::<Vec<_>>()
1029                                        .join("\n")
1030                                }
1031                            },
1032                        );
1033                        return Err(Box::pin(self.cleanup_failed_start(
1034                            &id,
1035                            attached_pid,
1036                            AgentError::StartFailed {
1037                                id: id.to_string(),
1038                                reason: format!("container exited during startup:\n{log_tail}"),
1039                            },
1040                        ))
1041                        .await);
1042                    }
1043                }
1044
1045                // Resolve the overlay IP. The host (Linux youki) attach already
1046                // ran in the created state above; here we only run the guest
1047                // (vsock) and Windows (HCN read-back) kinds, which require a
1048                // RUNNING container, then register DNS once for whichever kind
1049                // produced an IP.
1050                let overlay_ip = if let Some(overlay) = self
1051                    .overlay_manager
1052                    .as_ref()
1053                    .filter(|_| self.should_attach_overlay())
1054                {
1055                    let overlay_guard = overlay.read().await;
1056                    #[cfg(target_os = "windows")]
1057                    let attach_result: Option<std::net::IpAddr> = {
1058                        match self.runtime.overlay_attach_kind_for(&id).await {
1059                            // HCS: the HCN endpoint + per-container namespace were
1060                            // created inside `HcsRuntime::create_container` by
1061                            // overlayd; here we only need the IP it assigned so we
1062                            // can register DNS for service discovery.
1063                            crate::runtime::OverlayAttachKind::HostIp => {
1064                                let _ = &overlay_guard; // unused for the read-back path
1065                                match self.runtime.get_container_ip(&id).await {
1066                                    Ok(Some(ip)) => Some(ip),
1067                                    Ok(None) => {
1068                                        tracing::debug!(
1069                                            container = %id,
1070                                            "no overlay IP recorded (HCS attach skipped at create time)"
1071                                        );
1072                                        None
1073                                    }
1074                                    Err(e) => {
1075                                        tracing::warn!(
1076                                            container = %id,
1077                                            error = %e,
1078                                            "failed to fetch container overlay IP"
1079                                        );
1080                                        None
1081                                    }
1082                                }
1083                            }
1084                            // WSL2 Linux guest: overlayd allocates the overlay
1085                            // identity and we push the WireGuard config into the
1086                            // distro's network namespace, where it brings up its own
1087                            // kernel WireGuard device. (Same as the non-Windows
1088                            // GuestManaged arm below.)
1089                            crate::runtime::OverlayAttachKind::GuestManaged => {
1090                                let cid = id.to_string();
1091                                // Per-deployment resolv.conf search domain so the
1092                                // guest's bare `<svc>`/`<svc>.service` resolves to
1093                                // THIS deployment (no cross-deployment clobber).
1094                                let dns_override = overlay_guard
1095                                    .dns_domain()
1096                                    .and_then(|zone| self.dns_search_domain(zone));
1097                                match overlay_guard
1098                                    .attach_container_guest(
1099                                        &cid,
1100                                        &self.service_name,
1101                                        true,
1102                                        self.isolation_network(),
1103                                        dns_override,
1104                                    )
1105                                    .await
1106                                {
1107                                    Ok(cfg) => {
1108                                        let ip = cfg.overlay_ip;
1109                                        match self.runtime.push_overlay_config(&id, &cfg).await {
1110                                            Ok(()) => Some(ip),
1111                                            Err(e) => {
1112                                                tracing::warn!(
1113                                                    container = %id,
1114                                                    error = %e,
1115                                                    "failed to push overlay config into guest; rolling back allocation"
1116                                                );
1117                                                // Don't leak the overlayd IP/peer.
1118                                                if let Err(de) =
1119                                                    overlay_guard.detach_container_guest(&cid).await
1120                                                {
1121                                                    tracing::warn!(
1122                                                        container = %id,
1123                                                        error = %de,
1124                                                        "failed to roll back guest overlay allocation"
1125                                                    );
1126                                                }
1127                                                None
1128                                            }
1129                                        }
1130                                    }
1131                                    Err(e) => {
1132                                        tracing::warn!(
1133                                            container = %id,
1134                                            error = %e,
1135                                            "failed to allocate guest overlay config from overlayd"
1136                                        );
1137                                        None
1138                                    }
1139                                }
1140                            }
1141                            // HostProxy/HostNetns don't occur on Windows.
1142                            _ => None,
1143                        }
1144                    };
1145                    #[cfg(not(target_os = "windows"))]
1146                    let attach_result: Option<std::net::IpAddr> = {
1147                        match self.runtime.overlay_attach_kind_for(&id).await {
1148                            // VM guest (macOS VZ-Linux): no host netns/PID, so
1149                            // overlayd allocates the overlay identity and we push
1150                            // it into the RUNNING guest over vsock, where it brings
1151                            // up its own kernel WireGuard device.
1152                            crate::runtime::OverlayAttachKind::GuestManaged => {
1153                                let cid = id.to_string();
1154                                // Per-deployment resolv.conf search domain so the
1155                                // guest's bare `<svc>`/`<svc>.service` resolves to
1156                                // THIS deployment (no cross-deployment clobber).
1157                                let dns_override = overlay_guard
1158                                    .dns_domain()
1159                                    .and_then(|zone| self.dns_search_domain(zone));
1160                                match overlay_guard
1161                                    .attach_container_guest(
1162                                        &cid,
1163                                        &self.service_name,
1164                                        true,
1165                                        self.isolation_network(),
1166                                        dns_override,
1167                                    )
1168                                    .await
1169                                {
1170                                    Ok(cfg) => {
1171                                        let ip = cfg.overlay_ip;
1172                                        match self.runtime.push_overlay_config(&id, &cfg).await {
1173                                            Ok(()) => Some(ip),
1174                                            Err(e) => {
1175                                                tracing::warn!(
1176                                                    container = %id,
1177                                                    error = %e,
1178                                                    "failed to push overlay config into guest; rolling back allocation"
1179                                                );
1180                                                // Don't leak the overlayd IP/peer.
1181                                                if let Err(de) =
1182                                                    overlay_guard.detach_container_guest(&cid).await
1183                                                {
1184                                                    tracing::warn!(
1185                                                        container = %id,
1186                                                        error = %de,
1187                                                        "failed to roll back guest overlay allocation"
1188                                                    );
1189                                                }
1190                                                None
1191                                            }
1192                                        }
1193                                    }
1194                                    Err(e) => {
1195                                        tracing::warn!(
1196                                            container = %id,
1197                                            error = %e,
1198                                            "failed to allocate guest overlay config from overlayd"
1199                                        );
1200                                        None
1201                                    }
1202                                }
1203                            }
1204                            // Host-shared native runtime (Seatbelt / native-VZ /
1205                            // libkrun): overlayd allocates a distinct overlay /32
1206                            // from the node slice + utun alias; the runtime
1207                            // forwards <overlay_ip>:port to the container's local
1208                            // delivery.
1209                            crate::runtime::OverlayAttachKind::HostProxy => {
1210                                let dns_override = overlay_guard
1211                                    .dns_domain()
1212                                    .and_then(|zone| self.dns_search_domain(zone));
1213                                match overlay_guard
1214                                    .attach_container_host_shared(
1215                                        &id.to_string(),
1216                                        &self.service_name,
1217                                        false,
1218                                        self.isolation_network(),
1219                                        dns_override,
1220                                    )
1221                                    .await
1222                                {
1223                                    Ok(ip) => {
1224                                        if let Err(e) =
1225                                            self.runtime.attach_overlay_ip(&id, ip).await
1226                                        {
1227                                            tracing::warn!(
1228                                                container = %id,
1229                                                error = %e,
1230                                                "failed to start host-shared overlay forwarders"
1231                                            );
1232                                        }
1233                                        Some(ip)
1234                                    }
1235                                    Err(e) => {
1236                                        tracing::warn!(
1237                                            container = %id,
1238                                            error = %e,
1239                                            "failed to attach host-shared container to overlay"
1240                                        );
1241                                        None
1242                                    }
1243                                }
1244                            }
1245                            // Host-process runtimes (Linux youki): already
1246                            // attached in the created state above.
1247                            _ => host_overlay_ip,
1248                        }
1249                    };
1250
1251                    if let Some(ip) = attach_result {
1252                        tracing::info!(
1253                            container = %id,
1254                            overlay_ip = %ip,
1255                            "attached container to overlay network"
1256                        );
1257
1258                        // Register DNS for service discovery.
1259                        if let Some(dns) = &self.dns_server {
1260                            for hostname in self.dns_hostnames(&id) {
1261                                match dns.add_record(&hostname, ip).await {
1262                                    Ok(()) => tracing::debug!(
1263                                        hostname = %hostname,
1264                                        ip = %ip,
1265                                        "registered service-discovery DNS record"
1266                                    ),
1267                                    Err(e) => tracing::warn!(
1268                                        hostname = %hostname,
1269                                        error = %e,
1270                                        "failed to register service-discovery DNS record"
1271                                    ),
1272                                }
1273                            }
1274
1275                            // Register external service domains (vhosts) so a
1276                            // client resolving `<host>` lands on an
1277                            // ingress-capable node, whose 80/443 proxy fans out
1278                            // to this service's overlay-IP backends anywhere in
1279                            // the mesh. This is ADDITIONAL to the
1280                            // deployment-scoped service-discovery records above.
1281                            self.register_external_domains(dns).await;
1282                        }
1283
1284                        Some(ip)
1285                    } else {
1286                        None
1287                    }
1288                } else {
1289                    None
1290                };
1291
1292                // If overlay failed, try the container runtime's own IP as fallback
1293                let effective_ip = if overlay_ip.is_none() {
1294                    match self.runtime.get_container_ip(&id).await {
1295                        Ok(Some(ip)) => {
1296                            tracing::info!(
1297                                container = %id,
1298                                ip = %ip,
1299                                "using runtime container IP for proxy (overlay unavailable)"
1300                            );
1301                            Some(ip)
1302                        }
1303                        Ok(None) => {
1304                            tracing::warn!(
1305                                container = %id,
1306                                "no container IP available from runtime, proxy routing will be unavailable"
1307                            );
1308                            None
1309                        }
1310                        Err(e) => {
1311                            tracing::warn!(
1312                                container = %id,
1313                                error = %e,
1314                                "failed to get container IP from runtime"
1315                            );
1316                            None
1317                        }
1318                    }
1319                } else {
1320                    overlay_ip
1321                };
1322
1323                tracing::info!(
1324                    container = %id,
1325                    service = %self.service_name,
1326                    overlay_ip = ?overlay_ip,
1327                    effective_ip = ?effective_ip,
1328                    "Container IP resolution complete"
1329                );
1330
1331                // Query port override from the runtime.
1332                // On macOS sandbox, each container is assigned a unique port since
1333                // all processes share the host network (no network namespaces).
1334                // The runtime passes the port to the process via the PORT env var.
1335                let port_override = match self.runtime.get_container_port_override(&id).await {
1336                    Ok(Some(port)) => {
1337                        tracing::info!(
1338                            container = %id,
1339                            port = port,
1340                            "runtime assigned dynamic port override for this container"
1341                        );
1342                        Some(port)
1343                    }
1344                    Ok(None) => None,
1345                    Err(e) => {
1346                        tracing::warn!(
1347                            container = %id,
1348                            error = %e,
1349                            "failed to query port override from runtime, using spec port"
1350                        );
1351                        None
1352                    }
1353                };
1354
1355                // Start health monitoring and store handle (no lock needed during start)
1356                let health_monitor_handle = {
1357                    let mut check = self.spec.health.check.clone();
1358
1359                    // Resolve Tcp { port: 0 } ("use first endpoint") to the actual
1360                    // port the container is listening on. With mac-sandbox, each
1361                    // replica gets a unique assigned port via port_override.
1362                    if let HealthCheck::Tcp { ref mut port } = check {
1363                        if *port == 0 {
1364                            *port = port_override.unwrap_or_else(|| {
1365                                self.spec
1366                                    .endpoints
1367                                    .iter()
1368                                    .find(|ep| {
1369                                        matches!(
1370                                            ep.protocol,
1371                                            Protocol::Http | Protocol::Https | Protocol::Websocket
1372                                        )
1373                                    })
1374                                    .map_or(8080, zlayer_spec::EndpointSpec::target_port)
1375                            });
1376                        }
1377                    }
1378
1379                    let start_grace = self
1380                        .spec
1381                        .health
1382                        .start_grace
1383                        .unwrap_or(Duration::from_secs(5));
1384                    let check_timeout = self.spec.health.timeout.unwrap_or(Duration::from_secs(5));
1385                    let interval = self.spec.health.interval.unwrap_or(Duration::from_secs(10));
1386                    let retries = self.spec.health.retries;
1387
1388                    let checker = HealthChecker::new(check, effective_ip);
1389                    let mut monitor = HealthMonitor::new(id.clone(), checker, interval, retries)
1390                        .with_start_grace(start_grace)
1391                        .with_check_timeout(check_timeout);
1392
1393                    // Build the optional proxy backend handle. This is only present
1394                    // when both a proxy manager AND a reachable overlay IP exist; in
1395                    // degraded-overlay / no-proxy deployments it stays None and the
1396                    // callback below skips all proxy work while STILL bridging health
1397                    // state back into ServiceManager.
1398                    let proxy_backend: Option<(Arc<ProxyManager>, SocketAddr)> =
1399                        if let (Some(proxy), Some(ip)) = (&self.proxy_manager, effective_ip) {
1400                            let proxy = Arc::clone(proxy);
1401                            // Get the container's target port, using the runtime override if
1402                            // present. On macOS sandbox, port_override gives each replica a
1403                            // unique port so the proxy can distinguish backends sharing
1404                            // 127.0.0.1.
1405                            let port = port_override.unwrap_or_else(|| {
1406                                self.spec
1407                                    .endpoints
1408                                    .iter()
1409                                    .find(|ep| {
1410                                        matches!(
1411                                            ep.protocol,
1412                                            Protocol::Http | Protocol::Https | Protocol::Websocket
1413                                        )
1414                                    })
1415                                    .map_or(8080, zlayer_spec::EndpointSpec::target_port)
1416                            });
1417
1418                            let backend_addr = SocketAddr::new(ip, port);
1419
1420                            // Register backend with load balancer so proxy can route to it.
1421                            // This must happen before the health callback is created, because
1422                            // update_backend_health only updates *existing* backends.
1423                            proxy.add_backend(&self.service_name, backend_addr).await;
1424
1425                            // Publish this container's exposed ports on the node
1426                            // loopback (`127.0.0.1:<endpoint.port>`) so a consumer
1427                            // sharing the node loopback can reach the service at
1428                            // `localhost:<port>`. Gated on the spec's policy
1429                            // (`Auto` publishes only for single-member services).
1430                            // Uses the SAME runtime-resolved `ip`/`port_override`
1431                            // as the backend above: on macOS each replica shares
1432                            // 127.0.0.1 with a unique override; on Linux/VM the
1433                            // overlay IP carries the declared target port.
1434                            if self.spec.publish_to_node_loopback() {
1435                                if let Err(e) = proxy
1436                                    .publish_loopback_for_container(
1437                                        self.deployment.as_deref(),
1438                                        &self.service_name,
1439                                        &self.spec,
1440                                        ip,
1441                                        port_override,
1442                                    )
1443                                    .await
1444                                {
1445                                    // A host-port conflict means a DIFFERENT
1446                                    // deployment/service already owns this
1447                                    // published port; refuse to cross-wire
1448                                    // (Bug 7) and surface it at deploy time.
1449                                    tracing::error!(
1450                                        service = %self.service_name,
1451                                        error = %e,
1452                                        "Failed to publish container ports on node loopback"
1453                                    );
1454                                    return Err(e);
1455                                }
1456                            }
1457
1458                            // Publish explicit Docker-style port mappings
1459                            // (`host_ip:host_port -> container_port`). Unlike the
1460                            // loopback publish above this is NOT gated on
1461                            // `publish_to_node_loopback()` and is driven by
1462                            // `port_mappings` rather than endpoints, so a workload
1463                            // with port mappings and no endpoints still gets a host
1464                            // listener. Backend port mirrors the loopback choice:
1465                            // the runtime `port_override` on macOS sandbox, else the
1466                            // mapping's `container_port`.
1467                            for mapping in &self.spec.port_mappings {
1468                                let pm_backend = SocketAddr::new(
1469                                    ip,
1470                                    port_override.unwrap_or(mapping.container_port),
1471                                );
1472                                if let Err(e) = proxy
1473                                    .publish_port_mapping(
1474                                        self.deployment.as_deref(),
1475                                        &self.service_name,
1476                                        mapping,
1477                                        pm_backend,
1478                                    )
1479                                    .await
1480                                {
1481                                    tracing::error!(
1482                                        service = %self.service_name,
1483                                        host_port = ?mapping.host_port,
1484                                        container_port = mapping.container_port,
1485                                        error = %e,
1486                                        "Failed to publish container port mapping"
1487                                    );
1488                                    return Err(e);
1489                                }
1490                            }
1491
1492                            Some((proxy, backend_addr))
1493                        } else {
1494                            None
1495                        };
1496
1497                    // The health bridge is ALWAYS attached, independent of proxy/IP
1498                    // availability. stabilization::wait_for_stabilization only treats a
1499                    // service as ready when health_states[name] == Healthy, so this write
1500                    // must happen even when the overlay is degraded and no proxy backend
1501                    // exists — otherwise the service stays healthy=false forever and
1502                    // stabilization times out.
1503                    let health_states_opt = self.health_states.clone();
1504                    let svc_name_for_states = self.service_name.clone();
1505                    let svc_name_for_proxy = self.service_name.clone();
1506                    let svc_name_for_log = self.service_name.clone();
1507
1508                    let health_callback: HealthCallback =
1509                        Arc::new(move |container_id: ContainerId, is_healthy: bool| {
1510                            tracing::info!(
1511                                container = %container_id,
1512                                service = %svc_name_for_log,
1513                                healthy = is_healthy,
1514                                has_proxy_backend = proxy_backend.is_some(),
1515                                "health status changed"
1516                            );
1517
1518                            // Always bridge health state back to ServiceManager's
1519                            // health_states map (unconditional — no proxy/IP required).
1520                            if let Some(ref health_states) = health_states_opt {
1521                                let states = Arc::clone(health_states);
1522                                let svc = svc_name_for_states.clone();
1523                                tokio::spawn(async move {
1524                                    let state = if is_healthy {
1525                                        HealthState::Healthy
1526                                    } else {
1527                                        HealthState::Unhealthy {
1528                                            failures: 0,
1529                                            reason: "health check failed".into(),
1530                                        }
1531                                    };
1532                                    states.write().await.insert(svc, state);
1533                                });
1534                            }
1535
1536                            // Update proxy backend health only when a proxy backend was
1537                            // registered (proxy manager + reachable overlay IP present).
1538                            if let Some((proxy, backend_addr)) = proxy_backend.clone() {
1539                                let svc = svc_name_for_proxy.clone();
1540                                tokio::spawn(async move {
1541                                    proxy
1542                                        .update_backend_health(&svc, backend_addr, is_healthy)
1543                                        .await;
1544                                });
1545                            }
1546                        });
1547
1548                    monitor = monitor.with_callback(health_callback);
1549
1550                    monitor.start()
1551                };
1552
1553                // Update state (short write lock)
1554                {
1555                    let mut containers = self.containers.write().await;
1556                    containers.insert(
1557                        id.clone(),
1558                        Container {
1559                            id: id.clone(),
1560                            image: self.spec.image.name.to_string(),
1561                            state: ContainerState::Running,
1562                            // Record the init PID captured at start so the
1563                            // scale-down detach can release the overlay IP +
1564                            // veth even after the container has exited (a re-
1565                            // query via `get_container_pid` returns None for a
1566                            // dead container, so the detach — and the per-
1567                            // container `/28` IP release — would be skipped,
1568                            // leaking the IP until the whole service is torn
1569                            // down).
1570                            pid: container_pid,
1571                            task: None,
1572                            overlay_ip: effective_ip,
1573                            health_monitor: Some(health_monitor_handle),
1574                            port_override,
1575                        },
1576                    );
1577                } // Lock released here
1578            }
1579        }
1580
1581        // Phase 3: Scale down - remove containers (short write lock per removal)
1582        //
1583        // Containers were created with `with_role_and_node(role, local_node_id)`
1584        // on scale-up, so we must reconstruct the same identity on scale-down
1585        // — the role is derived from `replica_groups` via `role_for_replica`
1586        // and the node id is the local cluster node. Mismatched ids would miss
1587        // the live entry in `self.containers` and leak the container.
1588        if replicas < current_replicas {
1589            for i in replicas..current_replicas {
1590                let replica_idx = i + 1;
1591                let id = ContainerId::with_role_and_node(
1592                    self.service_name.clone(),
1593                    replica_idx,
1594                    self.role_for_replica(replica_idx),
1595                    local_node_id,
1596                );
1597
1598                // Remove from state first and get the container to abort health monitor (short write lock)
1599                let removed_container = {
1600                    let mut containers = self.containers.write().await;
1601                    containers.remove(&id)
1602                }; // Lock released here
1603
1604                // Then perform cleanup (no lock held - I/O operations)
1605                if let Some(container) = removed_container {
1606                    // Abort the health monitor task if it exists
1607                    if let Some(handle) = container.health_monitor {
1608                        handle.abort();
1609                    }
1610
1611                    // Unpublish this container's node-loopback ports (mirror of
1612                    // the publish in the start path above). Recomputes the same
1613                    // backend from the container's stored runtime-resolved IP and
1614                    // port override; the last replica's removal frees the
1615                    // loopback listener. Gated identically to publish.
1616                    if self.spec.publish_to_node_loopback() {
1617                        if let (Some(proxy), Some(ip)) = (&self.proxy_manager, container.overlay_ip)
1618                        {
1619                            proxy
1620                                .unpublish_loopback_for_container(
1621                                    &self.spec,
1622                                    ip,
1623                                    container.port_override,
1624                                )
1625                                .await;
1626                        }
1627                    }
1628
1629                    // Unpublish explicit port mappings for this container (mirror
1630                    // of the publish in the start path). NOT gated on
1631                    // `publish_to_node_loopback()`. Only explicit host ports are
1632                    // torn down precisely; ephemeral mappings (no `host_port`) are
1633                    // freed when their listener task is aborted on shutdown.
1634                    if let (Some(proxy), Some(ip)) = (&self.proxy_manager, container.overlay_ip) {
1635                        for mapping in &self.spec.port_mappings {
1636                            if let Some(hp) = mapping.host_port.filter(|p| *p != 0) {
1637                                let pm_backend = SocketAddr::new(
1638                                    ip,
1639                                    container.port_override.unwrap_or(mapping.container_port),
1640                                );
1641                                proxy
1642                                    .unpublish_port_mapping(hp, mapping.protocol, pm_backend)
1643                                    .await;
1644                            }
1645                        }
1646                    }
1647
1648                    // Remove DNS records for this container
1649                    if let Some(dns) = &self.dns_server {
1650                        // Remove replica-specific DNS entry
1651                        let replica_hostname =
1652                            format!("{}.{}.service.local", id.replica, self.service_name);
1653                        if let Err(e) = dns.remove_record(&replica_hostname).await {
1654                            tracing::warn!(
1655                                hostname = %replica_hostname,
1656                                error = %e,
1657                                "failed to remove replica DNS record"
1658                            );
1659                        } else {
1660                            tracing::debug!(
1661                                hostname = %replica_hostname,
1662                                "removed replica DNS record"
1663                            );
1664                        }
1665
1666                        // Remove per-role DNS entry if this was a non-default group.
1667                        // Note: this is best-effort and removes the record even if
1668                        // other replicas in the same role still need it — the DNS
1669                        // server's add/remove API is single-record so we can't keep
1670                        // it alive for siblings. P2.3-bis (round-robin per-role)
1671                        // can fix this later via a per-role refcount; for now the
1672                        // service-level hostname keeps cluster-internal clients
1673                        // working even when the role-specific record briefly
1674                        // disappears.
1675                        if id.role != "default" {
1676                            let role_hostname =
1677                                format!("{}.{}.service.local", id.role, self.service_name);
1678                            if let Err(e) = dns.remove_record(&role_hostname).await {
1679                                tracing::warn!(
1680                                    hostname = %role_hostname,
1681                                    error = %e,
1682                                    "failed to remove role DNS record"
1683                                );
1684                            } else {
1685                                tracing::debug!(
1686                                    hostname = %role_hostname,
1687                                    "removed role DNS record"
1688                                );
1689                            }
1690                        }
1691
1692                        // Note: We don't remove the service-level hostname here because
1693                        // other replicas may still be using it. The service-level record
1694                        // should be cleaned up when the entire service is removed.
1695                    }
1696
1697                    // Detach from overlay network if manager available.
1698                    //
1699                    // Done BEFORE stop_container because:
1700                    //   - The container init process must still be in
1701                    //     /proc to look up its PID via `get_container_pid`.
1702                    //   - `OverlayManager::detach_container` deletes host-side
1703                    //     veth interfaces by name (`veth-<pid>-*`) and
1704                    //     releases the allocated overlay IPs back to the
1705                    //     per-node slice. Without this the IPs leak across
1706                    //     container churn and the slice exhausts.
1707                    //
1708                    // Best-effort: failures are logged but never abort the
1709                    // scale-down. The periodic orphan sweep
1710                    // (`start_periodic_orphan_sweep`) catches anything we
1711                    // missed.
1712                    if let Some(overlay) = &self.overlay_manager {
1713                        // VM guests have no host veth/PID — release the overlayd
1714                        // allocation (IP + registered mesh peer) by container id
1715                        // instead of by PID.
1716                        let detach_kind = self.runtime.overlay_attach_kind_for(&id).await;
1717                        if detach_kind == crate::runtime::OverlayAttachKind::HostProxy {
1718                            let overlay_guard = overlay.read().await;
1719                            if let Err(e) = overlay_guard
1720                                .detach_container_host_shared(&id.to_string())
1721                                .await
1722                            {
1723                                tracing::warn!(
1724                                    container = %id,
1725                                    error = %e,
1726                                    "overlay detach_container_host_shared failed; relying on orphan sweep"
1727                                );
1728                            }
1729                            if let Err(e) = self.runtime.detach_overlay_ip(&id).await {
1730                                tracing::warn!(
1731                                    container = %id,
1732                                    error = %e,
1733                                    "failed to stop host-shared overlay forwarders"
1734                                );
1735                            }
1736                        } else if detach_kind == crate::runtime::OverlayAttachKind::GuestManaged {
1737                            let overlay_guard = overlay.read().await;
1738                            if let Err(e) =
1739                                overlay_guard.detach_container_guest(&id.to_string()).await
1740                            {
1741                                tracing::warn!(
1742                                    container = %id,
1743                                    error = %e,
1744                                    "overlay detach_container_guest failed; relying on orphan sweep"
1745                                );
1746                            }
1747                        } else if detach_kind == crate::runtime::OverlayAttachKind::HostIp {
1748                            // HCS: overlay teardown happens inside overlayd at
1749                            // container remove-time; nothing to detach here (and the
1750                            // Windows pid is not a Linux netns PID).
1751                            tracing::debug!(
1752                                container = %id,
1753                                "HostIp (HCS) detach handled at remove-time; skipping"
1754                            );
1755                        } else if let Some(pid) = container.pid {
1756                            // Detach by the PID recorded AT START, not a live
1757                            // re-query: a crashed/exited container has no live
1758                            // PID, but overlayd still holds its attachment
1759                            // (keyed by that PID) and the host-side veth
1760                            // (`veth-<pid>-s`) — so detaching by the stored PID
1761                            // releases the per-container `/28` IP + deletes the
1762                            // veth, instead of leaking them.
1763                            let overlay_guard = overlay.read().await;
1764                            if let Err(e) = overlay_guard.detach_container(pid).await {
1765                                tracing::warn!(
1766                                    container = %id,
1767                                    pid,
1768                                    error = %e,
1769                                    "overlay detach_container failed"
1770                                );
1771                            }
1772                        } else {
1773                            tracing::debug!(
1774                                container = %id,
1775                                "no recorded PID for overlay detach (never attached or non-Linux runtime)"
1776                            );
1777                        }
1778                    }
1779
1780                    // Stop container
1781                    self.runtime
1782                        .stop_container(&id, Duration::from_secs(30))
1783                        .await?;
1784
1785                    // Sync volumes to S3 before removal (no-op if not configured)
1786                    if let Err(e) = self.runtime.sync_container_volumes(&id).await {
1787                        tracing::warn!(
1788                            container = %id,
1789                            error = %e,
1790                            "failed to sync volumes before removal"
1791                        );
1792                    }
1793
1794                    // Remove container
1795                    self.runtime.remove_container(&id).await?;
1796
1797                    // Revoke the container's scoped token (best-effort). Recompute
1798                    // the deterministic jti the runtime minted under:
1799                    // `container:<service>:<service>-<replica>`.
1800                    if let Some(sink) = self.token_sink.as_ref() {
1801                        sink.revoke(&format!(
1802                            "container:{}:{}-{}",
1803                            id.service, id.service, id.replica
1804                        ))
1805                        .await;
1806                    }
1807                }
1808            }
1809        }
1810
1811        Ok(())
1812    }
1813
1814    /// Get current number of replicas
1815    pub async fn replica_count(&self) -> usize {
1816        self.containers.read().await.len()
1817    }
1818
1819    /// Get all container IDs
1820    pub async fn container_ids(&self) -> Vec<ContainerId> {
1821        self.containers.read().await.keys().cloned().collect()
1822    }
1823
1824    /// Get per-container info (id, image, state, pid, overlay IP) for every
1825    /// live container in this instance.
1826    ///
1827    /// Surfaces the REAL image reference each container was created from and its
1828    /// REAL lifecycle state (lowercased via [`ContainerState::as_str`]) so the
1829    /// API/`ps` no longer reports a hardcoded `"running"` with no image.
1830    pub async fn container_infos(&self) -> Vec<ContainerInfo> {
1831        self.containers
1832            .read()
1833            .await
1834            .values()
1835            .map(|c| ContainerInfo {
1836                id: c.id.clone(),
1837                image: c.image.clone(),
1838                state: c.state.as_str().to_string(),
1839                pid: c.pid,
1840                overlay_ip: c.overlay_ip.map(|ip| ip.to_string()),
1841            })
1842            .collect()
1843    }
1844
1845    /// Get read access to the containers map
1846    ///
1847    /// This allows callers to access container overlay IPs and other metadata
1848    /// without copying the entire map.
1849    pub fn containers(
1850        &self,
1851    ) -> &tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>> {
1852        &self.containers
1853    }
1854
1855    /// Check if this service instance has an overlay manager configured
1856    pub fn has_overlay_manager(&self) -> bool {
1857        self.overlay_manager.is_some()
1858    }
1859
1860    /// Check if this service instance has a proxy manager configured
1861    pub fn has_proxy_manager(&self) -> bool {
1862        self.proxy_manager.is_some()
1863    }
1864
1865    /// Get the proxy manager for this instance, if configured.
1866    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
1867        self.proxy_manager.as_ref()
1868    }
1869
1870    /// Check if this service instance has a DNS server configured
1871    pub fn has_dns_server(&self) -> bool {
1872        self.dns_server.is_some()
1873    }
1874}
1875
/// Per-container summary surfaced to callers (API / `ps`).
///
/// Carries the REAL image reference and lifecycle state of a single live
/// container, replacing the previous id-only view that forced the API to
/// fabricate a hardcoded `"running"` state with no image.
///
/// Built by `ServiceInstance::container_infos`, which copies each field from
/// the tracked `Container` while holding a read lock on the container map.
#[derive(Debug, Clone)]
pub struct ContainerInfo {
    /// Container identity.
    pub id: ContainerId,
    /// Image reference the container was created from (canonical form).
    pub image: String,
    /// Lowercased lifecycle state (e.g. `"running"`, `"exited"`), as returned
    /// by `ContainerState::as_str`.
    pub state: String,
    /// Process ID copied from the tracked container's `pid` field; `None`
    /// when no PID is recorded.
    pub pid: Option<u32>,
    /// Overlay IP rendered as a string, when assigned.
    pub overlay_ip: Option<String>,
}
1894
/// A live deployment container enriched for Docker-compat `ps` rows and for
/// name resolution. Produced by [`ServiceManager::list_container_views`].
///
/// Carries deployment- and service-level context (deployment name, compose
/// container name, published ports) alongside the per-container image,
/// state, and PID.
#[derive(Debug, Clone)]
pub struct DeploymentContainerView {
    /// Deployment (compose project) name, when known.
    pub deployment: Option<String>,
    /// Service name within the deployment.
    pub service: String,
    /// Concrete container identity.
    pub container_id: ContainerId,
    /// Compose `container_name:` (the user-facing Docker name), when set.
    pub container_name: Option<String>,
    /// Image reference the container was created from.
    pub image: String,
    /// Lowercased lifecycle state (e.g. `"running"`).
    pub state: String,
    /// Process id when running.
    pub pid: Option<u32>,
    /// The service's published port mappings.
    pub ports: Vec<zlayer_spec::PortMapping>,
}
1916
/// Service manager for multiple services
///
/// Owns the per-service [`ServiceInstance`] map together with the optional
/// subsystems (overlay, proxy, DNS, jobs, supervisor, cluster, sinks) that
/// the field docs below describe as being threaded onto service instances.
pub struct ServiceManager {
    /// Container runtime handle supplied at construction.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Registered service instances, keyed by service name, behind an async
    /// read/write lock.
    services: tokio::sync::RwLock<std::collections::HashMap<String, ServiceInstance>>,
    /// Semaphore created with 10 permits by every constructor in this file
    /// (caps the number of concurrent scaling operations).
    scale_semaphore: Arc<Semaphore>,
    /// Overlay network manager for container networking
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Stream registry for L4 proxy route registration (TCP/UDP)
    stream_registry: Option<Arc<StreamRegistry>>,
    /// Proxy manager for health-aware load balancing (hyper-based proxy)
    proxy_manager: Option<Arc<ProxyManager>>,
    /// DNS server for service discovery
    dns_server: Option<Arc<DnsServer>>,
    /// Container-injectable overlay resolver IP. When set, new service
    /// instances inject `<ip>` into their `spec.dns` so containers resolve
    /// through the overlay DNS server (bound on `<ip>:53`) rather than the
    /// hijacked host resolv.conf.
    container_dns: Option<IpAddr>,
    /// Deployment name (used for generating hostnames)
    deployment_name: Option<String>,
    /// Health states for dependency condition checking
    health_states: Arc<RwLock<HashMap<String, HealthState>>>,
    /// Job executor for run-to-completion workloads
    job_executor: Option<Arc<JobExecutor>>,
    /// Cron scheduler for time-based job triggers
    cron_scheduler: Option<Arc<CronScheduler>>,
    /// Container supervisor for crash/panic policy enforcement
    container_supervisor: Option<Arc<ContainerSupervisor>>,
    /// Cluster membership + dispatch handle. When `None`, scale operations
    /// run purely local (single-node mode). When `Some`, `scale_service`
    /// routes through the cluster (leader dispatches to peers; followers
    /// forward to the leader).
    cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
    /// Whether THIS node holds the standing HTTP/HTTPS ingress (mirrors
    /// `NodeConfig.ingress`). Threaded onto each `ServiceInstance` so external
    /// service domains resolve to this node's overlay IP when it is the funnel.
    /// Defaults to `false`; set by the daemon from `NodeConfig.ingress`.
    ingress: bool,
    /// Sink for persisting/revoking per-container scoped tokens. `None`
    /// disables persistence (token minted without a `jti`, not revocable).
    token_sink: Option<Arc<dyn crate::auth::ContainerTokenSink>>,
    /// Sink that persists each service's resolved image digest into the
    /// deployment store after a successful pull (threaded onto every
    /// `ServiceInstance`). `None` disables digest persistence.
    digest_sink: Option<Arc<dyn crate::auth::DeploymentDigestSink>>,
    /// Registry credential store threaded onto every `ServiceInstance` so the
    /// supervisor can resolve `zlayer login` credentials + `~/.docker/config.json`
    /// for a service's image on recreate (restore / drift / scale). `None` =
    /// not wired (standalone) → anonymous / docker-config pulls, as before.
    registry_store: Option<RegistryCredentialStoreHandle>,
}
1968
1969// ---------------------------------------------------------------------------
1970// ServiceManagerBuilder
1971// ---------------------------------------------------------------------------
1972
/// Builder for constructing a [`ServiceManager`] with optional subsystems.
///
/// Prefer using `ServiceManager::builder(runtime)` to start building.
///
/// Only `runtime` is required; every other field starts as `None` and is
/// filled in by the setter method of the same name. `build` moves each field
/// into the matching field of the resulting [`ServiceManager`].
///
/// # Example
///
/// ```ignore
/// let manager = ServiceManager::builder(runtime)
///     .overlay_manager(om)
///     .proxy_manager(proxy)
///     .deployment_name("prod")
///     .build();
/// ```
pub struct ServiceManagerBuilder {
    /// Required container runtime, supplied to `ServiceManagerBuilder::new`.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Overlay network manager for container networking.
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Proxy manager for health-aware load balancing.
    proxy_manager: Option<Arc<ProxyManager>>,
    /// Stream registry for TCP/UDP L4 proxy route registration.
    stream_registry: Option<Arc<StreamRegistry>>,
    /// DNS server for service discovery.
    dns_server: Option<Arc<DnsServer>>,
    /// Container-injectable overlay resolver IP.
    container_dns: Option<IpAddr>,
    /// Deployment name (used for hostname generation).
    deployment_name: Option<String>,
    /// Job executor for run-to-completion workloads.
    job_executor: Option<Arc<JobExecutor>>,
    /// Cron scheduler for time-based job triggers.
    cron_scheduler: Option<Arc<CronScheduler>>,
    /// Container supervisor for crash/panic policy enforcement.
    container_supervisor: Option<Arc<ContainerSupervisor>>,
    /// Cluster membership + dispatch handle; unset means local-only scaling.
    cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
}
1999
2000impl ServiceManagerBuilder {
2001    /// Create a new builder with the required runtime.
2002    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
2003        Self {
2004            runtime,
2005            overlay_manager: None,
2006            proxy_manager: None,
2007            stream_registry: None,
2008            dns_server: None,
2009            container_dns: None,
2010            deployment_name: None,
2011            job_executor: None,
2012            cron_scheduler: None,
2013            container_supervisor: None,
2014            cluster: None,
2015        }
2016    }
2017
2018    /// Set the overlay network manager for container networking.
2019    #[must_use]
2020    pub fn overlay_manager(mut self, om: Arc<RwLock<OverlayManager>>) -> Self {
2021        self.overlay_manager = Some(om);
2022        self
2023    }
2024
2025    /// Set the proxy manager for health-aware load balancing.
2026    #[must_use]
2027    pub fn proxy_manager(mut self, pm: Arc<ProxyManager>) -> Self {
2028        self.proxy_manager = Some(pm);
2029        self
2030    }
2031
2032    /// Set the stream registry for TCP/UDP L4 proxy route registration.
2033    #[must_use]
2034    pub fn stream_registry(mut self, sr: Arc<StreamRegistry>) -> Self {
2035        self.stream_registry = Some(sr);
2036        self
2037    }
2038
2039    /// Set the DNS server for service discovery.
2040    #[must_use]
2041    pub fn dns_server(mut self, dns: Arc<DnsServer>) -> Self {
2042        self.dns_server = Some(dns);
2043        self
2044    }
2045
2046    /// Set the container-injectable overlay resolver IP.
2047    ///
2048    /// The daemon passes the IP it bound the overlay DNS server on at port 53
2049    /// (see `daemon.rs` Phase 4). New service instances inject it into
2050    /// `spec.dns` so containers resolve through the overlay instead of the
2051    /// hijacked host resolv.conf. The port is implicitly 53 (resolv.conf has no
2052    /// port syntax), which is why only the bare IP is threaded here.
2053    #[must_use]
2054    pub fn container_dns(mut self, ip: IpAddr) -> Self {
2055        self.container_dns = Some(ip);
2056        self
2057    }
2058
2059    /// Set the deployment name (used for hostname generation).
2060    #[must_use]
2061    pub fn deployment_name(mut self, name: impl Into<String>) -> Self {
2062        self.deployment_name = Some(name.into());
2063        self
2064    }
2065
2066    /// Set the job executor for run-to-completion workloads.
2067    #[must_use]
2068    pub fn job_executor(mut self, je: Arc<JobExecutor>) -> Self {
2069        self.job_executor = Some(je);
2070        self
2071    }
2072
2073    /// Set the cron scheduler for time-based job triggers.
2074    #[must_use]
2075    pub fn cron_scheduler(mut self, cs: Arc<CronScheduler>) -> Self {
2076        self.cron_scheduler = Some(cs);
2077        self
2078    }
2079
2080    /// Set the container supervisor for crash/panic policy enforcement.
2081    #[must_use]
2082    pub fn container_supervisor(mut self, cs: Arc<ContainerSupervisor>) -> Self {
2083        self.container_supervisor = Some(cs);
2084        self
2085    }
2086
2087    /// Set the cluster membership + dispatch handle. When set,
2088    /// [`ServiceManager::scale_service`] will route through the cluster
2089    /// (leader dispatches to peers; followers forward to the leader).
2090    /// When unset (the default), scale operations remain local-only.
2091    #[must_use]
2092    pub fn cluster(mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) -> Self {
2093        self.cluster = Some(cluster);
2094        self
2095    }
2096
2097    /// Consume the builder and produce a fully-wired [`ServiceManager`].
2098    ///
2099    /// Logs warnings for missing recommended subsystems (proxy,
2100    /// `stream_registry`, `container_supervisor`, `deployment_name`).
2101    pub fn build(self) -> ServiceManager {
2102        if self.proxy_manager.is_none() {
2103            tracing::warn!("ServiceManager built without proxy_manager");
2104        }
2105        if self.stream_registry.is_none() {
2106            tracing::warn!("ServiceManager built without stream_registry");
2107        }
2108        if self.container_supervisor.is_none() {
2109            tracing::warn!("ServiceManager built without container_supervisor");
2110        }
2111        if self.deployment_name.is_none() {
2112            tracing::warn!("ServiceManager built without deployment_name");
2113        }
2114
2115        ServiceManager {
2116            runtime: self.runtime,
2117            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
2118            scale_semaphore: Arc::new(Semaphore::new(10)),
2119            overlay_manager: self.overlay_manager,
2120            stream_registry: self.stream_registry,
2121            proxy_manager: self.proxy_manager,
2122            dns_server: self.dns_server,
2123            container_dns: self.container_dns,
2124            deployment_name: self.deployment_name,
2125            health_states: Arc::new(RwLock::new(HashMap::new())),
2126            job_executor: self.job_executor,
2127            cron_scheduler: self.cron_scheduler,
2128            container_supervisor: self.container_supervisor,
2129            cluster: self.cluster,
2130            ingress: false,
2131            token_sink: None,
2132            digest_sink: None,
2133            registry_store: None,
2134        }
2135    }
2136}
2137
2138impl ServiceManager {
2139    /// Create a [`ServiceManagerBuilder`] for constructing a `ServiceManager`.
2140    ///
2141    /// This is the preferred way to construct a `ServiceManager` since v0.2.0.
2142    ///
2143    /// # Example
2144    ///
2145    /// ```ignore
2146    /// let manager = ServiceManager::builder(runtime)
2147    ///     .overlay_manager(om)
2148    ///     .proxy_manager(proxy)
2149    ///     .build();
2150    /// ```
2151    pub fn builder(runtime: Arc<dyn Runtime + Send + Sync>) -> ServiceManagerBuilder {
2152        ServiceManagerBuilder::new(runtime)
2153    }
2154
2155    /// Create a new service manager
2156    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2157    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
2158        Self {
2159            runtime,
2160            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
2161            scale_semaphore: Arc::new(Semaphore::new(10)), // Max 10 concurrent scaling operations
2162            overlay_manager: None,
2163            stream_registry: None,
2164            proxy_manager: None,
2165            dns_server: None,
2166            container_dns: None,
2167            deployment_name: None,
2168            health_states: Arc::new(RwLock::new(HashMap::new())),
2169            job_executor: None,
2170            cron_scheduler: None,
2171            container_supervisor: None,
2172            cluster: None,
2173            ingress: false,
2174            token_sink: None,
2175            digest_sink: None,
2176            registry_store: None,
2177        }
2178    }
2179
2180    /// Create a service manager with overlay network support
2181    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2182    pub fn with_overlay(
2183        runtime: Arc<dyn Runtime + Send + Sync>,
2184        overlay_manager: Arc<RwLock<OverlayManager>>,
2185    ) -> Self {
2186        Self {
2187            runtime,
2188            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
2189            scale_semaphore: Arc::new(Semaphore::new(10)),
2190            overlay_manager: Some(overlay_manager),
2191            stream_registry: None,
2192            proxy_manager: None,
2193            dns_server: None,
2194            container_dns: None,
2195            deployment_name: None,
2196            health_states: Arc::new(RwLock::new(HashMap::new())),
2197            job_executor: None,
2198            cron_scheduler: None,
2199            container_supervisor: None,
2200            cluster: None,
2201            ingress: false,
2202            token_sink: None,
2203            digest_sink: None,
2204            registry_store: None,
2205        }
2206    }
2207
2208    /// Create a fully-configured service manager with overlay and proxy support
2209    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2210    pub fn with_full_config(
2211        runtime: Arc<dyn Runtime + Send + Sync>,
2212        overlay_manager: Arc<RwLock<OverlayManager>>,
2213        deployment_name: String,
2214    ) -> Self {
2215        Self {
2216            runtime,
2217            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
2218            scale_semaphore: Arc::new(Semaphore::new(10)),
2219            overlay_manager: Some(overlay_manager),
2220            stream_registry: None,
2221            proxy_manager: None,
2222            dns_server: None,
2223            container_dns: None,
2224            deployment_name: Some(deployment_name),
2225            health_states: Arc::new(RwLock::new(HashMap::new())),
2226            job_executor: None,
2227            cron_scheduler: None,
2228            container_supervisor: None,
2229            cluster: None,
2230            ingress: false,
2231            token_sink: None,
2232            digest_sink: None,
2233            registry_store: None,
2234        }
2235    }
2236
2237    /// Get the health states map for external monitoring
2238    pub fn health_states(&self) -> Arc<RwLock<HashMap<String, HealthState>>> {
2239        Arc::clone(&self.health_states)
2240    }
2241
2242    /// Update health state for a service
2243    pub async fn update_health_state(&self, service_name: &str, state: HealthState) {
2244        let mut states = self.health_states.write().await;
2245        states.insert(service_name.to_string(), state);
2246    }
2247
2248    /// Set the deployment name (used for generating hostnames)
2249    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2250    pub fn set_deployment_name(&mut self, name: String) {
2251        self.deployment_name = Some(name);
2252    }
2253
2254    /// Set the stream registry for L4 proxy integration (TCP/UDP)
2255    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2256    pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
2257        self.stream_registry = Some(registry);
2258    }
2259
2260    /// Builder pattern: add stream registry for L4 proxy integration
2261    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2262    #[must_use]
2263    pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
2264        self.stream_registry = Some(registry);
2265        self
2266    }
2267
2268    /// Get the stream registry (if configured)
2269    pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
2270        self.stream_registry.as_ref()
2271    }
2272
2273    /// Set the overlay manager for container networking
2274    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2275    pub fn set_overlay_manager(&mut self, manager: Arc<RwLock<OverlayManager>>) {
2276        self.overlay_manager = Some(manager);
2277    }
2278
2279    /// Set the sink used to persist/revoke per-container scoped tokens.
2280    pub fn set_token_sink(&mut self, sink: Arc<dyn crate::auth::ContainerTokenSink>) {
2281        self.token_sink = Some(sink);
2282    }
2283
2284    /// Set the sink that persists each service's resolved image digest into the
2285    /// deployment store after a successful pull. Threaded onto every
2286    /// `ServiceInstance` created via `upsert_service`.
2287    pub fn set_digest_sink(&mut self, sink: Arc<dyn crate::auth::DeploymentDigestSink>) {
2288        self.digest_sink = Some(sink);
2289    }
2290
2291    /// Set the registry credential store threaded onto every `ServiceInstance`
2292    /// so the supervisor resolves `zlayer login` credentials +
2293    /// `~/.docker/config.json` for a service's image on recreate (restore /
2294    /// drift / scale). Mirrors the store wired into the API create/pull handlers.
2295    pub fn set_registry_store(&mut self, store: RegistryCredentialStoreHandle) {
2296        self.registry_store = Some(store);
2297    }
2298
2299    /// Pin a service's restore digest so its next pull resolves strictly from
2300    /// local layers (see [`ServiceInstance::set_restore_pin`]). Best-effort: a
2301    /// no-op when the service is not (yet) registered. Called by the restore
2302    /// path after `upsert_service` and before scaling.
2303    pub async fn set_service_restore_pin(&self, service: &str, digest: Option<String>) {
2304        if let Some(inst) = self.services.read().await.get(service) {
2305            inst.set_restore_pin(digest).await;
2306        }
2307    }
2308
2309    /// Set the proxy manager for health-aware load balancing
2310    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2311    pub fn set_proxy_manager(&mut self, proxy: Arc<ProxyManager>) {
2312        self.proxy_manager = Some(proxy);
2313    }
2314
2315    /// Builder pattern: add proxy manager for health-aware load balancing
2316    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2317    #[must_use]
2318    pub fn with_proxy_manager(mut self, proxy: Arc<ProxyManager>) -> Self {
2319        self.proxy_manager = Some(proxy);
2320        self
2321    }
2322
2323    /// Get the proxy manager (if configured)
2324    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
2325        self.proxy_manager.as_ref()
2326    }
2327
2328    /// Set the DNS server for service discovery
2329    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2330    pub fn set_dns_server(&mut self, dns: Arc<DnsServer>) {
2331        self.dns_server = Some(dns);
2332    }
2333
2334    /// Builder pattern: add DNS server for service discovery
2335    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2336    #[must_use]
2337    pub fn with_dns_server(mut self, dns: Arc<DnsServer>) -> Self {
2338        self.dns_server = Some(dns);
2339        self
2340    }
2341
2342    /// Get the DNS server (if configured)
2343    pub fn dns_server(&self) -> Option<&Arc<DnsServer>> {
2344        self.dns_server.as_ref()
2345    }
2346
2347    /// Set the job executor for run-to-completion workloads
2348    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2349    pub fn set_job_executor(&mut self, executor: Arc<JobExecutor>) {
2350        self.job_executor = Some(executor);
2351    }
2352
2353    /// Set the cron scheduler for time-based job triggers
2354    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2355    pub fn set_cron_scheduler(&mut self, scheduler: Arc<CronScheduler>) {
2356        self.cron_scheduler = Some(scheduler);
2357    }
2358
2359    /// Builder pattern: add job executor
2360    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2361    #[must_use]
2362    pub fn with_job_executor(mut self, executor: Arc<JobExecutor>) -> Self {
2363        self.job_executor = Some(executor);
2364        self
2365    }
2366
2367    /// Builder pattern: add cron scheduler
2368    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2369    #[must_use]
2370    pub fn with_cron_scheduler(mut self, scheduler: Arc<CronScheduler>) -> Self {
2371        self.cron_scheduler = Some(scheduler);
2372        self
2373    }
2374
2375    /// Set the cluster handle for cluster-aware scaling.
2376    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2377    pub fn set_cluster(&mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) {
2378        self.cluster = Some(cluster);
2379    }
2380
2381    /// Builder pattern: add a cluster handle for cluster-aware scaling.
2382    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2383    #[must_use]
2384    pub fn with_cluster(mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) -> Self {
2385        self.cluster = Some(cluster);
2386        self
2387    }
2388
2389    /// Get the cluster handle (if configured).
2390    pub fn cluster(&self) -> Option<&Arc<dyn zlayer_scheduler::cluster::Cluster>> {
2391        self.cluster.as_ref()
2392    }
2393
2394    /// Set whether THIS node holds the standing HTTP/HTTPS ingress (mirrors
2395    /// `NodeConfig.ingress`). Threaded onto each new `ServiceInstance` so its
2396    /// external service domains resolve to this node's overlay IP when it is
2397    /// the funnel. Defaults to `false`.
2398    pub fn set_ingress(&mut self, enabled: bool) {
2399        self.ingress = enabled;
2400    }
2401
2402    /// Whether THIS node holds the standing HTTP/HTTPS ingress.
2403    #[must_use]
2404    pub fn ingress(&self) -> bool {
2405        self.ingress
2406    }
2407
2408    /// Get the job executor (if configured)
2409    pub fn job_executor(&self) -> Option<&Arc<JobExecutor>> {
2410        self.job_executor.as_ref()
2411    }
2412
2413    /// Get the cron scheduler (if configured)
2414    pub fn cron_scheduler(&self) -> Option<&Arc<CronScheduler>> {
2415        self.cron_scheduler.as_ref()
2416    }
2417
2418    /// Set the container supervisor for crash/panic policy enforcement
2419    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2420    pub fn set_container_supervisor(&mut self, supervisor: Arc<ContainerSupervisor>) {
2421        self.container_supervisor = Some(supervisor);
2422    }
2423
2424    /// Builder pattern: add container supervisor
2425    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2426    #[must_use]
2427    pub fn with_container_supervisor(mut self, supervisor: Arc<ContainerSupervisor>) -> Self {
2428        self.container_supervisor = Some(supervisor);
2429        self
2430    }
2431
2432    /// Get the container supervisor (if configured)
2433    pub fn container_supervisor(&self) -> Option<&Arc<ContainerSupervisor>> {
2434        self.container_supervisor.as_ref()
2435    }
2436
2437    /// Start the container supervisor background task
2438    ///
2439    /// This spawns a background task that monitors containers for crashes
2440    /// and enforces the `on_panic` error policy.
2441    ///
2442    /// # Errors
2443    /// Returns an error if no container supervisor is configured.
2444    ///
2445    /// # Returns
2446    /// A `JoinHandle` for the supervisor task.
2447    pub fn start_container_supervisor(&self) -> Result<tokio::task::JoinHandle<()>> {
2448        let supervisor = self.container_supervisor.as_ref().ok_or_else(|| {
2449            AgentError::Configuration("Container supervisor not configured".to_string())
2450        })?;
2451
2452        let supervisor = Arc::clone(supervisor);
2453        Ok(tokio::spawn(async move {
2454            supervisor.run_loop().await;
2455        }))
2456    }
2457
2458    /// Shutdown the container supervisor
2459    pub fn shutdown_container_supervisor(&self) {
2460        if let Some(supervisor) = &self.container_supervisor {
2461            supervisor.shutdown();
2462        }
2463    }
2464
2465    /// Recorded init PID for `container_id` under `service_name`, if the instance
2466    /// still tracks it. This is the PID captured AT START (see the `pid` field on
2467    /// [`Container`]), so it survives the container's death — letting the overlay
2468    /// detach delete the host-side veth (`veth-<pid>-*`) by name even after the
2469    /// process is gone ("process id or not, kill the adapter").
2470    async fn recorded_container_pid(
2471        &self,
2472        service_name: &str,
2473        container_id: &ContainerId,
2474    ) -> Option<u32> {
2475        let services = self.services.read().await;
2476        let instance = services.get(service_name)?;
2477        let containers = instance.containers().read().await;
2478        containers.get(container_id).and_then(|c| c.pid)
2479    }
2480
2481    /// Spawn the container-down → overlay-detach bridge.
2482    ///
2483    /// The container supervisor (`run_loop`) detects a container going DOWN
2484    /// (exit/crash/crash-loop/isolate/service-shutdown) and emits a
2485    /// [`SupervisorEvent`]. Nothing consumed that stream before, so a container
2486    /// that died on its own — rather than via an explicit scale-down/delete —
2487    /// left its host-side veth (`veth-<pid>-*`) and its allocated overlay IP
2488    /// dangling until the whole service was torn down (or the PID-keyed periodic
2489    /// sweep eventually fired). This task drains the stream and, for every
2490    /// down-event, detaches the container's overlay attachment by the PID
2491    /// recorded AT START — so the veth is deleted and the IP released the moment
2492    /// the container goes down, regardless of restart policy.
2493    ///
2494    /// No-op (returns `None`) when no supervisor or no overlay manager is wired.
2495    /// Best-effort: detach failures are logged, never propagated.
2496    ///
2497    /// Takes the manager as `&Arc<RwLock<Self>>` (its post-`init_daemon` home)
2498    /// rather than `&Arc<Self>`: `serve()` calls `Arc::try_unwrap` on the
2499    /// `Arc<ServiceManager>` to re-wrap it in the `RwLock`, which REQUIRES sole
2500    /// ownership — so the detach task must NOT hold a strong `Arc<ServiceManager>`
2501    /// clone (that bumped the refcount and made `try_unwrap` fail, aborting
2502    /// daemon startup). Holding a clone of the shared `Arc<RwLock<Self>>` is
2503    /// safe; that Arc is meant to be shared (router, internal state, etc.).
2504    pub async fn spawn_overlay_detach_on_exit(
2505        sm: &Arc<RwLock<Self>>,
2506    ) -> Option<tokio::task::JoinHandle<()>> {
2507        let (overlay, mut events) = {
2508            let guard = sm.read().await;
2509            let overlay = guard.overlay_manager.as_ref().map(Arc::clone)?;
2510            let events = guard.take_supervisor_events().await?;
2511            (overlay, events)
2512        };
2513        let sm = Arc::clone(sm);
2514
2515        Some(tokio::spawn(async move {
2516            while let Some(event) = events.recv().await {
2517                // Every variant signals the container went DOWN; on
2518                // `ContainerRestarted` the supervisor already re-`start`ed the
2519                // SAME container, which gets a NEW PID + a NEW veth — so deleting
2520                // the OLD PID's veth/IP here is correct and never races the live
2521                // one (names are PID-scoped).
2522                let (service_name, container_id) = match &event {
2523                    SupervisorEvent::ContainerRestarted {
2524                        id, service_name, ..
2525                    }
2526                    | SupervisorEvent::CrashLoopBackOff {
2527                        id, service_name, ..
2528                    }
2529                    | SupervisorEvent::ContainerIsolated {
2530                        id, service_name, ..
2531                    }
2532                    | SupervisorEvent::ServiceShutdown {
2533                        id, service_name, ..
2534                    }
2535                    | SupervisorEvent::ContainerCompleted { id, service_name } => {
2536                        (service_name.clone(), id.clone())
2537                    }
2538                };
2539
2540                // Grab the recorded PID + a clone of the token sink in one read
2541                // guard, then drop the guard before any await on revoke.
2542                let (pid, token_sink) = {
2543                    let guard = sm.read().await;
2544                    let pid = guard
2545                        .recorded_container_pid(&service_name, &container_id)
2546                        .await;
2547                    (pid, guard.token_sink.as_ref().map(Arc::clone))
2548                };
2549
2550                // Revoke the container's scoped token (best-effort). The jti is
2551                // the deterministic `container:<service>:<service>-<replica>`
2552                // string the runtime minted under — `container_id` is already
2553                // the `{service}-{replica}` form here.
2554                if let Some(sink) = token_sink {
2555                    sink.revoke(&format!("container:{service_name}:{container_id}"))
2556                        .await;
2557                }
2558
2559                let Some(pid) = pid else {
2560                    tracing::debug!(
2561                        service = %service_name,
2562                        container = %container_id,
2563                        "container-down detach: no recorded PID (never attached or already cleaned up)"
2564                    );
2565                    continue;
2566                };
2567
2568                let overlay_guard = overlay.read().await;
2569                if let Err(e) = overlay_guard.detach_container(pid).await {
2570                    tracing::warn!(
2571                        service = %service_name,
2572                        container = %container_id,
2573                        pid,
2574                        error = %e,
2575                        "container-down overlay detach failed (veth may linger until periodic sweep)"
2576                    );
2577                } else {
2578                    tracing::info!(
2579                        service = %service_name,
2580                        container = %container_id,
2581                        pid,
2582                        "detached overlay for downed container (deleted veth + released IP)"
2583                    );
2584                }
2585            }
2586            tracing::debug!("supervisor event stream closed; overlay-detach bridge exiting");
2587        }))
2588    }
2589
2590    /// Get the supervised state of a container
2591    pub async fn get_container_supervised_state(
2592        &self,
2593        container_id: &ContainerId,
2594    ) -> Option<SupervisedState> {
2595        if let Some(supervisor) = &self.container_supervisor {
2596            supervisor.get_state(container_id).await
2597        } else {
2598            None
2599        }
2600    }
2601
2602    /// Get supervisor events receiver
2603    ///
2604    /// Note: This can only be called once; the receiver is moved to the caller.
2605    pub async fn take_supervisor_events(
2606        &self,
2607    ) -> Option<tokio::sync::mpsc::Receiver<SupervisorEvent>> {
2608        if let Some(supervisor) = &self.container_supervisor {
2609            supervisor.take_event_receiver().await
2610        } else {
2611            None
2612        }
2613    }
2614
2615    // ==================== Dependency Orchestration ====================
2616
2617    /// Deploy all swarm-sharded services in `services` as coordinated gangs.
2618    ///
2619    /// Partitions the input by `gpu.sharding.swarm_id` (via [`swarm_id_of`]) and,
2620    /// for each swarm group, registers every member (`upsert_service`) and then
2621    /// places the whole group together. On the cluster **leader** this routes to
2622    /// [`Cluster::dispatch_swarm_scale`], which runs `place_swarm`'s all-or-nothing
2623    /// gang placement and pins each member to its chosen node. On a follower, or
2624    /// with no cluster handle, it falls back to per-member [`Self::scale_service`]
2625    /// (which forwards to the leader / scales locally respectively).
2626    ///
2627    /// **Gang co-location is only guaranteed on the leader path** — the normal
2628    /// single-root-node case. A follower-initiated deploy forwards each member
2629    /// independently (the leader's `/internal/scale` handler scales each member
2630    /// locally, bypassing gang placement), so co-location is best-effort there.
2631    ///
2632    /// Returns the set of service names handled here so the caller can skip them
2633    /// in the per-service deploy loop.
2634    ///
2635    /// # Errors
2636    /// Returns an error if a member fails to register or the gang fails to place.
2637    async fn deploy_swarm_groups(
2638        &self,
2639        services: &HashMap<String, ServiceSpec>,
2640    ) -> Result<std::collections::HashSet<String>> {
2641        let groups = partition_swarm_groups(services);
2642        let mut handled: std::collections::HashSet<String> = std::collections::HashSet::new();
2643
2644        for (swarm_id, mut members) in groups {
2645            // Deterministic order so logs / dispatch are stable across runs.
2646            members.sort_by(|a, b| a.0.cmp(&b.0));
2647
2648            tracing::info!(
2649                swarm_id = %swarm_id,
2650                member_count = members.len(),
2651                members = ?members.iter().map(|(n, _)| n.as_str()).collect::<Vec<_>>(),
2652                "Deploying inference swarm as a coordinated gang"
2653            );
2654
2655            // Register every member spec FIRST so the placement inputs and any
2656            // receiving node can see the service before the gang is dispatched.
2657            for (name, spec) in &members {
2658                Box::pin(self.upsert_service(name.clone(), spec.clone())).await?;
2659                handled.insert(name.clone());
2660            }
2661
2662            // Build the (name, spec, replicas) tuples the cluster dispatch wants.
2663            // Swarm members are one replica each; honor the spec's own intent.
2664            let member_reqs: Vec<(String, ServiceSpec, u32)> = members
2665                .iter()
2666                .map(|(name, spec)| {
2667                    let replicas = swarm_member_replicas(spec);
2668                    (name.clone(), spec.clone(), replicas)
2669                })
2670                .collect();
2671
2672            if let Some(cluster) = &self.cluster {
2673                if cluster.is_leader().await {
2674                    // Leader: run the real gang placement (all-or-nothing).
2675                    cluster
2676                        .dispatch_swarm_scale(&member_reqs)
2677                        .await
2678                        .map_err(|e| AgentError::CreateFailed {
2679                            id: swarm_id.clone(),
2680                            reason: format!("swarm gang dispatch: {e}"),
2681                        })?;
2682                } else {
2683                    // Follower: forward each member to the leader independently
2684                    // (best-effort co-location — see method docs).
2685                    for (name, _spec, replicas) in &member_reqs {
2686                        self.scale_service(name, *replicas).await?;
2687                    }
2688                }
2689            } else {
2690                // No cluster handle (single-node, no Cluster wired): scale each
2691                // member locally. Co-location is trivial — one node.
2692                for (name, _spec, replicas) in &member_reqs {
2693                    self.scale_service(name, *replicas).await?;
2694                }
2695            }
2696
2697            // Mark every member started in health states.
2698            for (name, _) in &members {
2699                self.update_health_state(name, HealthState::Unknown).await;
2700            }
2701        }
2702
2703        Ok(handled)
2704    }
2705
2706    /// Deploy multiple services respecting their dependency order
2707    ///
2708    /// This method:
2709    /// 1. Builds a dependency graph from the services
2710    /// 2. Validates no cycles exist
2711    /// 3. Computes topological order (services with no deps first)
2712    /// 4. For each service in order, waits for dependencies then starts the service
2713    ///
2714    /// # Arguments
2715    /// * `services` - Map of service name to service specification
2716    ///
2717    /// # Errors
2718    /// - Returns `AgentError::InvalidSpec` if there are cyclic dependencies
2719    /// - Returns `AgentError::DependencyTimeout` if a dependency times out with `on_timeout`: fail
2720    pub async fn deploy_with_dependencies(
2721        &self,
2722        services: HashMap<String, ServiceSpec>,
2723    ) -> Result<()> {
2724        if services.is_empty() {
2725            return Ok(());
2726        }
2727
2728        // Swarm pre-pass: any service whose spec carries `gpu.sharding` is part
2729        // of an inference swarm and must be placed as a coordinated GANG (the
2730        // whole pipeline co-located, all-or-nothing), NOT one-service-at-a-time
2731        // through the normal scale path. Handle every such service here and skip
2732        // them in the per-service loop below so non-swarm services are entirely
2733        // unchanged. (Swarm members are single-replica units placed together;
2734        // dependency ordering within a gang is the ring order `place_swarm`
2735        // imposes, not the per-service `depends` graph.)
2736        let swarm_handled = self.deploy_swarm_groups(&services).await?;
2737
2738        // Build dependency graph
2739        let graph = DependencyGraph::build(&services)?;
2740
2741        tracing::info!(
2742            service_count = services.len(),
2743            swarm_handled = swarm_handled.len(),
2744            "Starting deployment with dependency ordering"
2745        );
2746
2747        // Get startup order
2748        let order = graph.startup_order();
2749        tracing::debug!(order = ?order, "Computed startup order");
2750
2751        // Start services in dependency order
2752        for service_name in order {
2753            // Swarm-sharded services were already deployed as a gang above.
2754            if swarm_handled.contains(service_name) {
2755                continue;
2756            }
2757
2758            let service_spec = services
2759                .get(service_name)
2760                .ok_or_else(|| AgentError::Internal(format!("Service {service_name} not found")))?;
2761
2762            // Wait for dependencies first
2763            if !service_spec.depends.is_empty() {
2764                tracing::info!(
2765                    service = %service_name,
2766                    dependency_count = service_spec.depends.len(),
2767                    "Waiting for dependencies"
2768                );
2769                self.wait_for_dependencies(service_name, &service_spec.depends)
2770                    .await?;
2771            }
2772
2773            // Register and start service
2774            tracing::info!(service = %service_name, "Starting service");
2775            Box::pin(self.upsert_service(service_name.clone(), service_spec.clone())).await?;
2776
2777            // Get the desired replica count from scale config
2778            let replicas = match &service_spec.scale {
2779                zlayer_spec::ScaleSpec::Fixed { replicas } => *replicas,
2780                zlayer_spec::ScaleSpec::Adaptive { min, .. } => *min, // Start with min replicas
2781                zlayer_spec::ScaleSpec::Manual => 1, // Default to 1 for manual scaling
2782            };
2783            self.scale_service(service_name, replicas).await?;
2784
2785            // Mark service as started in health states (Unknown until health check runs)
2786            self.update_health_state(service_name, HealthState::Unknown)
2787                .await;
2788
2789            tracing::info!(
2790                service = %service_name,
2791                replicas = replicas,
2792                "Service started"
2793            );
2794        }
2795
2796        tracing::info!(service_count = services.len(), "Deployment complete");
2797
2798        Ok(())
2799    }
2800
2801    /// Wait for all dependencies of a service to be satisfied
2802    ///
2803    /// # Arguments
2804    /// * `service` - Name of the service waiting for dependencies
2805    /// * `deps` - Slice of dependency specifications
2806    ///
2807    /// # Errors
2808    /// Returns `AgentError::DependencyTimeout` if any dependency with `on_timeout`: fail times out
2809    async fn wait_for_dependencies(&self, service: &str, deps: &[DependsSpec]) -> Result<()> {
2810        let condition_checker = DependencyConditionChecker::new(
2811            Arc::clone(&self.runtime),
2812            Arc::clone(&self.health_states),
2813            None,
2814        );
2815
2816        let waiter = DependencyWaiter::new(condition_checker);
2817        let results = waiter.wait_for_all(deps).await?;
2818
2819        // Check results for failures
2820        for result in results {
2821            match result {
2822                WaitResult::TimedOutFail {
2823                    service: dep_service,
2824                    condition,
2825                    timeout,
2826                } => {
2827                    return Err(AgentError::DependencyTimeout {
2828                        service: service.to_string(),
2829                        dependency: dep_service,
2830                        condition: format!("{condition:?}"),
2831                        timeout,
2832                    });
2833                }
2834                WaitResult::TimedOutWarn {
2835                    service: dep_service,
2836                    condition,
2837                } => {
2838                    tracing::warn!(
2839                        service = %service,
2840                        dependency = %dep_service,
2841                        condition = ?condition,
2842                        "Dependency timed out but continuing"
2843                    );
2844                }
2845                WaitResult::TimedOutContinue | WaitResult::Satisfied => {
2846                    // Continue silently
2847                }
2848            }
2849        }
2850
2851        Ok(())
2852    }
2853
2854    /// Check if all dependencies for a service are currently satisfied
2855    ///
2856    /// This is a one-shot check (no waiting). Useful for pre-flight validation.
2857    ///
2858    /// # Errors
2859    /// Returns an error if a dependency check fails unexpectedly.
2860    pub async fn check_dependencies(&self, deps: &[DependsSpec]) -> Result<bool> {
2861        let condition_checker = DependencyConditionChecker::new(
2862            Arc::clone(&self.runtime),
2863            Arc::clone(&self.health_states),
2864            None,
2865        );
2866
2867        for dep in deps {
2868            if !condition_checker.check(dep).await? {
2869                return Ok(false);
2870            }
2871        }
2872
2873        Ok(true)
2874    }
2875
2876    /// Add or update a workload (service, job, or cron)
2877    ///
2878    /// This method handles different resource types appropriately:
2879    /// - **Service**: Traditional long-running containers with scaling and health checks
2880    /// - **Job**: Run-to-completion workloads triggered on-demand (stores spec for later)
2881    /// - **Cron**: Scheduled run-to-completion workloads (registers with cron scheduler)
2882    ///
2883    /// # Errors
2884    /// Returns an error if service creation, scaling, or cron registration fails.
2885    #[allow(clippy::too_many_lines)]
2886    pub async fn upsert_service(&self, name: String, spec: ServiceSpec) -> Result<()> {
2887        match spec.rtype {
2888            ResourceType::Service => {
2889                // Long-running service: create/update instance
2890                let mut services = self.services.write().await;
2891
2892                if let Some(instance) = services.get_mut(&name) {
2893                    // Update existing service. We need to:
2894                    //   1. Update the in-memory spec (so future scale-ups use the new image).
2895                    //   2. Recreate the local replicas when the image actually changed —
2896                    //      either a different image *reference* (e.g. tag bump
2897                    //      1.28 -> 1.29), which is a new image regardless of pull
2898                    //      policy, or, under Always/Newer, observed *digest* drift on
2899                    //      the same reference.
2900                    // The recreate is LOCAL (`scale_service_local`): `upsert_service`
2901                    // runs on whichever node owns the replicas (the leader for its
2902                    // own share, each worker via the `/internal/scale` handler). Using
2903                    // the cluster-routed `scale_service` here would bounce a worker's
2904                    // recreate back to the leader and re-enter dispatch. Cluster-wide
2905                    // distribution is the caller's job (orchestrate_deployment + the
2906                    // scale dispatch that carries this spec to every node).
2907                    let image_changed = instance.spec.image.name != spec.image.name;
2908                    instance.spec = spec.clone();
2909                    if let Some(dns) = &self.dns_server {
2910                        instance.set_dns_server(Arc::clone(dns));
2911                    }
2912                    // Re-apply overlay resolver injection: the spec was just
2913                    // replaced wholesale, so any prior injection on the old
2914                    // spec is gone. Honors host_network / user-supplied dns.
2915                    if let Some(ip) = self.container_dns {
2916                        instance.set_container_dns(ip);
2917                    }
2918
2919                    let effective = spec.image.pull_policy;
2920                    let old_digest = instance.last_pulled_digest().await;
2921                    let current_replicas =
2922                        u32::try_from(instance.replica_count().await).unwrap_or(u32::MAX);
2923                    drop(services); // Release write lock before pull / scale (which take their own locks).
2924
2925                    // A changed image reference always recreates. Same-reference
2926                    // refreshes are governed by pull policy + digest drift.
2927                    let mut should_recreate = image_changed;
2928                    let mut new_digest = old_digest.clone();
2929
2930                    match effective {
2931                        PullPolicy::Never | PullPolicy::IfNotPresent => {
2932                            // No proactive pull. If the reference changed we still
2933                            // recreate below; the scale-up path pulls the (absent) new
2934                            // image per IfNotPresent. A same-reference redeploy under
2935                            // these policies is a genuine no-op.
2936                            tracing::debug!(
2937                                service = %name,
2938                                policy = ?effective,
2939                                image_changed,
2940                                "re-deploy under no-refresh pull policy"
2941                            );
2942                        }
2943                        PullPolicy::Always | PullPolicy::Newer => {
2944                            // Pull (this updates the cached digest as a side-effect).
2945                            // We need a read guard to keep the instance alive while
2946                            // calling its &self method.
2947                            let services_ro = self.services.read().await;
2948                            new_digest = if let Some(inst) = services_ro.get(&name) {
2949                                inst.pull_and_refresh_digest().await?
2950                            } else {
2951                                // The service vanished between our write-lock release
2952                                // and read-lock acquisition (race with remove_service).
2953                                // Treat this as a no-op; the caller will see the removal.
2954                                tracing::warn!(
2955                                    service = %name,
2956                                    "service removed during upsert; skipping drift recreate"
2957                                );
2958                                return Ok(());
2959                            };
2960                            drop(services_ro);
2961
2962                            // Always forces a recreate. Newer recreates on digest
2963                            // drift. When digests are unknown (runtime doesn't expose
2964                            // them), we can't observe drift safely under Newer, so the
2965                            // reference check above is the only trigger.
2966                            should_recreate = should_recreate
2967                                || match effective {
2968                                    PullPolicy::Always => true,
2969                                    PullPolicy::Newer => match (&old_digest, &new_digest) {
2970                                        (Some(old), Some(new)) => old != new,
2971                                        _ => false,
2972                                    },
2973                                    _ => false,
2974                                };
2975                        }
2976                    }
2977
2978                    if should_recreate && current_replicas > 0 {
2979                        tracing::info!(
2980                            service = %name,
2981                            policy = ?effective,
2982                            image_changed,
2983                            old_digest = ?old_digest,
2984                            new_digest = ?new_digest,
2985                            replicas = current_replicas,
2986                            "image changed; performing local rolling recreate"
2987                        );
2988                        self.scale_service_local(&name, 0).await?;
2989                        self.scale_service_local(&name, current_replicas).await?;
2990                        tracing::info!(
2991                            service = %name,
2992                            new_digest = ?new_digest,
2993                            "service recreated with refreshed image"
2994                        );
2995                    } else {
2996                        tracing::debug!(
2997                            service = %name,
2998                            policy = ?effective,
2999                            old_digest = ?old_digest,
3000                            new_digest = ?new_digest,
3001                            "service up to date; no recreate required"
3002                        );
3003                    }
3004                    return Ok(());
3005                }
3006                // Create new service with proxy manager for health-aware load balancing
3007                let overlay = self.overlay_manager.as_ref().map(Arc::clone);
3008                let mut instance = if let Some(proxy) = &self.proxy_manager {
3009                    ServiceInstance::with_proxy(
3010                        name.clone(),
3011                        spec,
3012                        self.runtime.clone(),
3013                        overlay,
3014                        Arc::clone(proxy),
3015                    )
3016                } else {
3017                    ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
3018                };
3019                // Thread the local cluster node id so new `ContainerId`s carry
3020                // owning-node identity. Defaults to `0` in single-node mode.
3021                instance.set_node_id(self.cluster.as_ref().map_or(0, |c| c.node_id()));
3022                // Thread ingress capability + cluster handle so external service
3023                // domains resolve to an ingress-capable node's overlay IP.
3024                instance.set_ingress_enabled(self.ingress);
3025                if let Some(cluster) = &self.cluster {
3026                    instance.set_cluster(Arc::clone(cluster));
3027                }
3028                // Thread the token sink so the container's scoped token is
3029                // revoked on scale-down / removal.
3030                instance.set_token_sink(self.token_sink.clone());
3031                // Thread the digest sink so a successful pull persists the
3032                // resolved image digest for restore-from-local.
3033                instance.set_digest_sink(self.digest_sink.clone());
3034                // Thread the registry credential store so the supervisor's
3035                // recreate pulls resolve `zlayer login` / docker-config auth.
3036                instance.set_registry_store(self.registry_store.clone());
3037                // Set DNS server if configured
3038                if let Some(dns) = &self.dns_server {
3039                    instance.set_dns_server(Arc::clone(dns));
3040                }
3041                // Inject the overlay resolver into the spec so containers use it
3042                // instead of the hijacked host resolv.conf (no-op for
3043                // host_network / user-supplied dns).
3044                if let Some(ip) = self.container_dns {
3045                    instance.set_container_dns(ip);
3046                }
3047                // Wire shared health states so callbacks bridge back to ServiceManager
3048                instance.set_health_states(Arc::clone(&self.health_states));
3049                // Register HTTP routes via proxy manager
3050                if let Some(proxy) = &self.proxy_manager {
3051                    proxy.add_service(&name, &instance.spec).await;
3052                }
3053                // Register TCP/UDP endpoints in stream registry
3054                if let Some(stream_registry) = &self.stream_registry {
3055                    for endpoint in &instance.spec.endpoints {
3056                        let svc = StreamService::new(
3057                            name.clone(),
3058                            Vec::new(), // No backends yet; added on scale-up
3059                        );
3060                        match endpoint.protocol {
3061                            Protocol::Tcp => {
3062                                stream_registry.register_tcp(endpoint.port, svc);
3063                                tracing::debug!(
3064                                    service = %name,
3065                                    port = endpoint.port,
3066                                    "Registered TCP stream route"
3067                                );
3068                            }
3069                            Protocol::Udp => {
3070                                stream_registry.register_udp(endpoint.port, svc);
3071                                tracing::debug!(
3072                                    service = %name,
3073                                    port = endpoint.port,
3074                                    "Registered UDP stream route"
3075                                );
3076                            }
3077                            _ => {} // HTTP routes handled by proxy manager
3078                        }
3079                    }
3080                }
3081                services.insert(name, instance);
3082            }
3083            ResourceType::Job => {
3084                // Job: Just store the spec for later triggering
3085                // Jobs don't start containers immediately; they're triggered on-demand
3086                if let Some(executor) = &self.job_executor {
3087                    executor.register_job(&name, spec).await;
3088                    tracing::info!(job = %name, "Registered job spec");
3089                } else {
3090                    tracing::warn!(
3091                        job = %name,
3092                        "Job executor not configured, storing as service for reference"
3093                    );
3094                    // Fallback: store as service instance for reference
3095                    let mut services = self.services.write().await;
3096                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
3097                    let mut instance = if let Some(proxy) = &self.proxy_manager {
3098                        ServiceInstance::with_proxy(
3099                            name.clone(),
3100                            spec,
3101                            self.runtime.clone(),
3102                            overlay,
3103                            Arc::clone(proxy),
3104                        )
3105                    } else {
3106                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
3107                    };
3108                    // Thread the local cluster node id (same as the Service
3109                    // branch above) so the fallback-as-service Job entry also
3110                    // carries owning-node identity.
3111                    instance.set_node_id(self.cluster.as_ref().map_or(0, |c| c.node_id()));
3112                    // Thread ingress capability + cluster handle (same as the
3113                    // Service branch).
3114                    instance.set_ingress_enabled(self.ingress);
3115                    if let Some(cluster) = &self.cluster {
3116                        instance.set_cluster(Arc::clone(cluster));
3117                    }
3118                    // Thread the token sink so the container's scoped token is
3119                    // revoked on scale-down / removal.
3120                    instance.set_token_sink(self.token_sink.clone());
3121                    // Thread the digest sink so a successful pull persists the
3122                    // resolved image digest for restore-from-local.
3123                    instance.set_digest_sink(self.digest_sink.clone());
3124                    // Thread the registry credential store so the supervisor's
3125                    // recreate pulls resolve `zlayer login` / docker-config auth.
3126                    instance.set_registry_store(self.registry_store.clone());
3127                    // Set DNS server if configured
3128                    if let Some(dns) = &self.dns_server {
3129                        instance.set_dns_server(Arc::clone(dns));
3130                    }
3131                    // Inject the overlay resolver (no-op for host_network /
3132                    // user-supplied dns).
3133                    if let Some(ip) = self.container_dns {
3134                        instance.set_container_dns(ip);
3135                    }
3136                    services.insert(name, instance);
3137                }
3138            }
3139            ResourceType::Cron => {
3140                // Cron: Register with the cron scheduler
3141                if let Some(scheduler) = &self.cron_scheduler {
3142                    scheduler.register(&name, &spec).await?;
3143                    tracing::info!(cron = %name, "Registered cron job with scheduler");
3144                } else {
3145                    return Err(AgentError::Configuration(format!(
3146                        "Cron scheduler not configured for cron job '{name}'"
3147                    )));
3148                }
3149            }
3150        }
3151
3152        Ok(())
3153    }
3154
3155    /// Update backend addresses via `ProxyManager` after scaling, applying
3156    /// per-endpoint `target_role` filtering.
3157    ///
3158    /// For each L7 endpoint of the service, this collects the subset of
3159    /// containers whose `ContainerId.role` matches `endpoint.target_role`
3160    /// (or all containers when `target_role` is `None`) and updates the
3161    /// proxy's backend pool for that specific endpoint via
3162    /// [`ProxyManager::update_endpoint_backends`].
3163    async fn update_proxy_backends(&self, instance: &ServiceInstance) {
3164        let Some(proxy) = &self.proxy_manager else {
3165            return;
3166        };
3167        for endpoint in &instance.spec.endpoints {
3168            // Only L7 endpoints flow through the proxy (HTTP/HTTPS/WS).
3169            if !matches!(
3170                endpoint.protocol,
3171                Protocol::Http | Protocol::Https | Protocol::Websocket
3172            ) {
3173                continue;
3174            }
3175            let addrs = self.collect_endpoint_backends(instance, endpoint).await;
3176            proxy
3177                .update_endpoint_backends(&instance.service_name, &endpoint.name, addrs)
3178                .await;
3179        }
3180    }
3181
3182    /// Update backend addresses in the `StreamRegistry` for TCP/UDP endpoints after scaling
3183    ///
3184    /// For containers with a port override (macOS sandbox), the addresses already
3185    /// carry the runtime-assigned port. In that case, the container listens on the
3186    /// override port for all traffic, so we use the address port directly. For
3187    /// containers without a port override (Linux, VMs), we reconstruct addresses
3188    /// using the endpoint's declared port, since each container has its own IP
3189    /// and can bind any port independently.
3190    async fn update_stream_backends(&self, instance: &ServiceInstance) {
3191        let Some(stream_registry) = &self.stream_registry else {
3192            return;
3193        };
3194
3195        for endpoint in &instance.spec.endpoints {
3196            match endpoint.protocol {
3197                Protocol::Tcp => {
3198                    let tcp_backends = self.collect_endpoint_backends(instance, endpoint).await;
3199                    let backend_count = tcp_backends.len();
3200                    stream_registry.update_tcp_backends(endpoint.port, tcp_backends);
3201                    tracing::debug!(
3202                        endpoint = %endpoint.name,
3203                        port = endpoint.port,
3204                        backend_count = backend_count,
3205                        target_role = ?endpoint.target_role,
3206                        "Updated TCP stream backends"
3207                    );
3208                }
3209                Protocol::Udp => {
3210                    let udp_backends = self.collect_endpoint_backends(instance, endpoint).await;
3211                    let backend_count = udp_backends.len();
3212                    stream_registry.update_udp_backends(endpoint.port, udp_backends);
3213                    tracing::debug!(
3214                        endpoint = %endpoint.name,
3215                        port = endpoint.port,
3216                        backend_count = backend_count,
3217                        target_role = ?endpoint.target_role,
3218                        "Updated UDP stream backends"
3219                    );
3220                }
3221                _ => {} // HTTP endpoints handled by update_proxy_backends
3222            }
3223        }
3224    }
3225
3226    /// Scale a service. Cluster-aware: if this node has a `Cluster` handle
3227    /// and we're not the leader, forward to the leader; if leader, compute
3228    /// affinity-aware placement and dispatch each node its share via
3229    /// `dispatch_scale_distributed`; else (single-node) just scale locally.
3230    ///
3231    /// # Errors
3232    /// Returns an error if scaling fails on any participating node.
3233    #[allow(clippy::cast_possible_truncation)]
3234    pub async fn scale_service(&self, name: &str, replicas: u32) -> Result<()> {
3235        use zlayer_scheduler::cluster::InternalScaleRequest;
3236
3237        tracing::info!(
3238            target: "zlayer::scale_distribute",
3239            service = name,
3240            replicas,
3241            has_cluster = self.cluster.is_some(),
3242            "scale_service ENTER"
3243        );
3244
3245        // Attach the current spec so every receiving node can register/update
3246        // the service before scaling. This is what propagates an image change
3247        // to worker containers and lets a fresh worker run a replica it has
3248        // never seen. `None` if the service isn't registered locally (the
3249        // receiver then falls back to its own cached spec).
3250        let spec = self
3251            .services
3252            .read()
3253            .await
3254            .get(name)
3255            .map(|inst| inst.spec.clone());
3256        let build_req = |replicas: u32| {
3257            let req = InternalScaleRequest::new(name, replicas);
3258            match spec.clone() {
3259                Some(s) => req.with_spec(s),
3260                None => req,
3261            }
3262        };
3263
3264        if let Some(cluster) = &self.cluster {
3265            let is_leader = cluster.is_leader().await;
3266            tracing::info!(
3267                target: "zlayer::scale_distribute",
3268                service = name,
3269                replicas,
3270                is_leader,
3271                spec_affinity = ?spec.as_ref().and_then(|s| s.affinity.clone()),
3272                "scale_service: cluster path"
3273            );
3274            if !is_leader {
3275                // Follower: forward to the leader and let it dispatch.
3276                return cluster
3277                    .forward_scale(build_req(replicas))
3278                    .await
3279                    .map_err(|e| AgentError::CreateFailed {
3280                        id: name.to_string(),
3281                        reason: format!("cluster forward: {e}"),
3282                    });
3283            }
3284
3285            // Leader path. Compute affinity-aware placement across the Ready
3286            // node set and dispatch each node its share. `dispatch_scale_distributed`
3287            // reuses the same placement machinery as one-off container placement
3288            // (`cluster_nodes_to_node_states` + `place_service_replicas`), honoring
3289            // `ServiceSpec.affinity` (`spread`/`pack`/`pin`). The leader's own
3290            // share short-circuits to a local call (no localhost HTTP round-trip),
3291            // and the attached spec lets fresh workers register the service before
3292            // scaling. Single-node clusters fall through the default impl, which
3293            // dispatches everything to this node (unchanged behavior).
3294            return cluster
3295                .dispatch_scale_distributed(build_req(replicas))
3296                .await
3297                .map_err(|e| AgentError::CreateFailed {
3298                    id: name.to_string(),
3299                    reason: format!("cluster dispatch: {e}"),
3300                });
3301        }
3302
3303        // No cluster handle — single-node mode.
3304        self.scale_service_local(name, replicas).await
3305    }
3306
3307    /// Local (single-node) scale: directly creates/destroys containers on
3308    /// this node only. Called by:
3309    ///   - `scale_service` in single-node mode (when `self.cluster` is None).
3310    ///   - The `/api/v1/internal/scale` handler (which the leader's
3311    ///     `Cluster::dispatch_scale` HTTP-POSTs to, bottoming out the
3312    ///     recursive loop on each receiving node).
3313    ///   - The cluster impls' `local_dispatch` closure (for the leader's own
3314    ///     share — short-circuited to avoid a localhost round-trip).
3315    ///
3316    /// # Errors
3317    /// Returns an error if the service is not found or scaling fails.
3318    #[allow(clippy::cast_possible_truncation)]
3319    pub async fn scale_service_local(&self, name: &str, replicas: u32) -> Result<()> {
3320        tracing::info!(
3321            target: "zlayer::scale_distribute",
3322            service = name,
3323            replicas,
3324            "scale_service_local ENTER"
3325        );
3326        let _permit = self.scale_semaphore.acquire().await;
3327
3328        let services = self.services.read().await;
3329        let Some(instance) = services.get(name) else {
3330            // Draining a service this node never hosted is a no-op (e.g. the
3331            // leader fans out `count=0` to a node to drain it during a
3332            // scale-down, but that node never ran the service).
3333            if replicas == 0 {
3334                return Ok(());
3335            }
3336            return Err(AgentError::NotFound {
3337                container: name.to_string(),
3338                reason: "service not found".to_string(),
3339            });
3340        };
3341
3342        // Get current replica count before scaling
3343        let current_replicas = instance.replica_count().await as u32;
3344
3345        // Perform the scaling operation. `Box::pin` keeps the (large)
3346        // per-replica `scale_to` future off this funnel's stack frame so the
3347        // chain of callers above (`scale_service`, the autoscale evaluators,
3348        // `deploy_swarm_groups`) stays under clippy's `large_futures` threshold.
3349        Box::pin(instance.scale_to(replicas)).await?;
3350
3351        // After scaling, update proxy and stream backends for each endpoint.
3352        // Per-endpoint collection (rather than a single service-wide list)
3353        // is what makes `EndpointSpec.target_role` filtering possible:
3354        // each endpoint receives only the containers whose
3355        // `ContainerId.role` matches its declared role.
3356        if self.proxy_manager.is_some() {
3357            self.update_proxy_backends(instance).await;
3358        }
3359        if self.stream_registry.is_some() {
3360            self.update_stream_backends(instance).await;
3361        }
3362
3363        // Register new containers with supervisor for crash monitoring.
3364        //
3365        // Container ids here must match what `ServiceInstance::scale_to`
3366        // constructed — same role (derived from `replica_groups`) and same
3367        // local node id. Otherwise supervise/unsupervise miss the live entry
3368        // and crash-restart bookkeeping leaks across scale events.
3369        let local_node_id = self.cluster.as_ref().map_or(0, |c| c.node_id());
3370        if let Some(supervisor) = &self.container_supervisor {
3371            // For scale-up, register new containers
3372            if replicas > current_replicas {
3373                for i in current_replicas..replicas {
3374                    let replica_idx = i + 1;
3375                    let container_id = ContainerId::with_role_and_node(
3376                        name.to_string(),
3377                        replica_idx,
3378                        instance.role_for_replica(replica_idx),
3379                        local_node_id,
3380                    );
3381                    supervisor.supervise(&container_id, &instance.spec).await;
3382                }
3383            }
3384            // For scale-down, unregister removed containers
3385            if replicas < current_replicas {
3386                for i in replicas..current_replicas {
3387                    let replica_idx = i + 1;
3388                    let container_id = ContainerId::with_role_and_node(
3389                        name.to_string(),
3390                        replica_idx,
3391                        instance.role_for_replica(replica_idx),
3392                        local_node_id,
3393                    );
3394                    supervisor.unsupervise(&container_id).await;
3395                }
3396            }
3397        }
3398
3399        Ok(())
3400    }
3401
3402    /// Collect backend addresses for a single endpoint of a service.
3403    ///
3404    /// This queries the service instance's containers for their overlay
3405    /// network IP addresses and constructs backend addresses using the
3406    /// endpoint's container target port.
3407    ///
3408    /// Containers are filtered by `endpoint.target_role`:
3409    /// - `None` (default): all containers of the service are eligible
3410    ///   (legacy behavior).
3411    /// - `Some(role)`: only containers whose `ContainerId.role` equals
3412    ///   `role` are included. Implements
3413    ///   [`zlayer_spec::EndpointSpec::target_role`].
3414    ///
3415    /// If a container has a `port_override` (e.g., macOS sandbox where all
3416    /// containers share the host network), that port is used instead of
3417    /// the spec-declared endpoint port. This allows multiple replicas on
3418    /// the same IP (`127.0.0.1`) to be distinguished by port.
3419    async fn collect_endpoint_backends(
3420        &self,
3421        instance: &ServiceInstance,
3422        endpoint: &zlayer_spec::EndpointSpec,
3423    ) -> Vec<SocketAddr> {
3424        let mut addrs = Vec::new();
3425        let endpoint_port = endpoint.target_port();
3426        let containers = instance.containers().read().await;
3427
3428        for (container_id, container) in containers.iter() {
3429            // target_role filter: skip containers whose role doesn't match.
3430            if let Some(required_role) = endpoint.target_role.as_ref() {
3431                if container_id.role != *required_role {
3432                    continue;
3433                }
3434            }
3435            let Some(ip) = container.overlay_ip else {
3436                continue;
3437            };
3438            // Use the runtime-assigned port override if present (macOS
3439            // sandbox), otherwise fall back to the endpoint's declared
3440            // target port.
3441            let port = container.port_override.unwrap_or(endpoint_port);
3442            addrs.push(SocketAddr::new(ip, port));
3443        }
3444
3445        // If we expected backends but found none, log a hint so operators
3446        // can debug. Distinguish "no containers" from "role filter
3447        // excluded everything" from "no overlay IPs".
3448        if addrs.is_empty() && !containers.is_empty() {
3449            tracing::warn!(
3450                service = %instance.service_name,
3451                endpoint = %endpoint.name,
3452                target_role = ?endpoint.target_role,
3453                container_count = containers.len(),
3454                "no backends collected for endpoint - either no matching role, no overlay IPs, or filtering excluded all"
3455            );
3456        }
3457
3458        addrs
3459    }
3460
3461    /// Get service replica count
3462    ///
3463    /// # Errors
3464    /// Returns an error if the service is not found.
3465    pub async fn service_replica_count(&self, name: &str) -> Result<usize> {
3466        let services = self.services.read().await;
3467        let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
3468            container: name.to_string(),
3469            reason: "service not found".to_string(),
3470        })?;
3471
3472        Ok(instance.replica_count().await)
3473    }
3474
3475    /// Remove a workload (service, job, or cron)
3476    ///
3477    /// This method handles cleanup for different resource types:
3478    /// - **Service**: Unregisters proxy routes, supervisor, and removes from service map
3479    /// - **Job**: Unregisters from job executor
3480    /// - **Cron**: Unregisters from cron scheduler
3481    ///
3482    /// # Errors
3483    /// Returns an error if the service cannot be removed or scale-down fails.
3484    pub async fn remove_service(&self, name: &str) -> Result<()> {
3485        // Try to unregister from cron scheduler first
3486        if let Some(scheduler) = &self.cron_scheduler {
3487            scheduler.unregister(name).await;
3488        }
3489
3490        // Try to unregister from job executor
3491        if let Some(executor) = &self.job_executor {
3492            executor.unregister_job(name).await;
3493        }
3494
3495        // Unregister stream routes (TCP/UDP) from the stream registry
3496        if let Some(stream_registry) = &self.stream_registry {
3497            // Need to get the service spec to know which ports to unregister
3498            let services = self.services.read().await;
3499            if let Some(instance) = services.get(name) {
3500                for endpoint in &instance.spec.endpoints {
3501                    match endpoint.protocol {
3502                        Protocol::Tcp => {
3503                            let _ = stream_registry.unregister_tcp(endpoint.port);
3504                            tracing::debug!(
3505                                service = %name,
3506                                port = endpoint.port,
3507                                "Unregistered TCP stream route"
3508                            );
3509                        }
3510                        Protocol::Udp => {
3511                            let _ = stream_registry.unregister_udp(endpoint.port);
3512                            tracing::debug!(
3513                                service = %name,
3514                                port = endpoint.port,
3515                                "Unregistered UDP stream route"
3516                            );
3517                        }
3518                        _ => {} // HTTP routes handled above
3519                    }
3520                }
3521            }
3522            drop(services); // Release read lock
3523        }
3524
3525        // Unpublish node-loopback ports for every live replica of this
3526        // service so the loopback listeners are freed (mirror of the
3527        // per-replica unpublish in `ServiceInstance::scale_to`). Gated on the
3528        // spec's policy; recomputes each backend from the container's stored
3529        // runtime-resolved IP and port override.
3530        {
3531            let services = self.services.read().await;
3532            if let Some(instance) = services.get(name) {
3533                if instance.spec.publish_to_node_loopback() {
3534                    if let Some(proxy) = instance.proxy_manager() {
3535                        let containers = instance.containers().read().await;
3536                        for container in containers.values() {
3537                            if let Some(ip) = container.overlay_ip {
3538                                proxy
3539                                    .unpublish_loopback_for_container(
3540                                        &instance.spec,
3541                                        ip,
3542                                        container.port_override,
3543                                    )
3544                                    .await;
3545                            }
3546                        }
3547                    }
3548                }
3549            }
3550            drop(services); // Release read lock
3551        }
3552
3553        // Unpublish explicit port mappings for every live replica (mirror of the
3554        // loopback unpublish above). NOT gated on `publish_to_node_loopback()`.
3555        {
3556            let services = self.services.read().await;
3557            if let Some(instance) = services.get(name) {
3558                if let Some(proxy) = instance.proxy_manager() {
3559                    let containers = instance.containers().read().await;
3560                    for container in containers.values() {
3561                        if let Some(ip) = container.overlay_ip {
3562                            for mapping in &instance.spec.port_mappings {
3563                                if let Some(hp) = mapping.host_port.filter(|p| *p != 0) {
3564                                    let pm_backend = SocketAddr::new(
3565                                        ip,
3566                                        container.port_override.unwrap_or(mapping.container_port),
3567                                    );
3568                                    proxy
3569                                        .unpublish_port_mapping(hp, mapping.protocol, pm_backend)
3570                                        .await;
3571                                }
3572                            }
3573                        }
3574                    }
3575                }
3576            }
3577            drop(services); // Release read lock
3578        }
3579
3580        // Unregister containers from the supervisor
3581        if let Some(supervisor) = &self.container_supervisor {
3582            let containers = self.get_service_containers(name).await;
3583            for container_id in containers {
3584                supervisor.unsupervise(&container_id).await;
3585            }
3586            tracing::debug!(service = %name, "Unregistered containers from supervisor");
3587        }
3588
3589        // Clean up DNS records for the service (bare name + FQDNs).
3590        self.cleanup_service_dns(name).await;
3591
3592        // Tear down the per-service overlay bridge THROUGH overlayd so it isn't
3593        // leaked when a service is removed via a path that doesn't go through the
3594        // API deployment handler (which does its own `teardown_service_overlay`
3595        // in its teardown_order loop). Routing through overlayd keeps its
3596        // in-memory state synced — that is the only safe way to delete a `zl-…-b`
3597        // bridge (a hand `ip link del` deletes the link out from under overlayd
3598        // and desyncs it, requiring an overlayd restart). Idempotent with the
3599        // handler's call: deleting an already-gone bridge by deterministic name
3600        // is a non-fatal no-op.
3601        if let Some(overlay) = &self.overlay_manager {
3602            overlay.read().await.teardown_service_overlay(name).await;
3603        }
3604
3605        // Remove from services map (may or may not exist depending on rtype)
3606        let mut services = self.services.write().await;
3607        if services.remove(name).is_some() {
3608            tracing::debug!(service = %name, "Removed service from manager");
3609        }
3610
3611        Ok(())
3612    }
3613
3614    /// Remove every DNS record this service registered on attach: the bare
3615    /// compose service name (`{service}`), the service-level FQDN
3616    /// (`{service}.service.local`), and each replica's FQDN
3617    /// (`{replica}.{service}.service.local`). Best-effort; failures are logged.
3618    async fn cleanup_service_dns(&self, name: &str) {
3619        let Some(dns) = &self.dns_server else {
3620            return;
3621        };
3622
3623        // Bare compose service-name record (compose discovery).
3624        if let Err(e) = dns.remove_record(name).await {
3625            tracing::warn!(
3626                hostname = %name,
3627                error = %e,
3628                "failed to remove bare service-name DNS record"
3629            );
3630        }
3631
3632        // Service-level FQDN.
3633        let service_hostname = format!("{name}.service.local");
3634        if let Err(e) = dns.remove_record(&service_hostname).await {
3635            tracing::warn!(
3636                hostname = %service_hostname,
3637                error = %e,
3638                "failed to remove service DNS record"
3639            );
3640        } else {
3641            tracing::debug!(hostname = %service_hostname, "removed service DNS record");
3642        }
3643
3644        // Any remaining replica-specific FQDNs.
3645        let services = self.services.read().await;
3646        if let Some(instance) = services.get(name) {
3647            let containers = instance.containers().read().await;
3648            for (id, _) in containers.iter() {
3649                let replica_hostname = format!("{}.{}.service.local", id.replica, name);
3650                if let Err(e) = dns.remove_record(&replica_hostname).await {
3651                    tracing::warn!(
3652                        hostname = %replica_hostname,
3653                        error = %e,
3654                        "failed to remove replica DNS record during service removal"
3655                    );
3656                }
3657            }
3658        }
3659    }
3660
3661    /// Introspect service infrastructure wiring.
3662    /// Returns (`has_overlay`, `has_proxy`, `has_dns`), or None if service not found.
3663    pub async fn service_infrastructure(&self, name: &str) -> Option<(bool, bool, bool)> {
3664        let services = self.services.read().await;
3665        services.get(name).map(|i| {
3666            (
3667                i.has_overlay_manager(),
3668                i.has_proxy_manager(),
3669                i.has_dns_server(),
3670            )
3671        })
3672    }
3673
3674    /// List all services
3675    pub async fn list_services(&self) -> Vec<String> {
3676        self.services.read().await.keys().cloned().collect()
3677    }
3678
3679    /// Get logs for a service, aggregated from all container replicas.
3680    ///
3681    /// # Arguments
3682    /// * `service_name` - Name of the service to fetch logs for
3683    /// * `tail` - Number of lines to return per container (0 = all)
3684    /// * `instance` - Optional specific instance (container ID suffix like "1", "2")
3685    ///
3686    /// # Errors
3687    /// Returns an error if the service or instance is not found.
3688    ///
3689    /// # Returns
3690    /// Structured log entries from all (or specific) container replicas. Each
3691    /// entry has its `service` and `deployment` fields populated when available.
3692    pub async fn get_service_logs(
3693        &self,
3694        service_name: &str,
3695        tail: usize,
3696        instance: Option<&str>,
3697    ) -> Result<Vec<LogEntry>> {
3698        let container_ids = self.get_service_containers(service_name).await;
3699
3700        if container_ids.is_empty() {
3701            return Err(AgentError::NotFound {
3702                container: service_name.to_string(),
3703                reason: "no containers found for service".to_string(),
3704            });
3705        }
3706
3707        // If a specific instance is requested, filter to just that one
3708        let target_ids: Vec<&ContainerId> = if let Some(inst) = instance {
3709            if let Ok(replica_num) = inst.parse::<u32>() {
3710                container_ids
3711                    .iter()
3712                    .filter(|id| id.replica == replica_num)
3713                    .collect()
3714            } else {
3715                // Try matching by full container ID string suffix
3716                container_ids
3717                    .iter()
3718                    .filter(|id| id.to_string().contains(inst))
3719                    .collect()
3720            }
3721        } else {
3722            container_ids.iter().collect()
3723        };
3724
3725        if target_ids.is_empty() {
3726            return Err(AgentError::NotFound {
3727                container: format!("{}/{}", service_name, instance.unwrap_or("?")),
3728                reason: "instance not found".to_string(),
3729            });
3730        }
3731
3732        let mut all_entries: Vec<LogEntry> = Vec::new();
3733
3734        for id in &target_ids {
3735            match self.runtime.container_logs(id, tail).await {
3736                Ok(mut entries) => {
3737                    // Populate service and deployment metadata on each entry
3738                    for entry in &mut entries {
3739                        if entry.service.is_none() {
3740                            entry.service = Some(service_name.to_string());
3741                        }
3742                        if entry.deployment.is_none() {
3743                            entry.deployment.clone_from(&self.deployment_name);
3744                        }
3745                    }
3746                    all_entries.extend(entries);
3747                }
3748                Err(e) => {
3749                    tracing::warn!(
3750                        service = service_name,
3751                        container = %id,
3752                        error = %e,
3753                        "Failed to read container logs"
3754                    );
3755                }
3756            }
3757        }
3758
3759        Ok(all_entries)
3760    }
3761
3762    /// Get all container IDs for a specific service
3763    ///
3764    /// Returns an empty vector if the service doesn't exist.
3765    ///
3766    /// # Arguments
3767    /// * `service_name` - Name of the service to query
3768    ///
3769    /// # Returns
3770    /// Vector of `ContainerIds` for all replicas of the service
3771    pub async fn get_service_containers(&self, service_name: &str) -> Vec<ContainerId> {
3772        let services = self.services.read().await;
3773        if let Some(instance) = services.get(service_name) {
3774            instance.container_ids().await
3775        } else {
3776            Vec::new()
3777        }
3778    }
3779
3780    /// Get per-container info (id, image, real state, pid, overlay IP) for a
3781    /// specific service.
3782    ///
3783    /// Unlike [`get_service_containers`](Self::get_service_containers) (which
3784    /// returns ids only), this surfaces the REAL image reference and lifecycle
3785    /// state of each live container so the API/`ps` can report them accurately.
3786    ///
3787    /// Returns an empty vector if the service doesn't exist.
3788    pub async fn get_service_container_infos(&self, service_name: &str) -> Vec<ContainerInfo> {
3789        let services = self.services.read().await;
3790        if let Some(instance) = services.get(service_name) {
3791            instance.container_infos().await
3792        } else {
3793            Vec::new()
3794        }
3795    }
3796
3797    /// This node's **local** view of `service` (running replica count, health,
3798    /// containers), used for cluster-wide aggregation. Served by the internal
3799    /// `/api/v1/internal/services/{svc}/state` endpoint and used as the local
3800    /// part of [`Self::cluster_service_states`].
3801    pub async fn local_service_state(
3802        &self,
3803        service: &str,
3804    ) -> zlayer_types::cluster::NodeServiceState {
3805        use zlayer_types::cluster::{ClusterContainerSummary, NodeServiceState};
3806        let node_id = self.cluster.as_ref().map_or(0, |c| c.node_id());
3807        let infos = self.get_service_container_infos(service).await;
3808        #[allow(clippy::cast_possible_truncation)]
3809        let running = infos
3810            .iter()
3811            .filter(|i| i.state.eq_ignore_ascii_case("running"))
3812            .count() as u32;
3813        // A node running 0 replicas is trivially healthy (it can't drag the
3814        // cluster-wide aggregate). Otherwise require a Healthy health state.
3815        let healthy = if running == 0 {
3816            true
3817        } else {
3818            let states = self.health_states();
3819            let guard = states.read().await;
3820            matches!(guard.get(service), Some(HealthState::Healthy))
3821        };
3822        let containers = infos
3823            .into_iter()
3824            .map(|i| ClusterContainerSummary {
3825                node_id,
3826                id: i.id.to_string(),
3827                service: i.id.service.clone(),
3828                replica: i.id.replica,
3829                image: i.image,
3830                state: i.state,
3831                pid: i.pid,
3832                overlay_ip: i.overlay_ip,
3833            })
3834            .collect();
3835        NodeServiceState {
3836            node_id,
3837            running,
3838            healthy,
3839            containers,
3840        }
3841    }
3842
3843    /// Cluster-wide per-node states for `service`: this node's local view plus
3844    /// every other node's (fetched via the cluster handle's
3845    /// `fetch_remote_service_states`). When not clustered, returns just the
3846    /// local view. This is the source of truth for distributed-service replica
3847    /// counts, health, and the `ps` container listing on the leader.
3848    pub async fn cluster_service_states(
3849        &self,
3850        service: &str,
3851    ) -> Vec<zlayer_types::cluster::NodeServiceState> {
3852        let mut states = vec![self.local_service_state(service).await];
3853        if let Some(cluster) = &self.cluster {
3854            states.extend(cluster.fetch_remote_service_states(service).await);
3855        }
3856        states
3857    }
3858
3859    /// Execute a command inside a running container for a service
3860    ///
3861    /// Picks a specific replica if provided, otherwise uses the first available container.
3862    ///
3863    /// # Arguments
3864    /// * `service_name` - Name of the service
3865    /// * `replica` - Optional replica number to target
3866    /// * `cmd` - Command and arguments to execute
3867    ///
3868    /// # Errors
3869    /// Returns an error if the service or replica is not found, or if exec fails.
3870    ///
3871    /// # Panics
3872    /// Panics if no replica is specified and the container list is unexpectedly empty
3873    /// after the emptiness check (should not happen in practice).
3874    ///
3875    /// # Returns
3876    /// Tuple of (`exit_code`, stdout, stderr)
3877    pub async fn exec_in_container(
3878        &self,
3879        service_name: &str,
3880        replica: Option<u32>,
3881        cmd: &[String],
3882    ) -> Result<(i32, String, String)> {
3883        let container_ids = self.get_service_containers(service_name).await;
3884
3885        if container_ids.is_empty() {
3886            return Err(AgentError::NotFound {
3887                container: service_name.to_string(),
3888                reason: "no containers found for service".to_string(),
3889            });
3890        }
3891
3892        // Pick the target container
3893        let target = if let Some(rep) = replica {
3894            container_ids
3895                .into_iter()
3896                .find(|cid| cid.replica == rep)
3897                .ok_or_else(|| AgentError::NotFound {
3898                    container: format!("{service_name}-rep-{rep}"),
3899                    reason: format!("replica {rep} not found for service"),
3900                })?
3901        } else {
3902            // Use the first container (lowest replica number)
3903            container_ids.into_iter().next().unwrap()
3904        };
3905
3906        self.runtime.exec(&target, cmd).await
3907    }
3908
3909    /// List every live container across all services, enriched with the data a
3910    /// Docker `ps` row needs and the data the Docker-name resolver needs.
3911    ///
3912    /// For each running container this surfaces the deployment name, the service
3913    /// name, the concrete [`ContainerId`], the compose `container_name:` label
3914    /// (when set, the user-facing Docker name), the real image, the lifecycle
3915    /// state, and the service's published port mappings. Used by the unified
3916    /// container-name resolver and by `docker ps` so compose deployments show up
3917    /// and resolve by their `container_name`.
3918    pub async fn list_container_views(&self) -> Vec<DeploymentContainerView> {
3919        let deployment = self.deployment_name.clone();
3920        let services = self.services.read().await;
3921        let mut out = Vec::new();
3922        for (service_name, instance) in services.iter() {
3923            let container_name = instance
3924                .spec
3925                .labels
3926                .get("com.docker.compose.container_name")
3927                .cloned();
3928            let ports = instance.spec.port_mappings.clone();
3929            for info in instance.container_infos().await {
3930                out.push(DeploymentContainerView {
3931                    deployment: deployment.clone(),
3932                    service: service_name.clone(),
3933                    container_id: info.id,
3934                    container_name: container_name.clone(),
3935                    image: info.image,
3936                    state: info.state,
3937                    pid: info.pid,
3938                    ports: ports.clone(),
3939                });
3940            }
3941        }
3942        out
3943    }
3944
3945    /// Resolve a Docker-style container name/id to a live deployment
3946    /// [`ContainerId`].
3947    ///
3948    /// Matching precedence (first hit wins):
3949    /// 1. The compose `container_name:` label (e.g. `forgejo-e2e`).
3950    /// 2. The conventional compose names `{deployment}-{service}-{replica}` and
3951    ///    `{deployment}_{service}_{replica}` (replica is 1-based, mirroring
3952    ///    Docker Compose; `ContainerId.replica` is 0-based so we add 1).
3953    /// 3. The bare service name (`{service}`), targeting its first replica.
3954    /// 4. The [`ContainerId`] `Display` form.
3955    ///
3956    /// Returns `None` when nothing matches a *running* container.
3957    pub async fn resolve_container_name(&self, name: &str) -> Option<ContainerId> {
3958        let views = self.list_container_views().await;
3959        // 1. explicit container_name label.
3960        if let Some(v) = views
3961            .iter()
3962            .find(|v| v.container_name.as_deref() == Some(name))
3963        {
3964            return Some(v.container_id.clone());
3965        }
3966        // 2 & 3. conventional names + bare service name.
3967        for v in &views {
3968            let dep = v.deployment.as_deref().unwrap_or("");
3969            let svc = &v.service;
3970            let rep1 = v.container_id.replica + 1;
3971            let candidates = [
3972                format!("{dep}-{svc}-{rep1}"),
3973                format!("{dep}_{svc}_{rep1}"),
3974                svc.clone(),
3975            ];
3976            if candidates.iter().any(|c| c == name) {
3977                return Some(v.container_id.clone());
3978            }
3979        }
3980        // 4. ContainerId Display form.
3981        for v in &views {
3982            if v.container_id.to_string() == name {
3983                return Some(v.container_id.clone());
3984            }
3985        }
3986        None
3987    }
3988
3989    /// Execute a command in a specific deployment container (by its concrete
3990    /// [`ContainerId`]) honouring Docker `exec` options (`--user`, `-w`, `-e`).
3991    ///
3992    /// Routes through [`Runtime::exec_with_opts`] so runtimes that support
3993    /// dropping to a uid/gid + chdir + env injection (macOS VZ-Linux) apply
3994    /// them; others fall back to a plain buffered exec.
3995    ///
3996    /// # Errors
3997    /// Propagates the runtime's exec error.
3998    pub async fn exec_in_container_id_with_opts(
3999        &self,
4000        id: &ContainerId,
4001        opts: &crate::runtime::ExecOptions,
4002    ) -> Result<(i32, String, String)> {
4003        self.runtime.exec_with_opts(id, opts).await
4004    }
4005
4006    // ==================== Job Management ====================
4007
4008    /// Trigger a job execution
4009    ///
4010    /// # Arguments
4011    /// * `name` - Name of the registered job
4012    /// * `trigger` - How the job was triggered (endpoint, cli, etc.)
4013    ///
4014    /// # Returns
4015    /// The execution ID for tracking the job
4016    ///
4017    /// # Errors
4018    /// - Returns error if job executor is not configured
4019    /// - Returns error if the job is not registered
4020    pub async fn trigger_job(&self, name: &str, trigger: JobTrigger) -> Result<JobExecutionId> {
4021        let executor = self
4022            .job_executor
4023            .as_ref()
4024            .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
4025
4026        let spec = executor
4027            .get_job_spec(name)
4028            .await
4029            .ok_or_else(|| AgentError::NotFound {
4030                container: name.to_string(),
4031                reason: "job not registered".to_string(),
4032            })?;
4033
4034        executor.trigger(name, &spec, trigger).await
4035    }
4036
4037    /// Get the status of a job execution
4038    ///
4039    /// # Arguments
4040    /// * `id` - The execution ID returned from `trigger_job`
4041    ///
4042    /// # Returns
4043    /// The job execution details, or None if not found
4044    pub async fn get_job_execution(&self, id: &JobExecutionId) -> Option<JobExecution> {
4045        if let Some(executor) = &self.job_executor {
4046            executor.get_execution(id).await
4047        } else {
4048            None
4049        }
4050    }
4051
4052    /// List all executions for a specific job
4053    ///
4054    /// # Arguments
4055    /// * `name` - Name of the job
4056    ///
4057    /// # Returns
4058    /// Vector of job executions for the specified job
4059    pub async fn list_job_executions(&self, name: &str) -> Vec<JobExecution> {
4060        if let Some(executor) = &self.job_executor {
4061            executor.list_executions(name).await
4062        } else {
4063            Vec::new()
4064        }
4065    }
4066
4067    /// Cancel a running job execution
4068    ///
4069    /// # Arguments
4070    /// * `id` - The execution ID to cancel
4071    ///
4072    /// # Errors
4073    /// Returns error if job executor is not configured or if cancellation fails
4074    pub async fn cancel_job(&self, id: &JobExecutionId) -> Result<()> {
4075        let executor = self
4076            .job_executor
4077            .as_ref()
4078            .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
4079
4080        executor.cancel(id).await
4081    }
4082
4083    // ==================== Cron Management ====================
4084
4085    /// Manually trigger a cron job (outside of its schedule)
4086    ///
4087    /// # Arguments
4088    /// * `name` - Name of the cron job
4089    ///
4090    /// # Returns
4091    /// The execution ID for tracking the triggered job
4092    ///
4093    /// # Errors
4094    /// Returns error if cron scheduler is not configured or job not found
4095    pub async fn trigger_cron(&self, name: &str) -> Result<JobExecutionId> {
4096        let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
4097            AgentError::Configuration("Cron scheduler not configured".to_string())
4098        })?;
4099
4100        scheduler.trigger_now(name).await
4101    }
4102
4103    /// Enable or disable a cron job
4104    ///
4105    /// # Arguments
4106    /// * `name` - Name of the cron job
4107    /// * `enabled` - Whether to enable or disable the job
4108    pub async fn set_cron_enabled(&self, name: &str, enabled: bool) {
4109        if let Some(scheduler) = &self.cron_scheduler {
4110            scheduler.set_enabled(name, enabled).await;
4111        }
4112    }
4113
4114    /// List all registered cron jobs
4115    pub async fn list_cron_jobs(&self) -> Vec<crate::cron_scheduler::CronJobInfo> {
4116        if let Some(scheduler) = &self.cron_scheduler {
4117            scheduler.list_jobs().await
4118        } else {
4119            Vec::new()
4120        }
4121    }
4122
4123    /// Start the cron scheduler background task
4124    ///
4125    /// This spawns a background task that checks for due cron jobs every second.
4126    /// Returns a `JoinHandle` that can be used to wait for the scheduler to stop.
4127    ///
4128    /// # Errors
4129    /// Returns error if cron scheduler is not configured
4130    pub fn start_cron_scheduler(&self) -> Result<tokio::task::JoinHandle<()>> {
4131        let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
4132            AgentError::Configuration("Cron scheduler not configured".to_string())
4133        })?;
4134
4135        let scheduler: Arc<CronScheduler> = Arc::clone(scheduler);
4136        Ok(tokio::spawn(async move {
4137            scheduler.run_loop().await;
4138        }))
4139    }
4140
4141    /// Shutdown the cron scheduler
4142    pub fn shutdown_cron(&self) {
4143        if let Some(scheduler) = &self.cron_scheduler {
4144            scheduler.shutdown();
4145        }
4146    }
4147
4148    /// Snapshot every live service's name paired with its full [`ServiceSpec`].
4149    ///
4150    /// Used by the agent-local autoscale controller to self-discover services
4151    /// that should be registered for adaptive scaling (and to seed the
4152    /// rolling-restart template). Returns a cloned, point-in-time view so the
4153    /// caller never holds the services lock across an await.
4154    pub async fn service_specs(&self) -> Vec<(String, ServiceSpec)> {
4155        self.services
4156            .read()
4157            .await
4158            .iter()
4159            .map(|(name, instance)| (name.clone(), instance.spec.clone()))
4160            .collect()
4161    }
4162}
4163
4164/// Bridge the post-`Arc::try_unwrap` [`ServiceManager`] (which the daemon holds
4165/// as `Arc<RwLock<ServiceManager>>`) into the proxy's [`ScaleTrigger`] so the
4166/// scale-to-zero activator can wake an idle service on the next inbound request.
4167///
4168/// Implementing the trait on `RwLock<ServiceManager>` lets the daemon's
4169/// `Arc<RwLock<ServiceManager>>` coerce directly to `Arc<dyn ScaleTrigger>`
4170/// without any wrapper type. Each `scale_to` takes a short read guard and
4171/// forwards to [`ServiceManager::scale_service`] (which itself routes through
4172/// the cluster when one is configured).
4173#[async_trait::async_trait]
4174impl crate::proxy_manager::ScaleTrigger for RwLock<ServiceManager> {
4175    async fn scale_to(&self, service: &str, replicas: u32) -> std::result::Result<(), String> {
4176        self.read()
4177            .await
4178            .scale_service(service, replicas)
4179            .await
4180            .map_err(|e| e.to_string())
4181    }
4182}
4183
4184/// The swarm identity of a service spec, if it is a member of an inference
4185/// swarm: `spec.resources.gpu.sharding.swarm_id`. `None` for ordinary services.
4186///
4187/// This is the partition key that groups swarm members so they can be placed
4188/// together as a coordinated gang (see [`partition_swarm_groups`]).
4189#[must_use]
4190pub fn swarm_id_of(spec: &ServiceSpec) -> Option<String> {
4191    spec.resources
4192        .gpu
4193        .as_ref()
4194        .and_then(|g| g.sharding.as_ref())
4195        .map(|s| s.swarm_id.clone())
4196}
4197
4198/// Desired replica count for a single swarm member. Swarm members are
4199/// single-unit ring stages / coordinators, so the gang treats each as one
4200/// replica; we still honor an explicit `Fixed`/`Adaptive` intent (defaulting to
4201/// 1) so a spec that requests more is not silently clamped.
4202#[must_use]
4203pub fn swarm_member_replicas(spec: &ServiceSpec) -> u32 {
4204    match &spec.scale {
4205        zlayer_spec::ScaleSpec::Fixed { replicas } => (*replicas).max(1),
4206        zlayer_spec::ScaleSpec::Adaptive { min, .. } => (*min).max(1),
4207        zlayer_spec::ScaleSpec::Manual => 1,
4208    }
4209}
4210
4211/// Partition a deploy's services into swarm groups keyed by `swarm_id`.
4212///
4213/// Only services whose spec carries `gpu.sharding` (see [`swarm_id_of`]) are
4214/// included; ordinary services are omitted entirely (they stay on the normal
4215/// per-service deploy path). The returned map is `swarm_id -> [(name, spec)]`.
4216#[must_use]
4217pub(crate) fn partition_swarm_groups(
4218    services: &HashMap<String, ServiceSpec>,
4219) -> HashMap<String, Vec<(String, ServiceSpec)>> {
4220    let mut groups: HashMap<String, Vec<(String, ServiceSpec)>> = HashMap::new();
4221    for (name, spec) in services {
4222        if let Some(swarm_id) = swarm_id_of(spec) {
4223            groups
4224                .entry(swarm_id)
4225                .or_default()
4226                .push((name.clone(), spec.clone()));
4227        }
4228    }
4229    groups
4230}
4231
4232#[cfg(test)]
4233#[allow(deprecated)]
4234mod tests {
4235    use super::*;
4236    use crate::runtime::MockRuntime;
4237
4238    #[tokio::test]
4239    async fn test_service_manager() {
4240        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4241        let manager = ServiceManager::new(runtime);
4242
4243        // Add service
4244        let spec = mock_spec();
4245        Box::pin(manager.upsert_service("test".to_string(), spec))
4246            .await
4247            .unwrap();
4248
4249        // Scale up
4250        manager.scale_service("test", 3).await.unwrap();
4251
4252        // Check count
4253        let count = manager.service_replica_count("test").await.unwrap();
4254        assert_eq!(count, 3);
4255
4256        // List services
4257        let services = manager.list_services().await;
4258        assert_eq!(services, vec!["test".to_string()]);
4259    }
4260
4261    #[tokio::test]
4262    async fn test_service_manager_basic_lifecycle() {
4263        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4264        let manager = ServiceManager::new(runtime);
4265
4266        // Add service with HTTP endpoint
4267        let spec = mock_spec();
4268        Box::pin(manager.upsert_service("api".to_string(), spec))
4269            .await
4270            .unwrap();
4271
4272        // Scale up
4273        manager.scale_service("api", 2).await.unwrap();
4274
4275        // Check count
4276        let count = manager.service_replica_count("api").await.unwrap();
4277        assert_eq!(count, 2);
4278
4279        // Remove service
4280        manager.remove_service("api").await.unwrap();
4281
4282        // Verify service is gone
4283        let services = manager.list_services().await;
4284        assert!(!services.contains(&"api".to_string()));
4285    }
4286
4287    #[tokio::test]
4288    async fn test_service_manager_with_full_config() {
4289        use tokio::sync::RwLock;
4290
4291        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4292
4293        // Create a mock overlay manager (skip actual network setup)
4294        let overlay_manager = Arc::new(RwLock::new(
4295            OverlayManager::new("test-deployment".to_string(), "test".to_string())
4296                .await
4297                .unwrap(),
4298        ));
4299
4300        let manager =
4301            ServiceManager::with_full_config(runtime, overlay_manager, "prod".to_string());
4302
4303        // Add service
4304        let spec = mock_spec();
4305        Box::pin(manager.upsert_service("web".to_string(), spec))
4306            .await
4307            .unwrap();
4308
4309        // Verify service is registered
4310        let services = manager.list_services().await;
4311        assert!(services.contains(&"web".to_string()));
4312    }
4313
4314    #[test]
4315    fn test_container_state_as_str() {
4316        use crate::runtime::ContainerState;
4317        assert_eq!(ContainerState::Pending.as_str(), "pending");
4318        assert_eq!(ContainerState::Initializing.as_str(), "initializing");
4319        assert_eq!(ContainerState::Running.as_str(), "running");
4320        assert_eq!(ContainerState::Stopping.as_str(), "stopping");
4321        assert_eq!(ContainerState::Exited { code: 0 }.as_str(), "exited");
4322        assert_eq!(
4323            ContainerState::Failed {
4324                reason: "boom".to_string()
4325            }
4326            .as_str(),
4327            "failed"
4328        );
4329        // Display delegates to as_str.
4330        assert_eq!(ContainerState::Running.to_string(), "running");
4331    }
4332
4333    /// A container created from image X must report image X and its real
4334    /// lifecycle state through the new `container_infos` accessor, replacing
4335    /// the previously hardcoded `"running"` / empty-image behavior.
4336    #[tokio::test]
4337    async fn test_container_infos_surfaces_image_and_state() {
4338        use crate::runtime::{Container, ContainerState};
4339
4340        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4341        let manager = ServiceManager::new(runtime);
4342
4343        let spec = mock_spec(); // image name = "test:latest"
4344        let image = spec.image.name.to_string();
4345        Box::pin(manager.upsert_service("web".to_string(), spec))
4346            .await
4347            .unwrap();
4348
4349        // Inject containers directly with distinct states.
4350        {
4351            let services = manager.services.read().await;
4352            let instance = services.get("web").unwrap();
4353            let mut containers = instance.containers().write().await;
4354
4355            let running_id = ContainerId::new("web", 0);
4356            containers.insert(
4357                running_id.clone(),
4358                Container {
4359                    id: running_id,
4360                    image: image.clone(),
4361                    state: ContainerState::Running,
4362                    pid: Some(4242),
4363                    task: None,
4364                    overlay_ip: None,
4365                    health_monitor: None,
4366                    port_override: None,
4367                },
4368            );
4369
4370            let exited_id = ContainerId::new("web", 1);
4371            containers.insert(
4372                exited_id.clone(),
4373                Container {
4374                    id: exited_id,
4375                    image: image.clone(),
4376                    state: ContainerState::Exited { code: 1 },
4377                    pid: None,
4378                    task: None,
4379                    overlay_ip: None,
4380                    health_monitor: None,
4381                    port_override: None,
4382                },
4383            );
4384        }
4385
4386        let mut infos = manager.get_service_container_infos("web").await;
4387        infos.sort_by_key(|i| i.id.replica);
4388        assert_eq!(infos.len(), 2);
4389
4390        // Every container reports the real image it was created from.
4391        assert!(infos.iter().all(|i| i.image == image));
4392        assert!(infos.iter().all(|i| i.image == "test:latest"));
4393
4394        // Real per-container state is surfaced (not a hardcoded "running").
4395        assert_eq!(infos[0].state, "running");
4396        assert_eq!(infos[0].pid, Some(4242));
4397        assert_eq!(infos[1].state, "exited");
4398        assert_eq!(infos[1].pid, None);
4399
4400        // Unknown service yields an empty list.
4401        assert!(manager
4402            .get_service_container_infos("missing")
4403            .await
4404            .is_empty());
4405    }
4406
4407    /// Bug 2 (`cluster_upgrade`): a changed image *reference* (tag bump) under
4408    /// `if_not_present` must still recreate the local replicas. Previously the
4409    /// recreate only fired on digest drift under `Always`/`Newer`, so a tag
4410    /// change was silently ignored and containers stayed on the old image.
4411    #[tokio::test]
4412    async fn upsert_recreates_local_replicas_on_image_reference_change() {
4413        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4414        let manager = ServiceManager::new(runtime);
4415
4416        // Deploy v1 with the e2e's pull policy (if_not_present) and scale up.
4417        let mut spec = mock_spec();
4418        spec.image.name = "docker.io/library/nginx:1.28-alpine".parse().unwrap();
4419        spec.image.pull_policy = zlayer_spec::PullPolicy::IfNotPresent;
4420        Box::pin(manager.upsert_service("web".to_string(), spec.clone()))
4421            .await
4422            .unwrap();
4423        manager.scale_service_local("web", 2).await.unwrap();
4424
4425        let v1: Vec<String> = manager
4426            .get_service_container_infos("web")
4427            .await
4428            .into_iter()
4429            .map(|i| i.image)
4430            .collect();
4431        assert_eq!(v1.len(), 2);
4432        assert!(
4433            v1.iter().all(|img| img.contains("1.28")),
4434            "expected v1 images, got {v1:?}"
4435        );
4436
4437        // Upgrade to v2 under the SAME if_not_present policy.
4438        let mut spec_v2 = spec;
4439        spec_v2.image.name = "docker.io/library/nginx:1.29-alpine".parse().unwrap();
4440        Box::pin(manager.upsert_service("web".to_string(), spec_v2))
4441            .await
4442            .unwrap();
4443
4444        let v2: Vec<String> = manager
4445            .get_service_container_infos("web")
4446            .await
4447            .into_iter()
4448            .map(|i| i.image)
4449            .collect();
4450        assert_eq!(v2.len(), 2, "replica count preserved across upgrade");
4451        assert!(
4452            v2.iter().all(|img| img.contains("1.29")),
4453            "containers must be recreated on the new image, got {v2:?}"
4454        );
4455    }
4456
4457    fn mock_spec() -> ServiceSpec {
4458        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
4459            r"
4460version: v1
4461deployment: test
4462services:
4463  test:
4464    rtype: service
4465    image:
4466      name: test:latest
4467    endpoints:
4468      - name: http
4469        protocol: http
4470        port: 8080
4471    scale:
4472      mode: fixed
4473      replicas: 1
4474",
4475        )
4476        .unwrap()
4477        .services
4478        .remove("test")
4479        .unwrap()
4480    }
4481
4482    /// Build a swarm-member spec via YAML so it exercises the real deserialize
4483    /// path: a GPU block with a `sharding` entry tied to `swarm_id`.
4484    fn swarm_member_spec(swarm_id: &str, role: zlayer_spec::SwarmRole) -> ServiceSpec {
4485        let role_str = match role {
4486            zlayer_spec::SwarmRole::Coordinator => "coordinator",
4487            zlayer_spec::SwarmRole::Stage => "stage",
4488        };
4489        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(&format!(
4490            r"
4491version: v1
4492deployment: test
4493services:
4494  test:
4495    rtype: service
4496    image:
4497      name: test:latest
4498    resources:
4499      gpu:
4500        count: 1
4501        sharding:
4502          swarm_id: {swarm_id}
4503          layer_start: 0
4504          layer_end: 12
4505          layer_count: 36
4506          role: {role_str}
4507    scale:
4508      mode: fixed
4509      replicas: 1
4510"
4511        ))
4512        .unwrap()
4513        .services
4514        .remove("test")
4515        .unwrap()
4516    }
4517
4518    #[test]
4519    fn partition_swarm_groups_buckets_by_swarm_id_and_skips_plain_services() {
4520        let mut services: HashMap<String, ServiceSpec> = HashMap::new();
4521        services.insert("plain".to_string(), mock_spec()); // non-swarm: excluded
4522        services.insert(
4523            "stage-0".to_string(),
4524            swarm_member_spec("sw1", zlayer_spec::SwarmRole::Stage),
4525        );
4526        services.insert(
4527            "stage-1".to_string(),
4528            swarm_member_spec("sw1", zlayer_spec::SwarmRole::Stage),
4529        );
4530        services.insert(
4531            "other-stage".to_string(),
4532            swarm_member_spec("sw2", zlayer_spec::SwarmRole::Stage),
4533        );
4534
4535        let groups = partition_swarm_groups(&services);
4536
4537        // Only swarm members are bucketed; "plain" is omitted entirely.
4538        assert_eq!(groups.len(), 2, "two distinct swarm_ids");
4539        assert!(!groups.contains_key("plain"));
4540        let mut sw1: Vec<&str> = groups
4541            .get("sw1")
4542            .expect("sw1 group")
4543            .iter()
4544            .map(|(n, _)| n.as_str())
4545            .collect();
4546        sw1.sort_unstable();
4547        assert_eq!(sw1, vec!["stage-0", "stage-1"]);
4548        assert_eq!(groups.get("sw2").expect("sw2 group").len(), 1);
4549    }
4550
4551    #[test]
4552    fn swarm_id_of_is_none_for_plain_service() {
4553        assert_eq!(swarm_id_of(&mock_spec()), None);
4554        assert_eq!(
4555            swarm_id_of(&swarm_member_spec(
4556                "sw9",
4557                zlayer_spec::SwarmRole::Coordinator
4558            )),
4559            Some("sw9".to_string())
4560        );
4561    }
4562
4563    #[test]
4564    fn swarm_member_replicas_defaults_to_one() {
4565        // mock_spec uses Fixed{replicas:1}.
4566        assert_eq!(swarm_member_replicas(&mock_spec()), 1);
4567        let mut spec = mock_spec();
4568        spec.scale = zlayer_spec::ScaleSpec::Manual;
4569        assert_eq!(swarm_member_replicas(&spec), 1);
4570    }
4571
4572    #[test]
4573    fn test_set_container_dns_injects_when_empty() {
4574        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4575        let spec = mock_spec(); // spec.dns defaults to empty, host_network false
4576        let mut instance =
4577            ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
4578        instance.set_container_dns("10.42.0.1".parse().unwrap());
4579        assert_eq!(instance.spec.dns, vec!["10.42.0.1".to_string()]);
4580    }
4581
4582    #[test]
4583    fn test_set_container_dns_skips_host_network() {
4584        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4585        let mut spec = mock_spec();
4586        spec.host_network = true;
4587        let mut instance =
4588            ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
4589        instance.set_container_dns("10.42.0.1".parse().unwrap());
4590        assert!(
4591            instance.spec.dns.is_empty(),
4592            "host_network containers must inherit the host resolv.conf"
4593        );
4594    }
4595
4596    /// REGRESSION: a `host_network` container must NOT be attached to the
4597    /// overlay even when an overlay manager IS present. Attaching by the
4598    /// host-netns PID wires a stray `zl-*` veth into the host stack (and
4599    /// host-network containers have no private netns / must not get an overlay
4600    /// IP). The `start_container` attach is gated on `should_attach_overlay()`.
4601    #[tokio::test]
4602    async fn should_attach_overlay_skips_host_network_even_with_overlay_present() {
4603        use crate::overlay_manager::OverlayManager;
4604        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4605        let om = Arc::new(RwLock::new(
4606            OverlayManager::new("test".to_string(), "i0".to_string())
4607                .await
4608                .unwrap(),
4609        ));
4610
4611        // Normal service with the overlay present -> attaches.
4612        let normal = ServiceInstance::new(
4613            "web".to_string(),
4614            mock_spec(),
4615            Arc::clone(&runtime),
4616            Some(Arc::clone(&om)),
4617        );
4618        assert!(
4619            normal.should_attach_overlay(),
4620            "a non-host-network service must attach to the overlay"
4621        );
4622
4623        // Host-network service, overlay STILL present -> must skip attach.
4624        let mut hn_spec = mock_spec();
4625        hn_spec.host_network = true;
4626        let host_net =
4627            ServiceInstance::new("web".to_string(), hn_spec, Arc::clone(&runtime), Some(om));
4628        assert!(
4629            !host_net.should_attach_overlay(),
4630            "host_network containers must never attach to the overlay (stray zl-* link / no private netns)"
4631        );
4632    }
4633
4634    /// A lifecycle-recording runtime that captures the ORDER of the container
4635    /// lifecycle calls the scale-up path makes, and (like youki) makes the init
4636    /// PID available in the "created" state — at `create_container`, BEFORE
4637    /// `start_container`. Used to assert the overlay-attach window opens before
4638    /// start for host-process runtimes, and that an init/start failure detaches
4639    /// + removes (no leaked veth/container).
4640    struct LifecycleRecordingRuntime {
4641        inner: MockRuntime,
4642        /// Ordered lifecycle events: "create" | "pid" | "start" | "remove".
4643        events: std::sync::Mutex<Vec<&'static str>>,
4644        /// When true, `start_container` fails (exercises the start-failure
4645        /// cleanup path).
4646        fail_start: bool,
4647    }
4648
4649    impl std::fmt::Debug for LifecycleRecordingRuntime {
4650        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4651            f.debug_struct("LifecycleRecordingRuntime")
4652                .field("fail_start", &self.fail_start)
4653                .finish_non_exhaustive()
4654        }
4655    }
4656
4657    impl LifecycleRecordingRuntime {
4658        fn new(fail_start: bool) -> Self {
4659            Self {
4660                inner: MockRuntime::new(),
4661                events: std::sync::Mutex::new(Vec::new()),
4662                fail_start,
4663            }
4664        }
4665
4666        fn events(&self) -> Vec<&'static str> {
4667            self.events.lock().unwrap().clone()
4668        }
4669
4670        fn record(&self, ev: &'static str) {
4671            self.events.lock().unwrap().push(ev);
4672        }
4673    }
4674
4675    #[async_trait::async_trait]
4676    impl Runtime for LifecycleRecordingRuntime {
4677        async fn pull_image(&self, image: &str) -> Result<()> {
4678            self.inner.pull_image(image).await
4679        }
4680        async fn pull_image_with_policy(
4681            &self,
4682            image: &str,
4683            policy: PullPolicy,
4684            auth: Option<&zlayer_spec::RegistryAuth>,
4685            source: zlayer_spec::SourcePolicy,
4686        ) -> Result<()> {
4687            self.inner
4688                .pull_image_with_policy(image, policy, auth, source)
4689                .await
4690        }
4691        async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
4692            self.record("create");
4693            self.inner.create_container(id, spec).await
4694        }
4695        async fn start_container(&self, id: &ContainerId) -> Result<()> {
4696            self.record("start");
4697            if self.fail_start {
4698                return Err(AgentError::StartFailed {
4699                    id: id.to_string(),
4700                    reason: "injected start failure".to_string(),
4701                });
4702            }
4703            self.inner.start_container(id).await
4704        }
4705        async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
4706            self.inner.stop_container(id, timeout).await
4707        }
4708        async fn remove_container(&self, id: &ContainerId) -> Result<()> {
4709            self.record("remove");
4710            self.inner.remove_container(id).await
4711        }
4712        async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
4713            self.inner.container_state(id).await
4714        }
4715        async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
4716            self.inner.container_logs(id, tail).await
4717        }
4718        async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
4719            self.inner.exec(id, cmd).await
4720        }
4721        async fn get_container_stats(
4722            &self,
4723            id: &ContainerId,
4724        ) -> Result<crate::cgroups_stats::ContainerStats> {
4725            self.inner.get_container_stats(id).await
4726        }
4727        async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
4728            self.inner.wait_container(id).await
4729        }
4730        async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
4731            self.inner.get_logs(id).await
4732        }
4733        async fn get_container_pid(&self, _id: &ContainerId) -> Result<Option<u32>> {
4734            // youki writes the init pid-file at create time, so the PID is
4735            // available in the "created" state — even before start. Mirror that
4736            // here so the host overlay attach (which the scale-up path now runs
4737            // between create and start) sees a PID.
4738            self.record("pid");
4739            Ok(Some(std::process::id()))
4740        }
4741        async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
4742            self.inner.get_container_ip(id).await
4743        }
4744    }
4745
4746    /// REGRESSION: for a host-process (Linux youki) runtime, the scale-up path
4747    /// must capture the container PID and open the overlay-attach window in the
4748    /// "created" state — AFTER `create_container` and BEFORE `start_container`.
4749    /// Before this fix the PID was captured (and the overlay attached) only
4750    /// AFTER start, by which point the entrypoint had execve'd + dropped to a
4751    /// non-root user (non-dumpable netns → overlayd EACCES) or a one-shot had
4752    /// already exited (ENOENT). We assert ordering on the runtime calls the
4753    /// new flow makes: create → pid → start.
4754    #[tokio::test]
4755    async fn scale_up_captures_pid_before_start_for_host_runtime() {
4756        let runtime = Arc::new(LifecycleRecordingRuntime::new(false));
4757        let runtime_dyn: Arc<dyn Runtime + Send + Sync> = runtime.clone();
4758
4759        // No overlay manager: the attach itself is best-effort; this test pins
4760        // the ordering invariant (PID captured in the created state) that makes
4761        // a host overlay attach BEFORE start possible. The host runtime kind is
4762        // the default (`HostNetns`).
4763        let instance = ServiceInstance::new("web".to_string(), mock_spec(), runtime_dyn, None);
4764        instance.scale_to(1).await.unwrap();
4765
4766        let events = runtime.events();
4767        let create = events.iter().position(|e| *e == "create");
4768        let pid = events.iter().position(|e| *e == "pid");
4769        let start = events.iter().position(|e| *e == "start");
4770
4771        assert!(
4772            create.is_some(),
4773            "create_container must be called: {events:?}"
4774        );
4775        assert!(
4776            pid.is_some(),
4777            "the PID must be captured (get_container_pid called): {events:?}"
4778        );
4779        assert!(
4780            start.is_some(),
4781            "start_container must be called: {events:?}"
4782        );
4783        assert!(
4784            create < pid,
4785            "PID must be captured AFTER create_container: {events:?}"
4786        );
4787        assert!(
4788            pid < start,
4789            "PID must be captured (overlay-attach window opened) BEFORE \
4790             start_container for the host runtime kind: {events:?}"
4791        );
4792    }
4793
4794    /// REGRESSION: with the overlay attach moved BEFORE start, a
4795    /// `start_container` failure must not strand a half-created container — the
4796    /// cleanup path must `remove_container` (and, when attached, detach the
4797    /// overlay) before returning the error. Mirrors the job path.
4798    #[tokio::test]
4799    async fn scale_up_start_failure_removes_container() {
4800        let runtime = Arc::new(LifecycleRecordingRuntime::new(true));
4801        let runtime_dyn: Arc<dyn Runtime + Send + Sync> = runtime.clone();
4802
4803        let instance = ServiceInstance::new("web".to_string(), mock_spec(), runtime_dyn, None);
4804        let err = instance
4805            .scale_to(1)
4806            .await
4807            .expect_err("a start failure must propagate as an error");
4808        assert!(
4809            matches!(err, AgentError::StartFailed { .. }),
4810            "expected StartFailed, got {err:?}"
4811        );
4812
4813        let events = runtime.events();
4814        assert!(
4815            events.contains(&"start"),
4816            "start_container must have been attempted: {events:?}"
4817        );
4818        assert!(
4819            events.contains(&"remove"),
4820            "a failed start must remove the half-created container (no leak): {events:?}"
4821        );
4822        let start = events.iter().position(|e| *e == "start");
4823        let remove = events.iter().position(|e| *e == "remove");
4824        assert!(
4825            start < remove,
4826            "remove must run AFTER the failed start (cleanup): {events:?}"
4827        );
4828    }
4829
4830    /// REGRESSION: an init-action failure (now AFTER create + host overlay
4831    /// attach, BEFORE start) must also clean up — `remove_container` runs and
4832    /// the container is NEVER started.
4833    #[tokio::test]
4834    async fn scale_up_init_failure_removes_container_and_never_starts() {
4835        let runtime = Arc::new(LifecycleRecordingRuntime::new(false));
4836        let runtime_dyn: Arc<dyn Runtime + Send + Sync> = runtime.clone();
4837
4838        // An unknown init action fails immediately (UnknownAction →
4839        // InitActionFailed); the default error policy is `Fail` (no retries).
4840        let mut spec = mock_spec();
4841        spec.init = zlayer_spec::InitSpec {
4842            steps: vec![zlayer_spec::InitStep {
4843                id: "boom".to_string(),
4844                uses: "init.does_not_exist".to_string(),
4845                with: zlayer_spec::InitParams::default(),
4846                retry: None,
4847                timeout: None,
4848                on_failure: zlayer_spec::FailureAction::Fail,
4849            }],
4850        };
4851
4852        let instance = ServiceInstance::new("web".to_string(), spec, runtime_dyn, None);
4853        let err = instance
4854            .scale_to(1)
4855            .await
4856            .expect_err("an init-action failure must propagate as an error");
4857        assert!(
4858            matches!(err, AgentError::InitActionFailed { .. }),
4859            "expected InitActionFailed, got {err:?}"
4860        );
4861
4862        let events = runtime.events();
4863        assert!(
4864            events.contains(&"create"),
4865            "create_container must have run: {events:?}"
4866        );
4867        assert!(
4868            events.contains(&"remove"),
4869            "a failed init must remove the half-created container (no leak): {events:?}"
4870        );
4871        assert!(
4872            !events.contains(&"start"),
4873            "the container must NEVER be started when init fails: {events:?}"
4874        );
4875    }
4876
4877    #[test]
4878    fn test_set_container_dns_preserves_user_dns() {
4879        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4880        let mut spec = mock_spec();
4881        spec.dns = vec!["1.1.1.1".to_string()];
4882        let mut instance =
4883            ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
4884        instance.set_container_dns("10.42.0.1".parse().unwrap());
4885        assert_eq!(
4886            instance.spec.dns,
4887            vec!["1.1.1.1".to_string()],
4888            "user-supplied spec.dns must win over the overlay resolver"
4889        );
4890    }
4891
4892    /// Helper to create a `ServiceSpec` with dependencies
4893    fn mock_spec_with_deps(deps: Vec<DependsSpec>) -> ServiceSpec {
4894        let mut spec = mock_spec();
4895        spec.depends = deps;
4896        spec
4897    }
4898
4899    /// Helper to create a `DependsSpec`
4900    fn dep(
4901        service: &str,
4902        condition: zlayer_spec::DependencyCondition,
4903        timeout_ms: u64,
4904        on_timeout: zlayer_spec::TimeoutAction,
4905    ) -> DependsSpec {
4906        DependsSpec {
4907            service: service.to_string(),
4908            condition,
4909            timeout: Some(Duration::from_millis(timeout_ms)),
4910            on_timeout,
4911        }
4912    }
4913
4914    #[tokio::test]
4915    async fn test_deploy_with_dependencies_no_deps() {
4916        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4917        let manager = ServiceManager::new(runtime);
4918
4919        // Services with no dependencies
4920        let mut services = HashMap::new();
4921        services.insert("a".to_string(), mock_spec());
4922        services.insert("b".to_string(), mock_spec());
4923
4924        // Should deploy both without issue
4925        Box::pin(manager.deploy_with_dependencies(services))
4926            .await
4927            .unwrap();
4928
4929        // Both services should be registered
4930        let service_list = manager.list_services().await;
4931        assert_eq!(service_list.len(), 2);
4932    }
4933
4934    #[tokio::test]
4935    async fn test_deploy_with_dependencies_linear() {
4936        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4937        let manager = ServiceManager::new(runtime);
4938
4939        // A -> B -> C (A depends on B, B depends on C)
4940        // All use "started" condition which is satisfied when container is running
4941        let mut services = HashMap::new();
4942        services.insert("c".to_string(), mock_spec());
4943        services.insert(
4944            "b".to_string(),
4945            mock_spec_with_deps(vec![dep(
4946                "c",
4947                zlayer_spec::DependencyCondition::Started,
4948                5000,
4949                zlayer_spec::TimeoutAction::Fail,
4950            )]),
4951        );
4952        services.insert(
4953            "a".to_string(),
4954            mock_spec_with_deps(vec![dep(
4955                "b",
4956                zlayer_spec::DependencyCondition::Started,
4957                5000,
4958                zlayer_spec::TimeoutAction::Fail,
4959            )]),
4960        );
4961
4962        // Should deploy in order: c, b, a
4963        Box::pin(manager.deploy_with_dependencies(services))
4964            .await
4965            .unwrap();
4966
4967        // All services should be registered
4968        let service_list = manager.list_services().await;
4969        assert_eq!(service_list.len(), 3);
4970    }
4971
4972    #[tokio::test]
4973    async fn test_deploy_with_dependencies_cycle_detection() {
4974        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4975        let manager = ServiceManager::new(runtime);
4976
4977        // A -> B -> A (cycle)
4978        let mut services = HashMap::new();
4979        services.insert(
4980            "a".to_string(),
4981            mock_spec_with_deps(vec![dep(
4982                "b",
4983                zlayer_spec::DependencyCondition::Started,
4984                5000,
4985                zlayer_spec::TimeoutAction::Fail,
4986            )]),
4987        );
4988        services.insert(
4989            "b".to_string(),
4990            mock_spec_with_deps(vec![dep(
4991                "a",
4992                zlayer_spec::DependencyCondition::Started,
4993                5000,
4994                zlayer_spec::TimeoutAction::Fail,
4995            )]),
4996        );
4997
4998        // Should fail with cycle detection
4999        let result = Box::pin(manager.deploy_with_dependencies(services)).await;
5000        assert!(result.is_err());
5001        let err = result.unwrap_err().to_string();
5002        assert!(err.contains("Cyclic dependency"));
5003    }
5004
5005    #[tokio::test]
5006    async fn test_deploy_with_dependencies_timeout_continue() {
5007        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5008        let manager = ServiceManager::new(runtime);
5009
5010        // A depends on B (healthy), but B never becomes healthy
5011        // Using continue action, so it should proceed anyway
5012        let mut services = HashMap::new();
5013        services.insert("b".to_string(), mock_spec());
5014        services.insert(
5015            "a".to_string(),
5016            mock_spec_with_deps(vec![dep(
5017                "b",
5018                zlayer_spec::DependencyCondition::Healthy, // B won't pass healthy check
5019                100,                                       // Short timeout
5020                zlayer_spec::TimeoutAction::Continue,      // But continue anyway
5021            )]),
5022        );
5023
5024        // Should deploy both despite timeout
5025        Box::pin(manager.deploy_with_dependencies(services))
5026            .await
5027            .unwrap();
5028
5029        let service_list = manager.list_services().await;
5030        assert_eq!(service_list.len(), 2);
5031    }
5032
5033    #[tokio::test]
5034    async fn test_deploy_with_dependencies_timeout_warn() {
5035        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5036        let manager = ServiceManager::new(runtime);
5037
5038        // A depends on B (healthy), but B never becomes healthy
5039        // Using warn action, so it should proceed with a warning
5040        let mut services = HashMap::new();
5041        services.insert("b".to_string(), mock_spec());
5042        services.insert(
5043            "a".to_string(),
5044            mock_spec_with_deps(vec![dep(
5045                "b",
5046                zlayer_spec::DependencyCondition::Healthy,
5047                100,
5048                zlayer_spec::TimeoutAction::Warn,
5049            )]),
5050        );
5051
5052        // Should deploy both despite timeout (with warning)
5053        Box::pin(manager.deploy_with_dependencies(services))
5054            .await
5055            .unwrap();
5056
5057        let service_list = manager.list_services().await;
5058        assert_eq!(service_list.len(), 2);
5059    }
5060
5061    #[tokio::test]
5062    async fn test_deploy_with_dependencies_timeout_fail() {
5063        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5064        let manager = ServiceManager::new(runtime);
5065
5066        // A depends on B (healthy), but B never becomes healthy
5067        // Using fail action, so deployment should fail
5068        let mut services = HashMap::new();
5069        services.insert("b".to_string(), mock_spec());
5070        services.insert(
5071            "a".to_string(),
5072            mock_spec_with_deps(vec![dep(
5073                "b",
5074                zlayer_spec::DependencyCondition::Healthy,
5075                100,
5076                zlayer_spec::TimeoutAction::Fail,
5077            )]),
5078        );
5079
5080        // Should fail after B is started but doesn't become healthy
5081        let result = Box::pin(manager.deploy_with_dependencies(services)).await;
5082        assert!(result.is_err());
5083
5084        // B should be started (it has no deps), but A should fail
5085        let err = result.unwrap_err().to_string();
5086        assert!(err.contains("Dependency timeout"));
5087    }
5088
5089    #[tokio::test]
5090    async fn test_check_dependencies_all_satisfied() {
5091        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5092        let manager = ServiceManager::new(runtime);
5093
5094        // Mark a service as healthy
5095        manager
5096            .update_health_state("db", HealthState::Healthy)
5097            .await;
5098
5099        let deps = vec![DependsSpec {
5100            service: "db".to_string(),
5101            condition: zlayer_spec::DependencyCondition::Healthy,
5102            timeout: Some(Duration::from_secs(60)),
5103            on_timeout: zlayer_spec::TimeoutAction::Fail,
5104        }];
5105
5106        let satisfied = manager.check_dependencies(&deps).await.unwrap();
5107        assert!(satisfied);
5108    }
5109
5110    #[tokio::test]
5111    async fn test_check_dependencies_not_satisfied() {
5112        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5113        let manager = ServiceManager::new(runtime);
5114
5115        // Service not healthy (no state set = Unknown)
5116        let deps = vec![DependsSpec {
5117            service: "db".to_string(),
5118            condition: zlayer_spec::DependencyCondition::Healthy,
5119            timeout: Some(Duration::from_secs(60)),
5120            on_timeout: zlayer_spec::TimeoutAction::Fail,
5121        }];
5122
5123        let satisfied = manager.check_dependencies(&deps).await.unwrap();
5124        assert!(!satisfied);
5125    }
5126
5127    #[tokio::test]
5128    async fn test_health_state_tracking() {
5129        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5130        let manager = ServiceManager::new(runtime);
5131
5132        // Update health states
5133        manager
5134            .update_health_state("db", HealthState::Healthy)
5135            .await;
5136        manager
5137            .update_health_state("cache", HealthState::Unknown)
5138            .await;
5139
5140        // Verify states
5141        let states = manager.health_states();
5142        let states_read = states.read().await;
5143
5144        assert!(matches!(states_read.get("db"), Some(HealthState::Healthy)));
5145        assert!(matches!(
5146            states_read.get("cache"),
5147            Some(HealthState::Unknown)
5148        ));
5149    }
5150
5151    /// Regression test for the stabilization timeout that blocked the raft-e2e
5152    /// `cluster_scaling` / `cluster_upgrade` suites.
5153    ///
5154    /// Previously the callback that bridges a container's health result into the
5155    /// `ServiceManager` `health_states` map was only attached when BOTH a proxy
5156    /// manager AND a reachable overlay IP existed. In degraded-overlay / no-proxy
5157    /// deployments that `if let` was false, so `health_states` was never written,
5158    /// the service stayed `healthy=false` forever, and stabilization timed out
5159    /// even though the container was running and its health check passing.
5160    ///
5161    /// This test drives the real `scale_to` create path with:
5162    ///   * NO `proxy_manager` (so `proxy_backend` resolves to None), and
5163    ///   * a `Command { command: "true" }` health check (always passes host-side),
5164    /// then asserts the shared `health_states` map receives `Healthy` for the
5165    /// service — proving the bridge fires unconditionally.
5166    ///
5167    /// Gated to `#[cfg(unix)]` because `HealthCheck::Command` is executed via
5168    /// `sh -c <command>` in `crate::health::HealthChecker::check_command`. On
5169    /// Windows hosts without `sh` on PATH (the default Windows CI image), no
5170    /// Command-based health check can ever pass, so the test would fail for
5171    /// reasons unrelated to the bridge it is regression-testing. The bridge
5172    /// behavior under test is platform-agnostic; only the test fixture's
5173    /// "always-passes command" needs a Unix shell.
5174    #[cfg(unix)]
5175    #[tokio::test]
5176    async fn test_health_states_bridge_fires_without_proxy() {
5177        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5178
5179        // Service spec with a host-side command health check that always passes.
5180        // Zero start-grace + a short interval keep the test fast.
5181        let mut spec = mock_spec();
5182        spec.health = zlayer_spec::HealthSpec {
5183            start_grace: Some(Duration::from_millis(0)),
5184            interval: Some(Duration::from_millis(50)),
5185            timeout: Some(Duration::from_secs(5)),
5186            retries: 1,
5187            check: HealthCheck::Command {
5188                command: "true".to_string(),
5189            },
5190        };
5191
5192        // Build a ServiceInstance with NO proxy_manager and NO overlay_manager,
5193        // then wire in the shared health_states map exactly as ServiceManager does.
5194        let mut instance =
5195            ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
5196        let health_states: Arc<RwLock<HashMap<String, HealthState>>> =
5197            Arc::new(RwLock::new(HashMap::new()));
5198        instance.set_health_states(Arc::clone(&health_states));
5199
5200        // Drive the real create path (no proxy, MockRuntime IP present but proxy
5201        // absent => proxy_backend is None, hitting the previously-broken branch).
5202        instance.scale_to(1).await.unwrap();
5203
5204        // Poll for the bridged Healthy state (the monitor checks asynchronously
5205        // after its start grace). Bounded so a regression fails fast.
5206        let mut bridged = false;
5207        for _ in 0..100 {
5208            if matches!(
5209                health_states.read().await.get("web"),
5210                Some(HealthState::Healthy)
5211            ) {
5212                bridged = true;
5213                break;
5214            }
5215            tokio::time::sleep(Duration::from_millis(50)).await;
5216        }
5217
5218        assert!(
5219            bridged,
5220            "health_states must receive Healthy for the service even without a \
5221             proxy or overlay IP; the bridge regressed and stabilization would time out"
5222        );
5223    }
5224
5225    // ==================== Job/Cron Integration Tests ====================
5226
5227    fn mock_job_spec() -> ServiceSpec {
5228        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
5229            r"
5230version: v1
5231deployment: test
5232services:
5233  backup:
5234    rtype: job
5235    image:
5236      name: backup:latest
5237",
5238        )
5239        .unwrap()
5240        .services
5241        .remove("backup")
5242        .unwrap()
5243    }
5244
5245    fn mock_cron_spec() -> ServiceSpec {
5246        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
5247            r#"
5248version: v1
5249deployment: test
5250services:
5251  cleanup:
5252    rtype: cron
5253    schedule: "0 0 * * * * *"
5254    image:
5255      name: cleanup:latest
5256"#,
5257        )
5258        .unwrap()
5259        .services
5260        .remove("cleanup")
5261        .unwrap()
5262    }
5263
5264    #[tokio::test]
5265    async fn test_service_manager_with_job_executor() {
5266        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5267        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
5268
5269        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
5270
5271        // Register job
5272        let job_spec = mock_job_spec();
5273        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
5274            .await
5275            .unwrap();
5276
5277        // Trigger job
5278        let exec_id = manager
5279            .trigger_job("backup", JobTrigger::Cli)
5280            .await
5281            .unwrap();
5282
5283        // Give job time to start
5284        tokio::time::sleep(Duration::from_millis(50)).await;
5285
5286        // Check execution exists
5287        let execution = manager.get_job_execution(&exec_id).await;
5288        assert!(execution.is_some());
5289        assert_eq!(execution.unwrap().job_name, "backup");
5290    }
5291
5292    #[tokio::test]
5293    async fn test_service_manager_with_cron_scheduler() {
5294        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5295        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
5296        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
5297
5298        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
5299
5300        // Register cron job
5301        let cron_spec = mock_cron_spec();
5302        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
5303            .await
5304            .unwrap();
5305
5306        // List cron jobs
5307        let cron_jobs = manager.list_cron_jobs().await;
5308        assert_eq!(cron_jobs.len(), 1);
5309        assert_eq!(cron_jobs[0].name, "cleanup");
5310        assert!(cron_jobs[0].enabled);
5311    }
5312
5313    #[tokio::test]
5314    async fn test_service_manager_trigger_cron() {
5315        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5316        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
5317        let cron_scheduler = Arc::new(CronScheduler::new(job_executor.clone()));
5318
5319        let manager = ServiceManager::new(runtime)
5320            .with_job_executor(job_executor)
5321            .with_cron_scheduler(cron_scheduler);
5322
5323        // Register cron job
5324        let cron_spec = mock_cron_spec();
5325        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
5326            .await
5327            .unwrap();
5328
5329        // Manually trigger the cron job
5330        let exec_id = manager.trigger_cron("cleanup").await.unwrap();
5331        assert!(!exec_id.0.is_empty());
5332    }
5333
5334    #[tokio::test]
5335    async fn test_service_manager_enable_disable_cron() {
5336        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5337        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
5338        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
5339
5340        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
5341
5342        // Register cron job
5343        let cron_spec = mock_cron_spec();
5344        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
5345            .await
5346            .unwrap();
5347
5348        // Initially enabled
5349        let cron_jobs = manager.list_cron_jobs().await;
5350        assert!(cron_jobs[0].enabled);
5351
5352        // Disable
5353        manager.set_cron_enabled("cleanup", false).await;
5354        let cron_jobs = manager.list_cron_jobs().await;
5355        assert!(!cron_jobs[0].enabled);
5356
5357        // Re-enable
5358        manager.set_cron_enabled("cleanup", true).await;
5359        let cron_jobs = manager.list_cron_jobs().await;
5360        assert!(cron_jobs[0].enabled);
5361    }
5362
5363    #[tokio::test]
5364    async fn test_service_manager_remove_cleans_up_job() {
5365        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5366        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
5367
5368        let manager = ServiceManager::new(runtime).with_job_executor(job_executor.clone());
5369
5370        // Register job
5371        let job_spec = mock_job_spec();
5372        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
5373            .await
5374            .unwrap();
5375
5376        // Verify job is registered
5377        let spec = job_executor.get_job_spec("backup").await;
5378        assert!(spec.is_some());
5379
5380        // Remove job
5381        manager.remove_service("backup").await.unwrap();
5382
5383        // Verify job is unregistered
5384        let spec = job_executor.get_job_spec("backup").await;
5385        assert!(spec.is_none());
5386    }
5387
5388    #[tokio::test]
5389    async fn test_service_manager_remove_cleans_up_cron() {
5390        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5391        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
5392        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
5393
5394        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler.clone());
5395
5396        // Register cron job
5397        let cron_spec = mock_cron_spec();
5398        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
5399            .await
5400            .unwrap();
5401
5402        // Verify cron job is registered
5403        assert_eq!(cron_scheduler.job_count().await, 1);
5404
5405        // Remove cron job
5406        manager.remove_service("cleanup").await.unwrap();
5407
5408        // Verify cron job is unregistered
5409        assert_eq!(cron_scheduler.job_count().await, 0);
5410    }
5411
5412    #[tokio::test]
5413    async fn test_service_manager_job_without_executor() {
5414        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5415        let manager = ServiceManager::new(runtime);
5416
5417        // Try to trigger job without executor configured
5418        let result = manager.trigger_job("nonexistent", JobTrigger::Cli).await;
5419        assert!(result.is_err());
5420        assert!(result.unwrap_err().to_string().contains("not configured"));
5421    }
5422
5423    #[tokio::test]
5424    async fn test_service_manager_cron_without_scheduler() {
5425        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5426        let manager = ServiceManager::new(runtime);
5427
5428        // Try to register cron job without scheduler configured
5429        let cron_spec = mock_cron_spec();
5430        let result = Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec)).await;
5431        assert!(result.is_err());
5432        assert!(result.unwrap_err().to_string().contains("not configured"));
5433    }
5434
5435    #[tokio::test]
5436    async fn test_service_manager_list_job_executions() {
5437        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5438        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
5439
5440        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
5441
5442        // Register job
5443        let job_spec = mock_job_spec();
5444        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
5445            .await
5446            .unwrap();
5447
5448        // Trigger job twice
5449        manager
5450            .trigger_job("backup", JobTrigger::Cli)
5451            .await
5452            .unwrap();
5453        manager
5454            .trigger_job("backup", JobTrigger::Scheduler)
5455            .await
5456            .unwrap();
5457
5458        // Give jobs time to start
5459        tokio::time::sleep(Duration::from_millis(50)).await;
5460
5461        // List executions
5462        let executions = manager.list_job_executions("backup").await;
5463        assert_eq!(executions.len(), 2);
5464    }
5465
5466    // ==================== Container Supervisor Integration Tests ====================
5467
5468    #[tokio::test]
5469    async fn test_service_manager_with_supervisor() {
5470        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5471        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
5472
5473        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
5474
5475        // Add service
5476        let spec = mock_spec();
5477        Box::pin(manager.upsert_service("api".to_string(), spec))
5478            .await
5479            .unwrap();
5480
5481        // Scale up - containers should be registered with supervisor
5482        manager.scale_service("api", 2).await.unwrap();
5483
5484        // Verify containers are supervised
5485        assert_eq!(supervisor.supervised_count().await, 2);
5486
5487        // Scale down - containers should be unregistered
5488        manager.scale_service("api", 1).await.unwrap();
5489        assert_eq!(supervisor.supervised_count().await, 1);
5490
5491        // Remove service - remaining containers should be unregistered
5492        manager.remove_service("api").await.unwrap();
5493        assert_eq!(supervisor.supervised_count().await, 0);
5494    }
5495
5496    #[tokio::test]
5497    async fn test_service_manager_supervisor_state() {
5498        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5499        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
5500
5501        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor);
5502
5503        // Add and scale service
5504        let spec = mock_spec();
5505        Box::pin(manager.upsert_service("web".to_string(), spec))
5506            .await
5507            .unwrap();
5508        manager.scale_service("web", 1).await.unwrap();
5509
5510        // Check supervised state
5511        let container_id = ContainerId::new("web".to_string(), 1);
5512        let state = manager.get_container_supervised_state(&container_id).await;
5513        assert_eq!(state, Some(SupervisedState::Running));
5514    }
5515
5516    #[tokio::test]
5517    async fn test_service_manager_start_supervisor() {
5518        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5519        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
5520
5521        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
5522
5523        // Start the supervisor
5524        let handle = manager.start_container_supervisor().unwrap();
5525
5526        // Give it time to start
5527        tokio::time::sleep(Duration::from_millis(50)).await;
5528        assert!(supervisor.is_running());
5529
5530        // Shutdown
5531        manager.shutdown_container_supervisor();
5532
5533        // Wait for it to stop
5534        tokio::time::timeout(Duration::from_secs(1), handle)
5535            .await
5536            .unwrap()
5537            .unwrap();
5538
5539        assert!(!supervisor.is_running());
5540    }
5541
5542    #[tokio::test]
5543    async fn test_service_manager_supervisor_not_configured() {
5544        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5545        let manager = ServiceManager::new(runtime);
5546
5547        // Try to start supervisor without configuring it
5548        let result = manager.start_container_supervisor();
5549        assert!(result.is_err());
5550        assert!(result.unwrap_err().to_string().contains("not configured"));
5551    }
5552
5553    // ==================== Stream Registry Integration Tests ====================
5554
5555    fn mock_tcp_spec() -> ServiceSpec {
5556        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
5557            r"
5558version: v1
5559deployment: test
5560services:
5561  database:
5562    rtype: service
5563    image:
5564      name: postgres:latest
5565    endpoints:
5566      - name: postgresql
5567        protocol: tcp
5568        port: 5432
5569    scale:
5570      mode: fixed
5571      replicas: 1
5572",
5573        )
5574        .unwrap()
5575        .services
5576        .remove("database")
5577        .unwrap()
5578    }
5579
5580    fn mock_udp_spec() -> ServiceSpec {
5581        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
5582            r"
5583version: v1
5584deployment: test
5585services:
5586  dns:
5587    rtype: service
5588    image:
5589      name: dns:latest
5590    endpoints:
5591      - name: dns
5592        protocol: udp
5593        port: 53
5594    scale:
5595      mode: fixed
5596      replicas: 1
5597",
5598        )
5599        .unwrap()
5600        .services
5601        .remove("dns")
5602        .unwrap()
5603    }
5604
5605    fn mock_mixed_spec() -> ServiceSpec {
5606        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
5607            r"
5608version: v1
5609deployment: test
5610services:
5611  mixed:
5612    rtype: service
5613    image:
5614      name: mixed:latest
5615    endpoints:
5616      - name: http
5617        protocol: http
5618        port: 8080
5619      - name: grpc
5620        protocol: tcp
5621        port: 9000
5622      - name: metrics
5623        protocol: udp
5624        port: 8125
5625    scale:
5626      mode: fixed
5627      replicas: 1
5628",
5629        )
5630        .unwrap()
5631        .services
5632        .remove("mixed")
5633        .unwrap()
5634    }
5635
5636    #[tokio::test]
5637    async fn test_service_manager_with_stream_registry_tcp() {
5638        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5639        let stream_registry = Arc::new(StreamRegistry::new());
5640
5641        let mut manager = ServiceManager::new(runtime);
5642        manager.set_stream_registry(stream_registry.clone());
5643        manager.set_deployment_name("test".to_string());
5644
5645        // Add TCP-only service
5646        let spec = mock_tcp_spec();
5647        Box::pin(manager.upsert_service("database".to_string(), spec))
5648            .await
5649            .unwrap();
5650
5651        // Verify TCP route was registered
5652        assert_eq!(stream_registry.tcp_count(), 1);
5653        assert!(stream_registry.tcp_ports().contains(&5432));
5654
5655        // Remove service and verify cleanup
5656        manager.remove_service("database").await.unwrap();
5657        assert_eq!(stream_registry.tcp_count(), 0);
5658    }
5659
5660    #[tokio::test]
5661    async fn test_service_manager_with_stream_registry_udp() {
5662        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5663        let stream_registry = Arc::new(StreamRegistry::new());
5664
5665        let mut manager = ServiceManager::new(runtime);
5666        manager.set_stream_registry(stream_registry.clone());
5667        manager.set_deployment_name("test".to_string());
5668
5669        // Add UDP-only service
5670        let spec = mock_udp_spec();
5671        Box::pin(manager.upsert_service("dns".to_string(), spec))
5672            .await
5673            .unwrap();
5674
5675        // Verify UDP route was registered
5676        assert_eq!(stream_registry.udp_count(), 1);
5677        assert!(stream_registry.udp_ports().contains(&53));
5678
5679        // Remove service and verify cleanup
5680        manager.remove_service("dns").await.unwrap();
5681        assert_eq!(stream_registry.udp_count(), 0);
5682    }
5683
5684    #[tokio::test]
5685    async fn test_service_manager_with_stream_registry_mixed() {
5686        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5687        let stream_registry = Arc::new(StreamRegistry::new());
5688
5689        let mut manager = ServiceManager::new(runtime);
5690        manager.set_stream_registry(stream_registry.clone());
5691        manager.set_deployment_name("test".to_string());
5692
5693        // Add mixed service (HTTP + TCP + UDP)
5694        let spec = mock_mixed_spec();
5695        Box::pin(manager.upsert_service("mixed".to_string(), spec))
5696            .await
5697            .unwrap();
5698
5699        // Verify stream routes were registered
5700        assert_eq!(stream_registry.tcp_count(), 1); // TCP: 9000
5701        assert_eq!(stream_registry.udp_count(), 1); // UDP: 8125
5702
5703        assert!(stream_registry.tcp_ports().contains(&9000));
5704        assert!(stream_registry.udp_ports().contains(&8125));
5705
5706        // Remove service and verify stream cleanup
5707        manager.remove_service("mixed").await.unwrap();
5708        assert_eq!(stream_registry.tcp_count(), 0);
5709        assert_eq!(stream_registry.udp_count(), 0);
5710    }
5711
5712    #[tokio::test]
5713    async fn test_service_manager_stream_registry_builder() {
5714        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5715        let stream_registry = Arc::new(StreamRegistry::new());
5716
5717        // Test builder pattern
5718        let manager = ServiceManager::new(runtime).with_stream_registry(stream_registry.clone());
5719
5720        // Verify stream registry is accessible
5721        assert!(manager.stream_registry().is_some());
5722    }
5723
5724    #[tokio::test]
5725    async fn test_tcp_service_without_stream_registry() {
5726        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5727
5728        // Manager without stream registry
5729        let mut manager = ServiceManager::new(runtime);
5730        manager.set_deployment_name("test".to_string());
5731
5732        // Add TCP service - should log warning but not fail
5733        let spec = mock_tcp_spec();
5734        Box::pin(manager.upsert_service("database".to_string(), spec))
5735            .await
5736            .unwrap();
5737
5738        // No stream registry to check, but service should be tracked
5739        let services = manager.list_services().await;
5740        assert!(services.contains(&"database".to_string()));
5741    }
5742
5743    /// Verify `collect_endpoint_backends` filters containers by
5744    /// `EndpointSpec.target_role`.
5745    ///
5746    /// Given two replica groups (`primary` × 1, `read` × 2) and two
5747    /// endpoints — one with `target_role: primary` and one with
5748    /// `target_role: read` — each endpoint should receive only the
5749    /// matching containers' overlay addresses. The legacy no-filter
5750    /// endpoint (`target_role: None`) should receive all of them.
5751    #[tokio::test]
5752    #[allow(clippy::too_many_lines)]
5753    async fn test_collect_endpoint_backends_respects_target_role() {
5754        use crate::runtime::Container;
5755        use std::collections::HashMap as StdHashMap;
5756        use std::net::{IpAddr, Ipv4Addr};
5757        use zlayer_spec::{
5758            EndpointSpec, ExposeType, GroupAffinity, Protocol, ReplicaGroup, ScaleSpec,
5759        };
5760
5761        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5762        let manager = ServiceManager::new(runtime.clone());
5763
5764        // Build a spec with replica_groups and three endpoints:
5765        // - "write" targets role "primary"
5766        // - "read" targets role "read"
5767        // - "any" has no target_role (legacy)
5768        let mut spec = mock_spec();
5769        spec.replica_groups = Some(vec![
5770            ReplicaGroup {
5771                role: "primary".to_string(),
5772                count: 1,
5773                image: None,
5774                env: StdHashMap::new(),
5775                command: None,
5776                resources: None,
5777                affinity: GroupAffinity::default(),
5778            },
5779            ReplicaGroup {
5780                role: "read".to_string(),
5781                count: 2,
5782                image: None,
5783                env: StdHashMap::new(),
5784                command: None,
5785                resources: None,
5786                affinity: GroupAffinity::default(),
5787            },
5788        ]);
5789        spec.scale = ScaleSpec::Fixed { replicas: 3 };
5790        spec.endpoints = vec![
5791            EndpointSpec {
5792                name: "write".to_string(),
5793                protocol: Protocol::Tcp,
5794                port: 5432,
5795                target_port: Some(5432),
5796                path: None,
5797                host: None,
5798                expose: ExposeType::Internal,
5799                stream: None,
5800                tunnel: None,
5801                target_role: Some("primary".to_string()),
5802            },
5803            EndpointSpec {
5804                name: "read".to_string(),
5805                protocol: Protocol::Tcp,
5806                port: 5433,
5807                target_port: Some(5432),
5808                path: None,
5809                host: None,
5810                expose: ExposeType::Internal,
5811                stream: None,
5812                tunnel: None,
5813                target_role: Some("read".to_string()),
5814            },
5815            EndpointSpec {
5816                name: "any".to_string(),
5817                protocol: Protocol::Tcp,
5818                port: 5434,
5819                target_port: Some(5432),
5820                path: None,
5821                host: None,
5822                expose: ExposeType::Internal,
5823                stream: None,
5824                tunnel: None,
5825                target_role: None,
5826            },
5827        ];
5828
5829        let instance = ServiceInstance::new(
5830            "postgres".to_string(),
5831            spec.clone(),
5832            runtime,
5833            None, // overlay_manager — not exercised by this test
5834        );
5835
5836        // Inject three containers directly: one primary, two read replicas.
5837        let cid_primary = ContainerId::with_role_and_node("postgres", 1, "primary", 0);
5838        let cid_first_read = ContainerId::with_role_and_node("postgres", 2, "read", 0);
5839        let cid_second_read = ContainerId::with_role_and_node("postgres", 3, "read", 0);
5840
5841        let ip_primary = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 1));
5842        let ip_first_read = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 2));
5843        let ip_second_read = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 3));
5844
5845        {
5846            let mut containers = instance.containers().write().await;
5847            containers.insert(
5848                cid_primary.clone(),
5849                Container {
5850                    id: cid_primary,
5851                    image: spec.image.name.to_string(),
5852                    state: crate::runtime::ContainerState::Running,
5853                    pid: None,
5854                    task: None,
5855                    overlay_ip: Some(ip_primary),
5856                    health_monitor: None,
5857                    port_override: None,
5858                },
5859            );
5860            containers.insert(
5861                cid_first_read.clone(),
5862                Container {
5863                    id: cid_first_read,
5864                    image: spec.image.name.to_string(),
5865                    state: crate::runtime::ContainerState::Running,
5866                    pid: None,
5867                    task: None,
5868                    overlay_ip: Some(ip_first_read),
5869                    health_monitor: None,
5870                    port_override: None,
5871                },
5872            );
5873            containers.insert(
5874                cid_second_read.clone(),
5875                Container {
5876                    id: cid_second_read,
5877                    image: spec.image.name.to_string(),
5878                    state: crate::runtime::ContainerState::Running,
5879                    pid: None,
5880                    task: None,
5881                    overlay_ip: Some(ip_second_read),
5882                    health_monitor: None,
5883                    port_override: None,
5884                },
5885            );
5886        }
5887
5888        let write_ep = &spec.endpoints[0];
5889        let read_ep = &spec.endpoints[1];
5890        let any_ep = &spec.endpoints[2];
5891
5892        let write_backends = manager.collect_endpoint_backends(&instance, write_ep).await;
5893        let read_backends = manager.collect_endpoint_backends(&instance, read_ep).await;
5894        let any_backends = manager.collect_endpoint_backends(&instance, any_ep).await;
5895
5896        // write endpoint -> only the primary container
5897        assert_eq!(write_backends.len(), 1, "write should match only primary");
5898        assert!(
5899            write_backends.iter().any(|a| a.ip() == ip_primary),
5900            "write backends missing primary IP: {write_backends:?}"
5901        );
5902
5903        // read endpoint -> both read containers, no primary
5904        assert_eq!(
5905            read_backends.len(),
5906            2,
5907            "read should match both read replicas"
5908        );
5909        assert!(read_backends.iter().any(|a| a.ip() == ip_first_read));
5910        assert!(read_backends.iter().any(|a| a.ip() == ip_second_read));
5911        assert!(
5912            !read_backends.iter().any(|a| a.ip() == ip_primary),
5913            "read backends must not contain primary: {read_backends:?}"
5914        );
5915
5916        // legacy endpoint (target_role = None) -> every container
5917        assert_eq!(
5918            any_backends.len(),
5919            3,
5920            "any-role endpoint should see all containers"
5921        );
5922    }
5923
5924    /// Build a `ServiceInstance` whose spec carries a deployment, so the
5925    /// instance's deployment-scoping helpers (`dns_hostnames` /
5926    /// `dns_search_domain`) are exercised through the real construction path
5927    /// (the constructors capture `spec.deployment`).
5928    fn instance_in_deployment(service: &str, deployment: &str) -> ServiceInstance {
5929        let mut spec = ServiceSpec::minimal(service, "postgres:16-alpine");
5930        spec.deployment = Some(deployment.to_string());
5931        ServiceInstance::new(
5932            service.to_string(),
5933            spec,
5934            Arc::new(MockRuntime::new()),
5935            None,
5936        )
5937    }
5938
    /// Look up `fqdn` through `handle` and return whatever `lookup_a` yields.
    ///
    /// This helper only performs the lookup: registering records in the
    /// [`DnsServer`]'s authority and asserting on the answer are left to the
    /// calling test. It uses the DNS handle's direct authority lookup (no UDP
    /// roundtrip — a blocking sync DNS client inside a tokio runtime would
    /// deadlock the current-thread executor), so the tests exercise the actual
    /// record store deterministically.
    async fn resolve_through_authority(
        handle: &zlayer_overlay::DnsHandle,
        fqdn: &str,
    ) -> Option<IpAddr> {
        handle.lookup_a(fqdn).await
    }
5951
5952    /// Bug 6 (the CRITICAL leak): two DIFFERENT deployments each with a service
5953    /// named `postgres`, registered into ONE daemon-global DNS authority, must
5954    /// resolve to their OWN instance's IP — no last-writer-wins clobber across
5955    /// deployments. The deployment-scoped FQDNs are what a guest's
5956    /// `search <D>.zlayer.local` expands the bare `postgres` / `postgres.service`
5957    /// queries into, so distinct scoped keys are exactly what prevents the leak.
5958    #[tokio::test]
5959    async fn deployment_scoped_dns_no_cross_deployment_clobber() {
5960        use zlayer_overlay::DnsServer;
5961
5962        let bind = SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
5963        let server = DnsServer::new(bind, "zlayer.local.").expect("build DNS server");
5964        let handle = server.handle();
5965
5966        let inst_a = instance_in_deployment("postgres", "deploy-a");
5967        let inst_b = instance_in_deployment("postgres", "deploy-b");
5968
5969        let cid = ContainerId::with_role_and_node("postgres", 1, "default", 0);
5970        let ip_a = IpAddr::V4(std::net::Ipv4Addr::new(10, 200, 0, 11));
5971        let ip_b = IpAddr::V4(std::net::Ipv4Addr::new(10, 200, 0, 22));
5972
5973        // Register every name each instance would register for this container,
5974        // in deployment order, into the SAME authority.
5975        for name in inst_a.dns_hostnames(&cid) {
5976            handle.add_record(&name, ip_a).await.expect("add a record");
5977        }
5978        for name in inst_b.dns_hostnames(&cid) {
5979            handle.add_record(&name, ip_b).await.expect("add b record");
5980        }
5981
5982        // The deployment-scoped FQDNs stay distinct and resolve to their OWN IP.
5983        assert_eq!(
5984            resolve_through_authority(&handle, "postgres.deploy-a.zlayer.local").await,
5985            Some(ip_a),
5986            "deploy-a's postgres must resolve to deploy-a's IP"
5987        );
5988        assert_eq!(
5989            resolve_through_authority(&handle, "postgres.deploy-b.zlayer.local").await,
5990            Some(ip_b),
5991            "deploy-b's postgres must resolve to deploy-b's IP (no clobber)"
5992        );
5993        // `<svc>.service` form, scoped per deployment.
5994        assert_eq!(
5995            resolve_through_authority(&handle, "postgres.service.deploy-a.zlayer.local").await,
5996            Some(ip_a),
5997        );
5998        assert_eq!(
5999            resolve_through_authority(&handle, "postgres.service.deploy-b.zlayer.local").await,
6000            Some(ip_b),
6001        );
6002    }
6003
6004    /// Bug 6 part (a): two services in the SAME deployment resolve each other by
6005    /// their deployment-scoped names.
6006    #[tokio::test]
6007    async fn deployment_scoped_dns_same_deployment_siblings_resolve() {
6008        use zlayer_overlay::DnsServer;
6009
6010        let bind = SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
6011        let server = DnsServer::new(bind, "zlayer.local.").expect("build DNS server");
6012        let handle = server.handle();
6013
6014        let db = instance_in_deployment("db", "myapp");
6015        let cache = instance_in_deployment("cache", "myapp");
6016        let cid = ContainerId::with_role_and_node("x", 1, "default", 0);
6017        let ip_db = IpAddr::V4(std::net::Ipv4Addr::new(10, 200, 1, 1));
6018        let ip_cache = IpAddr::V4(std::net::Ipv4Addr::new(10, 200, 1, 2));
6019
6020        for name in db.dns_hostnames(&cid) {
6021            handle.add_record(&name, ip_db).await.expect("add db");
6022        }
6023        for name in cache.dns_hostnames(&cid) {
6024            handle.add_record(&name, ip_cache).await.expect("add cache");
6025        }
6026
6027        // Within deployment `myapp`, the search-expanded sibling FQDNs resolve
6028        // to the same deployment's instances.
6029        assert_eq!(
6030            resolve_through_authority(&handle, "db.myapp.zlayer.local").await,
6031            Some(ip_db)
6032        );
6033        assert_eq!(
6034            resolve_through_authority(&handle, "cache.myapp.zlayer.local").await,
6035            Some(ip_cache)
6036        );
6037    }
6038
6039    /// Bug 6 part (b): the per-deployment resolv.conf `search` value is emitted
6040    /// correctly (deployment scope first, bare zone last for cross-deployment
6041    /// FQDN + global names), and is absent for a non-deployment instance.
6042    #[test]
6043    fn dns_search_domain_is_deployment_scoped() {
6044        let scoped = instance_in_deployment("api", "deploy-a");
6045        assert_eq!(
6046            scoped.dns_search_domain("zlayer.local"),
6047            Some("deploy-a.zlayer.local zlayer.local".to_string()),
6048        );
6049        // Trailing dot on the zone is normalized away.
6050        assert_eq!(
6051            scoped.dns_search_domain("zlayer.local."),
6052            Some("deploy-a.zlayer.local zlayer.local".to_string()),
6053        );
6054
6055        // No deployment => no override (caller falls back to the global zone).
6056        let unscoped = ServiceInstance::new(
6057            "api".to_string(),
6058            ServiceSpec::minimal("api", "nginx:latest"),
6059            Arc::new(MockRuntime::new()),
6060            None,
6061        );
6062        assert_eq!(unscoped.dns_search_domain("zlayer.local"), None);
6063    }
6064
6065    /// The deployment-scoped hostnames include both the bare-name scope
6066    /// (`<svc>.<D>`) and the `<svc>.service.<D>` form, and the legacy unscoped
6067    /// names are still emitted for native / compose back-compat.
6068    #[test]
6069    fn dns_hostnames_emit_scoped_and_legacy_families() {
6070        let inst = instance_in_deployment("postgres", "myapp");
6071        let cid = ContainerId::with_role_and_node("postgres", 2, "default", 0);
6072        let names = inst.dns_hostnames(&cid);
6073
6074        // Deployment-scoped family.
6075        assert!(names.contains(&"postgres.myapp".to_string()));
6076        assert!(names.contains(&"postgres.service.myapp".to_string()));
6077        assert!(names.contains(&"postgres.myapp.service".to_string()));
6078        assert!(names.contains(&"2.postgres.service.myapp".to_string()));
6079
6080        // Legacy / unscoped family (back-compat).
6081        assert!(names.contains(&"postgres".to_string()));
6082        assert!(names.contains(&"postgres.service.local".to_string()));
6083        assert!(names.contains(&"2.postgres.service.local".to_string()));
6084    }
6085
    /// Minimal [`Cluster`] stub for the external-domain DNS tests: only
    /// `select_ingress_overlay_ip` carries behavior (returns the configured
    /// peer overlay IP); every other method is an unreachable stub since these
    /// tests never exercise scaling/placement.
    struct IngressPeerCluster {
        /// Value returned verbatim (cloned) by `select_ingress_overlay_ip`;
        /// `None` models a cluster that has no ingress node to offer.
        ingress_overlay_ip: Option<String>,
    }
6093
6094    #[async_trait::async_trait]
6095    impl zlayer_scheduler::cluster::Cluster for IngressPeerCluster {
6096        fn node_id(&self) -> u64 {
6097            2
6098        }
6099        async fn select_ingress_overlay_ip(&self) -> Option<String> {
6100            self.ingress_overlay_ip.clone()
6101        }
6102        async fn is_leader(&self) -> bool {
6103            false
6104        }
6105        async fn leader_addr(&self) -> Option<std::net::SocketAddr> {
6106            None
6107        }
6108        async fn list_nodes(&self) -> Vec<zlayer_scheduler::cluster::NodeRecord> {
6109            Vec::new()
6110        }
6111        async fn dispatch_scale(
6112            &self,
6113            _target: u64,
6114            _req: zlayer_scheduler::cluster::InternalScaleRequest,
6115        ) -> Result<(), zlayer_scheduler::cluster::ClusterError> {
6116            unreachable!("not exercised by external-domain DNS tests")
6117        }
6118        async fn forward_scale(
6119            &self,
6120            _req: zlayer_scheduler::cluster::InternalScaleRequest,
6121        ) -> Result<(), zlayer_scheduler::cluster::ClusterError> {
6122            unreachable!("not exercised by external-domain DNS tests")
6123        }
6124        async fn place_container(
6125            &self,
6126            _spec: &zlayer_spec::ServiceSpec,
6127        ) -> Result<
6128            Option<zlayer_scheduler::cluster::ContainerPlacement>,
6129            zlayer_scheduler::cluster::ClusterError,
6130        > {
6131            unreachable!("not exercised by external-domain DNS tests")
6132        }
6133    }
6134
6135    /// A `ServiceSpec` whose single endpoint carries an external vhost domain.
6136    fn mock_spec_with_host(host: &str) -> ServiceSpec {
6137        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(&format!(
6138            r"
6139version: v1
6140deployment: test
6141services:
6142  web:
6143    rtype: service
6144    image:
6145      name: test:latest
6146    endpoints:
6147      - name: http
6148        protocol: http
6149        port: 8080
6150        host: {host}
6151    scale:
6152      mode: fixed
6153      replicas: 1
6154"
6155        ))
6156        .unwrap()
6157        .services
6158        .remove("web")
6159        .unwrap()
6160    }
6161
6162    #[tokio::test]
6163    async fn external_domain_registers_host_to_ingress_peer_ip() {
6164        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
6165        let dns = Arc::new(
6166            zlayer_overlay::DnsServer::new("127.0.0.1:0".parse().unwrap(), "zlayer.local").unwrap(),
6167        );
6168        let spec = mock_spec_with_host("app.example.com");
6169        let mut instance = ServiceInstance::new("web".to_string(), spec, runtime, None);
6170        instance.set_dns_server(Arc::clone(&dns));
6171        // THIS node is not the ingress; the cluster names a peer's overlay IP.
6172        instance.set_ingress_enabled(false);
6173        instance.set_cluster(Arc::new(IngressPeerCluster {
6174            ingress_overlay_ip: Some("10.200.0.7".to_string()),
6175        }));
6176
6177        instance.register_external_domains(&dns).await;
6178
6179        // The external domain resolves to the ingress peer's overlay IP.
6180        let handle = dns.handle();
6181        let ip = handle.lookup_a("app.example.com.").await;
6182        assert_eq!(ip, Some("10.200.0.7".parse().unwrap()));
6183    }
6184
6185    #[tokio::test]
6186    async fn external_domain_skips_when_no_ingress_node() {
6187        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
6188        let dns = Arc::new(
6189            zlayer_overlay::DnsServer::new("127.0.0.1:0".parse().unwrap(), "zlayer.local").unwrap(),
6190        );
6191        let spec = mock_spec_with_host("app.example.com");
6192        let mut instance = ServiceInstance::new("web".to_string(), spec, runtime, None);
6193        instance.set_dns_server(Arc::clone(&dns));
6194        // No ingress locally and the cluster has no ingress node → WARN + skip,
6195        // never error, and no record is registered.
6196        instance.set_ingress_enabled(false);
6197        instance.set_cluster(Arc::new(IngressPeerCluster {
6198            ingress_overlay_ip: None,
6199        }));
6200
6201        instance.register_external_domains(&dns).await;
6202
6203        let handle = dns.handle();
6204        assert_eq!(handle.lookup_a("app.example.com.").await, None);
6205    }
6206
6207    #[tokio::test]
6208    async fn external_domain_skips_wildcard_host_patterns() {
6209        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
6210        let dns = Arc::new(
6211            zlayer_overlay::DnsServer::new("127.0.0.1:0".parse().unwrap(), "zlayer.local").unwrap(),
6212        );
6213        // A wildcard host is a routing matcher, not a resolvable name.
6214        let spec = mock_spec_with_host("'*.example.com'");
6215        let mut instance = ServiceInstance::new("web".to_string(), spec, runtime, None);
6216        instance.set_dns_server(Arc::clone(&dns));
6217        instance.set_ingress_enabled(false);
6218        instance.set_cluster(Arc::new(IngressPeerCluster {
6219            ingress_overlay_ip: Some("10.200.0.7".to_string()),
6220        }));
6221
6222        instance.register_external_domains(&dns).await;
6223
6224        let handle = dns.handle();
6225        // Wildcard skipped → nothing registered under the literal pattern.
6226        assert_eq!(handle.lookup_a("*.example.com.").await, None);
6227    }
6228
6229    // -----------------------------------------------------------------------
6230    // Digest persistence + recreate-from-local
6231    // -----------------------------------------------------------------------
6232
    /// A `Runtime` test double that records every `source` policy passed to
    /// `pull_image_with_policy` and lets a test simulate "image is / isn't
    /// present locally" by failing the `LocalOnly` probe. `list_images` reports
    /// a fixed digest so `pull_and_refresh_digest` has something to record.
    ///
    /// Container-lifecycle methods delegate to an inner `MockRuntime` so the
    /// double is usable beyond the pull path; only `pull_image_with_policy` and
    /// `list_images` carry the recording/branching behaviour under test.
    struct RecordingRuntime {
        /// Backing mock that every container-lifecycle call is forwarded to.
        inner: MockRuntime,
        /// Every `(image, source_policy)` seen by `pull_image_with_policy`.
        pulls: std::sync::Mutex<Vec<(String, zlayer_spec::SourcePolicy)>>,
        /// When `false`, a `LocalOnly` pull FAILS (simulating a local miss);
        /// any non-`LocalOnly` (remote-capable) pull always succeeds.
        local_present: bool,
        /// Digest reported by `list_images` for the `test:latest` reference.
        digest: Option<String>,
    }
6251
6252    impl std::fmt::Debug for RecordingRuntime {
6253        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
6254            f.debug_struct("RecordingRuntime")
6255                .field("local_present", &self.local_present)
6256                .field("digest", &self.digest)
6257                .finish_non_exhaustive()
6258        }
6259    }
6260
6261    impl RecordingRuntime {
6262        fn new(local_present: bool, digest: Option<String>) -> Self {
6263            Self {
6264                inner: MockRuntime::new(),
6265                pulls: std::sync::Mutex::new(Vec::new()),
6266                local_present,
6267                digest,
6268            }
6269        }
6270
6271        fn recorded_pulls(&self) -> Vec<(String, zlayer_spec::SourcePolicy)> {
6272            self.pulls.lock().unwrap().clone()
6273        }
6274    }
6275
6276    #[async_trait::async_trait]
6277    impl Runtime for RecordingRuntime {
6278        async fn pull_image(&self, image: &str) -> Result<()> {
6279            self.pull_image_with_policy(
6280                image,
6281                PullPolicy::IfNotPresent,
6282                None,
6283                zlayer_spec::SourcePolicy::default(),
6284            )
6285            .await
6286        }
6287
6288        async fn pull_image_with_policy(
6289            &self,
6290            image: &str,
6291            _policy: PullPolicy,
6292            _auth: Option<&zlayer_spec::RegistryAuth>,
6293            source: zlayer_spec::SourcePolicy,
6294        ) -> Result<()> {
6295            self.pulls.lock().unwrap().push((image.to_string(), source));
6296            // A LocalOnly pull only succeeds when the image is present locally;
6297            // otherwise it errors cleanly (mirroring the registry chain's
6298            // local-miss behaviour). Remote-capable policies always succeed.
6299            if matches!(source, zlayer_spec::SourcePolicy::LocalOnly) && !self.local_present {
6300                return Err(AgentError::PullFailed {
6301                    image: image.to_string(),
6302                    reason: "source_policy=local_only: not present in any local source".to_string(),
6303                });
6304            }
6305            Ok(())
6306        }
6307
6308        async fn list_images(&self) -> Result<Vec<crate::runtime::ImageInfo>> {
6309            Ok(vec![crate::runtime::ImageInfo {
6310                reference: "test:latest".to_string(),
6311                digest: self.digest.clone(),
6312                size_bytes: None,
6313            }])
6314        }
6315
6316        async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
6317            self.inner.create_container(id, spec).await
6318        }
6319        async fn start_container(&self, id: &ContainerId) -> Result<()> {
6320            self.inner.start_container(id).await
6321        }
6322        async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
6323            self.inner.stop_container(id, timeout).await
6324        }
6325        async fn remove_container(&self, id: &ContainerId) -> Result<()> {
6326            self.inner.remove_container(id).await
6327        }
6328        async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
6329            self.inner.container_state(id).await
6330        }
6331        async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
6332            self.inner.container_logs(id, tail).await
6333        }
6334        async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
6335            self.inner.exec(id, cmd).await
6336        }
6337        async fn get_container_stats(
6338            &self,
6339            id: &ContainerId,
6340        ) -> Result<crate::cgroups_stats::ContainerStats> {
6341            self.inner.get_container_stats(id).await
6342        }
6343        async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
6344            self.inner.wait_container(id).await
6345        }
6346        async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
6347            self.inner.get_logs(id).await
6348        }
6349        async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
6350            self.inner.get_container_pid(id).await
6351        }
6352        async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
6353            self.inner.get_container_ip(id).await
6354        }
6355    }
6356
    /// A recording [`crate::auth::DeploymentDigestSink`] that captures every
    /// `(deployment, service, digest)` tuple persisted after a pull.
    #[derive(Debug, Default)]
    struct RecordingDigestSink {
        /// Captured tuples in call order, laid out as
        /// `(deployment, service, digest)`.
        records: std::sync::Mutex<Vec<(String, String, String)>>,
    }
6363
6364    impl RecordingDigestSink {
6365        fn records(&self) -> Vec<(String, String, String)> {
6366            self.records.lock().unwrap().clone()
6367        }
6368    }
6369
6370    #[async_trait::async_trait]
6371    impl crate::auth::DeploymentDigestSink for RecordingDigestSink {
6372        async fn record(&self, deployment: &str, service: &str, digest: &str) {
6373            self.records.lock().unwrap().push((
6374                deployment.to_string(),
6375                service.to_string(),
6376                digest.to_string(),
6377            ));
6378        }
6379    }
6380
6381    /// (3a) A successful pull records the resolved digest into the deployment
6382    /// store via the digest sink, keyed by `(deployment, service)`.
6383    #[tokio::test]
6384    async fn successful_pull_records_digest_into_sink() {
6385        let runtime = Arc::new(RecordingRuntime::new(
6386            true,
6387            Some("sha256:deadbeef".to_string()),
6388        ));
6389        let runtime_dyn: Arc<dyn Runtime + Send + Sync> = runtime.clone();
6390
6391        // The owning deployment name must be stamped on the per-service spec for
6392        // the sink to receive it (the deploy/restore paths stamp this; the
6393        // top-level DeploymentSpec `deployment` is not copied onto each service).
6394        let mut spec = mock_spec();
6395        spec.deployment = Some("test".to_string());
6396        let mut instance = ServiceInstance::new("test".to_string(), spec, runtime_dyn, None);
6397
6398        let sink = Arc::new(RecordingDigestSink::default());
6399        instance.set_digest_sink(Some(sink.clone()));
6400
6401        let digest = instance.pull_and_refresh_digest().await.unwrap();
6402        assert_eq!(digest.as_deref(), Some("sha256:deadbeef"));
6403
6404        let records = sink.records();
6405        assert_eq!(
6406            records,
6407            vec![(
6408                "test".to_string(),
6409                "test".to_string(),
6410                "sha256:deadbeef".to_string()
6411            )],
6412            "a successful pull must persist the resolved digest via the sink"
6413        );
6414    }
6415
6416    /// (3b) When a service carries a restore pin AND its image is present
6417    /// locally, the pull resolves STRICTLY locally (`SourcePolicy::LocalOnly`)
6418    /// and NEVER falls through to a remote/S3-capable policy.
6419    #[tokio::test]
6420    async fn restore_with_local_image_resolves_locally_only() {
6421        // local_present = true: the LocalOnly probe succeeds.
6422        let runtime = Arc::new(RecordingRuntime::new(
6423            true,
6424            Some("sha256:pinned".to_string()),
6425        ));
6426        let runtime_dyn: Arc<dyn Runtime + Send + Sync> = runtime.clone();
6427
6428        let spec = mock_spec();
6429        let instance = ServiceInstance::new("test".to_string(), spec, runtime_dyn, None);
6430
6431        // Pin the digest as the restore path does.
6432        instance
6433            .set_restore_pin(Some("sha256:pinned".to_string()))
6434            .await;
6435
6436        instance.pull_and_refresh_digest().await.unwrap();
6437
6438        let pulls = runtime.recorded_pulls();
6439        assert_eq!(
6440            pulls.len(),
6441            1,
6442            "a locally-present pinned image must be pulled exactly once"
6443        );
6444        assert!(
6445            matches!(pulls[0].1, zlayer_spec::SourcePolicy::LocalOnly),
6446            "restore-from-local must resolve via LocalOnly, got {:?}",
6447            pulls[0].1
6448        );
6449        assert!(
6450            !pulls
6451                .iter()
6452                .any(|(_, src)| !matches!(src, zlayer_spec::SourcePolicy::LocalOnly)),
6453            "no remote/S3-capable pull may occur when the image is local: {pulls:?}"
6454        );
6455    }
6456
6457    /// Restore counterpart: when the pinned image is NOT present locally, the
6458    /// pull falls back to the spec's (remote-capable) source policy so a
6459    /// genuinely-absent image still resolves normally.
6460    #[tokio::test]
6461    async fn restore_with_missing_local_image_falls_back_to_remote() {
6462        // local_present = false: the LocalOnly probe fails (local miss).
6463        let runtime = Arc::new(RecordingRuntime::new(
6464            false,
6465            Some("sha256:fresh".to_string()),
6466        ));
6467        let runtime_dyn: Arc<dyn Runtime + Send + Sync> = runtime.clone();
6468
6469        let spec = mock_spec();
6470        let instance = ServiceInstance::new("test".to_string(), spec, runtime_dyn, None);
6471        instance
6472            .set_restore_pin(Some("sha256:pinned".to_string()))
6473            .await;
6474
6475        instance.pull_and_refresh_digest().await.unwrap();
6476
6477        let pulls = runtime.recorded_pulls();
6478        assert_eq!(
6479            pulls.len(),
6480            2,
6481            "a local miss must try LocalOnly then fall back: {pulls:?}"
6482        );
6483        assert!(
6484            matches!(pulls[0].1, zlayer_spec::SourcePolicy::LocalOnly),
6485            "the first probe must be LocalOnly"
6486        );
6487        // mock_spec sets no source_policy → default (LocalFirst), which is
6488        // remote-capable.
6489        assert!(
6490            !matches!(pulls[1].1, zlayer_spec::SourcePolicy::LocalOnly),
6491            "the fallback must use the remote-capable spec policy, got {:?}",
6492            pulls[1].1
6493        );
6494    }
6495}