Skip to main content

zlayer_agent/
service.rs

1//! Service-level container lifecycle management
2
3use crate::container_supervisor::{ContainerSupervisor, SupervisedState, SupervisorEvent};
4use crate::cron_scheduler::CronScheduler;
5use crate::dependency::{
6    DependencyConditionChecker, DependencyGraph, DependencyWaiter, WaitResult,
7};
8use crate::error::{AgentError, Result};
9use crate::health::{HealthCallback, HealthChecker, HealthMonitor, HealthState};
10use crate::init::InitOrchestrator;
11use crate::job::{JobExecution, JobExecutionId, JobExecutor, JobTrigger};
12use crate::overlay_manager::OverlayManager;
13use crate::proxy_manager::ProxyManager;
14use crate::runtime::{Container, ContainerId, ContainerState, Runtime};
15use std::collections::HashMap;
16use std::net::{IpAddr, SocketAddr};
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{RwLock, Semaphore};
20use zlayer_observability::logs::LogEntry;
21use zlayer_overlay::DnsServer;
22use zlayer_proxy::{StreamRegistry, StreamService};
23use zlayer_spec::{DependsSpec, HealthCheck, Protocol, PullPolicy, ResourceType, ServiceSpec};
24
25/// Service instance manages a single service's containers
26pub struct ServiceInstance {
27    pub service_name: String,
28    pub spec: ServiceSpec,
29    runtime: Arc<dyn Runtime + Send + Sync>,
30    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
31    /// Overlay network manager for container networking (optional, not needed for Docker runtime)
32    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
33    /// Proxy manager for updating backend health (optional)
34    proxy_manager: Option<Arc<ProxyManager>>,
35    /// DNS server for service discovery (optional)
36    dns_server: Option<Arc<DnsServer>>,
37    /// Container-injectable overlay resolver IP (optional). When set, this
38    /// node's overlay DNS server is reachable on `<ip>:53` and we inject it
39    /// into the container's resolv.conf so workloads resolve through the
40    /// overlay instead of inheriting the host's resolv.conf.
41    container_dns: Option<IpAddr>,
42    /// Shared health states map so callbacks can update ServiceManager-level health
43    health_states: Option<Arc<RwLock<HashMap<String, HealthState>>>>,
44    /// Most recently observed image digest after a successful pull. Used by
45    /// `upsert_service` to detect drift on `:latest`/`Newer` redeploys without
46    /// requiring callers to track digest state externally. Wrapped in a
47    /// `RwLock` so `&self` methods (`scale_to`) can update it.
48    last_pulled_digest: tokio::sync::RwLock<Option<String>>,
49    /// Local cluster node id used when constructing new `ContainerId`s during
50    /// scale-up. `0` in single-node deployments or when the cluster handle is
51    /// not yet wired. Populated by `ServiceManager` from `Cluster::node_id()`
52    /// at instance construction time.
53    node_id: u64,
54}
55
56impl ServiceInstance {
57    /// Create a new service instance
58    pub fn new(
59        service_name: String,
60        spec: ServiceSpec,
61        runtime: Arc<dyn Runtime + Send + Sync>,
62        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
63    ) -> Self {
64        Self {
65            service_name,
66            spec,
67            runtime,
68            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
69            overlay_manager,
70            proxy_manager: None,
71            dns_server: None,
72            container_dns: None,
73            health_states: None,
74            last_pulled_digest: tokio::sync::RwLock::new(None),
75            node_id: 0,
76        }
77    }
78
79    /// Create a new service instance with proxy manager for health-aware load balancing
80    pub fn with_proxy(
81        service_name: String,
82        spec: ServiceSpec,
83        runtime: Arc<dyn Runtime + Send + Sync>,
84        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
85        proxy_manager: Arc<ProxyManager>,
86    ) -> Self {
87        Self {
88            service_name,
89            spec,
90            runtime,
91            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
92            overlay_manager,
93            proxy_manager: Some(proxy_manager),
94            dns_server: None,
95            container_dns: None,
96            health_states: None,
97            last_pulled_digest: tokio::sync::RwLock::new(None),
98            node_id: 0,
99        }
100    }
101
102    /// Set the local cluster node id. Used by `ServiceManager` to thread
103    /// `Cluster::node_id()` down to container construction so new
104    /// `ContainerId`s carry the owning node identity. Defaults to `0` (the
105    /// single-node sentinel) when unset.
106    pub fn set_node_id(&mut self, node_id: u64) {
107        self.node_id = node_id;
108    }
109
110    /// Derive the replica group role for a 1-based `replica_idx`.
111    ///
112    /// When `spec.replica_groups` is unset, returns `"default"` (the implicit
113    /// single-group case). Otherwise walks groups in declaration order,
114    /// accumulating each group's `count` until `replica_idx` falls within the
115    /// current group's range, and returns that group's `role`.
116    ///
117    /// Replicas beyond the declared total fall back to `"default"`.
118    #[must_use]
119    pub fn role_for_replica(&self, replica_idx: u32) -> String {
120        let Some(groups) = self.spec.replica_groups.as_ref() else {
121            return "default".to_string();
122        };
123        let mut cumulative = 0u32;
124        for group in groups {
125            cumulative = cumulative.saturating_add(group.count);
126            if replica_idx <= cumulative {
127                return group.role.clone();
128            }
129        }
130        "default".to_string()
131    }
132
133    /// Builder method to add DNS server for service discovery
134    #[must_use]
135    pub fn with_dns(mut self, dns_server: Arc<DnsServer>) -> Self {
136        self.dns_server = Some(dns_server);
137        self
138    }
139
140    /// Set the DNS server for service discovery
141    pub fn set_dns_server(&mut self, dns_server: Arc<DnsServer>) {
142        self.dns_server = Some(dns_server);
143    }
144
145    /// Set the container-injectable overlay resolver IP and apply it to the
146    /// instance's spec.
147    ///
148    /// When `container_dns` is set and the spec is eligible (not host-network,
149    /// no user-supplied `dns`), this pre-populates `spec.dns` with the overlay
150    /// resolver so containers resolve through `<ip>:53` instead of inheriting
151    /// the host's `/etc/resolv.conf`.
152    ///
153    /// Why this exists: on overlay-enabled hosts the netbird `~.`
154    /// systemd-resolved hijack swallows the host resolver, so a container that
155    /// inherits the host resolv.conf cannot resolve anything. The overlay DNS
156    /// server forwards non-overlay queries upstream, so pointing the container
157    /// at it fixes resolution AND gives it service-name discovery.
158    ///
159    /// Port-53 constraint: `resolv.conf` `nameserver` lines (and Docker's
160    /// `--dns`) carry no port — they are always port 53. The injected IP is
161    /// therefore only useful because the daemon binds the overlay resolver on
162    /// `<ip>:53` (see `daemon.rs` Phase 4); the injected value is the bare IP,
163    /// not a `host:port`.
164    ///
165    /// User-supplied `spec.dns` is left untouched: an explicit resolver from
166    /// the deployment spec always wins.
167    pub fn set_container_dns(&mut self, container_dns: IpAddr) {
168        self.container_dns = Some(container_dns);
169        if !self.spec.host_network && self.spec.dns.is_empty() {
170            self.spec.dns = vec![container_dns.to_string()];
171        }
172    }
173
174    /// Set the proxy manager for health-aware load balancing
175    pub fn set_proxy_manager(&mut self, proxy_manager: Arc<ProxyManager>) {
176        self.proxy_manager = Some(proxy_manager);
177    }
178
179    /// Set the shared health states map so health callbacks can bridge state back to `ServiceManager`
180    pub fn set_health_states(&mut self, states: Arc<RwLock<HashMap<String, HealthState>>>) {
181        self.health_states = Some(states);
182    }
183
184    /// Get the last observed image digest (after the most recent successful
185    /// pull). Returns `None` when no pull has happened yet, when the runtime
186    /// does not expose digests, or when no matching `ImageInfo` was found.
187    pub async fn last_pulled_digest(&self) -> Option<String> {
188        self.last_pulled_digest.read().await.clone()
189    }
190
191    /// Pull the service image using the spec's pull policy (literal Docker /
192    /// Kubernetes semantics — no silent auto-upgrade of `IfNotPresent` to
193    /// `Newer` for `:latest` tags) and refresh the cached digest from
194    /// `Runtime::list_images` when the runtime exposes it. Returns the digest
195    /// observed after the pull, when known.
196    ///
197    /// For `Never`, the runtime is still called so it can load the image
198    /// config from the local cache (without any remote round-trip); only the
199    /// remote digest refresh is skipped. Without this call the bundle builder
200    /// has no image entrypoint/cmd and falls back to `/bin/sh`.
201    async fn pull_and_refresh_digest(&self) -> Result<Option<String>> {
202        let image_str = self.spec.image.name.to_string();
203        let policy = self.spec.image.pull_policy;
204
205        self.runtime
206            .pull_image_with_policy(
207                &image_str,
208                policy,
209                None,
210                self.spec.image.source_policy.unwrap_or_default(),
211            )
212            .await
213            .map_err(|e| AgentError::PullFailed {
214                image: self.spec.image.name.to_string(),
215                reason: e.to_string(),
216            })?;
217
218        // Best-effort: try to discover the resolved digest via list_images.
219        // Runtimes that don't support introspection (Unsupported) leave the
220        // cached digest unchanged; drift detection then falls back to "always
221        // recreate on PullPolicy::Always, never recreate on PullPolicy::Newer
222        // when no digests are known".
223        let new_digest = match self.runtime.list_images().await {
224            Ok(images) => images
225                .into_iter()
226                .find(|info| info.reference == image_str)
227                .and_then(|info| info.digest),
228            Err(e) => {
229                tracing::debug!(
230                    image = %image_str,
231                    error = %e,
232                    "list_images unavailable; cannot record post-pull digest"
233                );
234                None
235            }
236        };
237
238        if let Some(ref digest) = new_digest {
239            *self.last_pulled_digest.write().await = Some(digest.clone());
240        }
241
242        Ok(new_digest)
243    }
244
245    /// Scale to the desired number of replicas
246    ///
247    /// This method uses short-lived locks to avoid blocking concurrent operations.
248    /// I/O operations (pull, create, start, stop, remove) are performed without
249    /// holding the containers lock to allow other operations to proceed.
250    ///
251    /// # Errors
252    /// Returns an error if image pull, container creation, or container lifecycle operations fail.
253    #[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
254    pub async fn scale_to(&self, replicas: u32) -> Result<()> {
255        // Phase 1: Determine current state (short read lock)
256        let current_replicas = { self.containers.read().await.len() as u32 }; // Lock released here
257
258        // Phase 1b: Pull image up front so a redeploy on `:latest` (which lands
259        // here with replicas == current_replicas in the steady state) actually
260        // refreshes the cached digest. We skip the call only when scaling
261        // strictly down (no new containers needed). For `Never` the runtime
262        // still needs to load the image config from the local cache so the
263        // bundle builder gets entrypoint/cmd/env — without it the container
264        // falls back to `/bin/sh` and exits instantly. `pull_and_refresh_digest`
265        // itself handles the Never case (no remote round-trip, cache-only).
266        if replicas >= current_replicas {
267            let _ = self.pull_and_refresh_digest().await?;
268        }
269
270        // Phase 2: Scale up - create new containers (no lock held during I/O)
271        //
272        // Compute (role, replica_index) tuples for each new replica. When
273        // `spec.replica_groups` is set, expand groups in declaration order so
274        // each created replica maps to its declared `(role, intra_group_index)`.
275        // Otherwise fall back to the implicit single "default" group. The
276        // `local_node_id` is captured once so every new `ContainerId` carries
277        // the owning node identity for cross-node disambiguation.
278        let local_node_id = self.node_id;
279        if replicas > current_replicas {
280            let replica_specs: Vec<(String, u32)> =
281                if let Some(groups) = self.spec.replica_groups.as_ref() {
282                    let mut specs: Vec<(String, u32)> = Vec::new();
283                    for group in groups {
284                        for idx in 0..group.count {
285                            specs.push((group.role.clone(), idx + 1));
286                        }
287                    }
288                    specs
289                        .into_iter()
290                        .skip(current_replicas as usize)
291                        .take((replicas - current_replicas) as usize)
292                        .collect()
293                } else {
294                    (current_replicas..replicas)
295                        .map(|i| ("default".to_string(), i + 1))
296                        .collect()
297                };
298
299            for (role, replica_idx) in replica_specs {
300                let id = ContainerId::with_role_and_node(
301                    self.service_name.clone(),
302                    replica_idx,
303                    role,
304                    local_node_id,
305                );
306
307                // Create container (no lock needed - I/O operation)
308                //
309                // RouteToPeer must propagate unchanged: the scheduler uses it
310                // to re-place the workload on a capable peer, and wrapping it
311                // in `CreateFailed` would hide the signal and mark the service
312                // dead instead of rescheduling it. All other errors are
313                // normalised to `CreateFailed` for upstream handling.
314                self.runtime
315                    .create_container(&id, &self.spec)
316                    .await
317                    .map_err(|e| match e {
318                        AgentError::RouteToPeer { .. } => e,
319                        other => AgentError::CreateFailed {
320                            id: id.to_string(),
321                            reason: other.to_string(),
322                        },
323                    })?;
324
325                // Run init actions with error policy enforcement (no lock needed)
326                let init_orchestrator = InitOrchestrator::with_error_policy(
327                    id.clone(),
328                    self.spec.init.clone(),
329                    self.spec.errors.clone(),
330                );
331                init_orchestrator.run().await?;
332
333                // Start container (no lock needed - I/O operation)
334                self.runtime
335                    .start_container(&id)
336                    .await
337                    .map_err(|e| AgentError::StartFailed {
338                        id: id.to_string(),
339                        reason: e.to_string(),
340                    })?;
341
342                // Get container PID with retries (may not be immediately available)
343                let mut container_pid = None;
344                for attempt in 1..=5u32 {
345                    match self.runtime.get_container_pid(&id).await {
346                        Ok(Some(pid)) => {
347                            container_pid = Some(pid);
348                            break;
349                        }
350                        Ok(None) if attempt < 5 => {
351                            tracing::debug!(container = %id, attempt, "PID not available yet, retrying");
352                            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
353                        }
354                        Ok(None) => {
355                            tracing::warn!(container = %id, "Container PID unavailable after 5 attempts");
356                        }
357                        Err(e) => {
358                            tracing::warn!(container = %id, attempt, error = %e, "Failed to get PID");
359                            if attempt < 5 {
360                                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
361                            }
362                        }
363                    }
364                }
365
366                // Verify the container is still running before attempting
367                // overlay attach. If the init process crashed during start
368                // (bad image, missing libs, failed mount), the PID above is
369                // now dead and every `ip link set ... netns {pid}` will
370                // return a cryptic RTNETLINK error. Surface the real cause
371                // from the container's log tail instead of the cascade.
372                if container_pid.is_some() {
373                    let alive = match self.runtime.container_state(&id).await {
374                        Ok(
375                            ContainerState::Running
376                            | ContainerState::Pending
377                            | ContainerState::Initializing,
378                        ) => true,
379                        Ok(state) => {
380                            tracing::warn!(
381                                container = %id,
382                                ?state,
383                                "container exited before overlay attach could run"
384                            );
385                            false
386                        }
387                        Err(e) => {
388                            // State query failed — don't block the attach on
389                            // it. The overlay manager's own cleanup-on-error
390                            // path now handles the dead-PID case cleanly.
391                            tracing::warn!(
392                                container = %id,
393                                error = %e,
394                                "container state query failed before overlay attach, proceeding"
395                            );
396                            true
397                        }
398                    };
399                    if !alive {
400                        let log_tail = self.runtime.container_logs(&id, 40).await.ok().map_or_else(
401                            || "  <log read failed>".to_string(),
402                            |entries| {
403                                if entries.is_empty() {
404                                    "  <no log output>".to_string()
405                                } else {
406                                    entries
407                                        .into_iter()
408                                        .map(|e| format!("  {}", e.message))
409                                        .collect::<Vec<_>>()
410                                        .join("\n")
411                                }
412                            },
413                        );
414                        return Err(AgentError::StartFailed {
415                            id: id.to_string(),
416                            reason: format!("container exited during startup:\n{log_tail}"),
417                        });
418                    }
419                }
420
421                // Attach to overlay network if manager is available.
422                //
423                // Linux uses the container PID to enter the netns and attach a
424                // veth. Windows has no PID-addressable netns — the HCN namespace
425                // GUID (obtained from `get_container_namespace_id`) is used
426                // instead, and the endpoint's IP has already been populated by
427                // `EndpointAttachment::create_overlay` during container creation.
428                // We simply register that IP with the slice allocator so host
429                // accounting stays in sync.
430                let overlay_ip = if let Some(overlay) = &self.overlay_manager {
431                    let overlay_guard = overlay.read().await;
432                    #[cfg(target_os = "windows")]
433                    let attach_result: Option<std::net::IpAddr> = {
434                        // On Windows the overlay attach (HCN endpoint + per-container
435                        // namespace creation, via overlayd) already happened inside
436                        // `HcsRuntime::create_container`. Here we only need the IP it
437                        // assigned so we can register DNS for service discovery.
438                        let _ = (container_pid, &overlay_guard); // unused on Windows
439                        match self.runtime.get_container_ip(&id).await {
440                            Ok(Some(ip)) => Some(ip),
441                            Ok(None) => {
442                                tracing::debug!(
443                                    container = %id,
444                                    "no overlay IP recorded for container (overlay attach skipped at create time)"
445                                );
446                                None
447                            }
448                            Err(e) => {
449                                tracing::warn!(
450                                    container = %id,
451                                    error = %e,
452                                    "failed to fetch container overlay IP"
453                                );
454                                None
455                            }
456                        }
457                    };
458                    #[cfg(not(target_os = "windows"))]
459                    let attach_result: Option<std::net::IpAddr> = {
460                        match self.runtime.overlay_attach_kind() {
461                            // VM guest (macOS VZ-Linux): no host netns/PID, so
462                            // overlayd allocates the overlay identity and we push
463                            // it into the guest over vsock, where it brings up its
464                            // own kernel WireGuard device.
465                            crate::runtime::OverlayAttachKind::InGuestVsock => {
466                                let cid = id.to_string();
467                                match overlay_guard
468                                    .attach_container_guest(&cid, &self.service_name, true)
469                                    .await
470                                {
471                                    Ok(cfg) => {
472                                        let ip = cfg.overlay_ip;
473                                        match self.runtime.push_overlay_config(&id, &cfg).await {
474                                            Ok(()) => Some(ip),
475                                            Err(e) => {
476                                                tracing::warn!(
477                                                    container = %id,
478                                                    error = %e,
479                                                    "failed to push overlay config into guest; rolling back allocation"
480                                                );
481                                                // Don't leak the overlayd IP/peer.
482                                                if let Err(de) =
483                                                    overlay_guard.detach_container_guest(&cid).await
484                                                {
485                                                    tracing::warn!(
486                                                        container = %id,
487                                                        error = %de,
488                                                        "failed to roll back guest overlay allocation"
489                                                    );
490                                                }
491                                                None
492                                            }
493                                        }
494                                    }
495                                    Err(e) => {
496                                        tracing::warn!(
497                                            container = %id,
498                                            error = %e,
499                                            "failed to allocate guest overlay config from overlayd"
500                                        );
501                                        None
502                                    }
503                                }
504                            }
505                            // Host-process runtimes (Linux youki): plumb a veth
506                            // into the container's netns by PID.
507                            _ => {
508                                if let Some(pid) = container_pid {
509                                    match overlay_guard
510                                        .attach_container(pid, &self.service_name, true)
511                                        .await
512                                    {
513                                        Ok(ip) => Some(ip),
514                                        Err(e) => {
515                                            tracing::warn!(
516                                                container = %id,
517                                                error = %e,
518                                                "failed to attach container to overlay network"
519                                            );
520                                            None
521                                        }
522                                    }
523                                } else {
524                                    // No PID available (e.g. WASM runtime) - skip overlay attachment
525                                    tracing::debug!(
526                                        container = %id,
527                                        "skipping overlay attachment - no PID available"
528                                    );
529                                    None
530                                }
531                            }
532                        }
533                    };
534
535                    if let Some(ip) = attach_result {
536                        tracing::info!(
537                            container = %id,
538                            overlay_ip = %ip,
539                            "attached container to overlay network"
540                        );
541
542                        // Register DNS for service discovery
543                        if let Some(dns) = &self.dns_server {
544                            // Register the BARE compose service name ({service}) so
545                            // sibling containers can resolve each other by the name
546                            // docker-compose uses (e.g. `postgres`), not just the
547                            // FQDN. This is what `FORGEJO__database__HOST=postgres`
548                            // and every other compose service reference depends on.
549                            // Multiple replicas upsert the same name; the in-memory
550                            // authority keeps the most recent A record (round-robin
551                            // is not required for the single-replica compose case).
552                            match dns.add_record(&self.service_name, ip).await {
553                                Ok(()) => tracing::debug!(
554                                    hostname = %self.service_name,
555                                    ip = %ip,
556                                    "registered bare service-name DNS (compose discovery)"
557                                ),
558                                Err(e) => tracing::warn!(
559                                    hostname = %self.service_name,
560                                    error = %e,
561                                    "failed to register bare service-name DNS"
562                                ),
563                            }
564
565                            // Register service-level hostname: {service}.service.local
566                            let service_hostname = format!("{}.service.local", self.service_name);
567
568                            // Register replica-specific hostname: {replica}.{service}.service.local
569                            let replica_hostname =
570                                format!("{}.{}.service.local", id.replica, self.service_name);
571
572                            match dns.add_record(&service_hostname, ip).await {
573                                Ok(()) => tracing::debug!(
574                                    hostname = %service_hostname,
575                                    ip = %ip,
576                                    "registered DNS for service"
577                                ),
578                                Err(e) => tracing::warn!(
579                                    hostname = %service_hostname,
580                                    error = %e,
581                                    "failed to register DNS for service"
582                                ),
583                            }
584
585                            // Also register replica-specific entry
586                            if let Err(e) = dns.add_record(&replica_hostname, ip).await {
587                                tracing::warn!(
588                                    hostname = %replica_hostname,
589                                    error = %e,
590                                    "failed to register replica DNS"
591                                );
592                            } else {
593                                tracing::debug!(
594                                    hostname = %replica_hostname,
595                                    ip = %ip,
596                                    "registered DNS for replica"
597                                );
598                            }
599
600                            // Per-role DNS: register `{role}.{service}.service.local` when
601                            // this container belongs to a non-default replica group. Lets
602                            // intra-cluster clients reach a specific group (e.g.
603                            // `read.db.service.local` for the read replicas of a postgres
604                            // service with primary+read replica groups).
605                            if id.role != "default" {
606                                let role_hostname =
607                                    format!("{}.{}.service.local", id.role, self.service_name);
608                                match dns.add_record(&role_hostname, ip).await {
609                                    Ok(()) => tracing::debug!(
610                                        hostname = %role_hostname,
611                                        ip = %ip,
612                                        role = %id.role,
613                                        "registered DNS for replica group role"
614                                    ),
615                                    Err(e) => tracing::warn!(
616                                        hostname = %role_hostname,
617                                        error = %e,
618                                        "failed to register role DNS"
619                                    ),
620                                }
621                            }
622                        }
623
624                        Some(ip)
625                    } else {
626                        None
627                    }
628                } else {
629                    None
630                };
631
632                // If overlay failed, try the container runtime's own IP as fallback
633                let effective_ip = if overlay_ip.is_none() {
634                    match self.runtime.get_container_ip(&id).await {
635                        Ok(Some(ip)) => {
636                            tracing::info!(
637                                container = %id,
638                                ip = %ip,
639                                "using runtime container IP for proxy (overlay unavailable)"
640                            );
641                            Some(ip)
642                        }
643                        Ok(None) => {
644                            tracing::warn!(
645                                container = %id,
646                                "no container IP available from runtime, proxy routing will be unavailable"
647                            );
648                            None
649                        }
650                        Err(e) => {
651                            tracing::warn!(
652                                container = %id,
653                                error = %e,
654                                "failed to get container IP from runtime"
655                            );
656                            None
657                        }
658                    }
659                } else {
660                    overlay_ip
661                };
662
663                tracing::info!(
664                    container = %id,
665                    service = %self.service_name,
666                    overlay_ip = ?overlay_ip,
667                    effective_ip = ?effective_ip,
668                    "Container IP resolution complete"
669                );
670
671                // Query port override from the runtime.
672                // On macOS sandbox, each container is assigned a unique port since
673                // all processes share the host network (no network namespaces).
674                // The runtime passes the port to the process via the PORT env var.
675                let port_override = match self.runtime.get_container_port_override(&id).await {
676                    Ok(Some(port)) => {
677                        tracing::info!(
678                            container = %id,
679                            port = port,
680                            "runtime assigned dynamic port override for this container"
681                        );
682                        Some(port)
683                    }
684                    Ok(None) => None,
685                    Err(e) => {
686                        tracing::warn!(
687                            container = %id,
688                            error = %e,
689                            "failed to query port override from runtime, using spec port"
690                        );
691                        None
692                    }
693                };
694
695                // Start health monitoring and store handle (no lock needed during start)
696                let health_monitor_handle = {
697                    let mut check = self.spec.health.check.clone();
698
699                    // Resolve Tcp { port: 0 } ("use first endpoint") to the actual
700                    // port the container is listening on. With mac-sandbox, each
701                    // replica gets a unique assigned port via port_override.
702                    if let HealthCheck::Tcp { ref mut port } = check {
703                        if *port == 0 {
704                            *port = port_override.unwrap_or_else(|| {
705                                self.spec
706                                    .endpoints
707                                    .iter()
708                                    .find(|ep| {
709                                        matches!(
710                                            ep.protocol,
711                                            Protocol::Http | Protocol::Https | Protocol::Websocket
712                                        )
713                                    })
714                                    .map_or(8080, zlayer_spec::EndpointSpec::target_port)
715                            });
716                        }
717                    }
718
719                    let start_grace = self
720                        .spec
721                        .health
722                        .start_grace
723                        .unwrap_or(Duration::from_secs(5));
724                    let check_timeout = self.spec.health.timeout.unwrap_or(Duration::from_secs(5));
725                    let interval = self.spec.health.interval.unwrap_or(Duration::from_secs(10));
726                    let retries = self.spec.health.retries;
727
728                    let checker = HealthChecker::new(check, effective_ip);
729                    let mut monitor = HealthMonitor::new(id.clone(), checker, interval, retries)
730                        .with_start_grace(start_grace)
731                        .with_check_timeout(check_timeout);
732
733                    // Build the optional proxy backend handle. This is only present
734                    // when both a proxy manager AND a reachable overlay IP exist; in
735                    // degraded-overlay / no-proxy deployments it stays None and the
736                    // callback below skips all proxy work while STILL bridging health
737                    // state back into ServiceManager.
738                    let proxy_backend: Option<(Arc<ProxyManager>, SocketAddr)> =
739                        if let (Some(proxy), Some(ip)) = (&self.proxy_manager, effective_ip) {
740                            let proxy = Arc::clone(proxy);
741                            // Get the container's target port, using the runtime override if
742                            // present. On macOS sandbox, port_override gives each replica a
743                            // unique port so the proxy can distinguish backends sharing
744                            // 127.0.0.1.
745                            let port = port_override.unwrap_or_else(|| {
746                                self.spec
747                                    .endpoints
748                                    .iter()
749                                    .find(|ep| {
750                                        matches!(
751                                            ep.protocol,
752                                            Protocol::Http | Protocol::Https | Protocol::Websocket
753                                        )
754                                    })
755                                    .map_or(8080, zlayer_spec::EndpointSpec::target_port)
756                            });
757
758                            let backend_addr = SocketAddr::new(ip, port);
759
760                            // Register backend with load balancer so proxy can route to it.
761                            // This must happen before the health callback is created, because
762                            // update_backend_health only updates *existing* backends.
763                            proxy.add_backend(&self.service_name, backend_addr).await;
764
765                            // Publish this container's exposed ports on the node
766                            // loopback (`127.0.0.1:<endpoint.port>`) so a consumer
767                            // sharing the node loopback can reach the service at
768                            // `localhost:<port>`. Gated on the spec's policy
769                            // (`Auto` publishes only for single-member services).
770                            // Uses the SAME runtime-resolved `ip`/`port_override`
771                            // as the backend above: on macOS each replica shares
772                            // 127.0.0.1 with a unique override; on Linux/VM the
773                            // overlay IP carries the declared target port.
774                            if self.spec.publish_to_node_loopback() {
775                                proxy
776                                    .publish_loopback_for_container(
777                                        &self.service_name,
778                                        &self.spec,
779                                        ip,
780                                        port_override,
781                                    )
782                                    .await;
783                            }
784
785                            Some((proxy, backend_addr))
786                        } else {
787                            None
788                        };
789
790                    // The health bridge is ALWAYS attached, independent of proxy/IP
791                    // availability. stabilization::wait_for_stabilization only treats a
792                    // service as ready when health_states[name] == Healthy, so this write
793                    // must happen even when the overlay is degraded and no proxy backend
794                    // exists — otherwise the service stays healthy=false forever and
795                    // stabilization times out.
796                    let health_states_opt = self.health_states.clone();
797                    let svc_name_for_states = self.service_name.clone();
798                    let svc_name_for_proxy = self.service_name.clone();
799                    let svc_name_for_log = self.service_name.clone();
800
801                    let health_callback: HealthCallback =
802                        Arc::new(move |container_id: ContainerId, is_healthy: bool| {
803                            tracing::info!(
804                                container = %container_id,
805                                service = %svc_name_for_log,
806                                healthy = is_healthy,
807                                has_proxy_backend = proxy_backend.is_some(),
808                                "health status changed"
809                            );
810
811                            // Always bridge health state back to ServiceManager's
812                            // health_states map (unconditional — no proxy/IP required).
813                            if let Some(ref health_states) = health_states_opt {
814                                let states = Arc::clone(health_states);
815                                let svc = svc_name_for_states.clone();
816                                tokio::spawn(async move {
817                                    let state = if is_healthy {
818                                        HealthState::Healthy
819                                    } else {
820                                        HealthState::Unhealthy {
821                                            failures: 0,
822                                            reason: "health check failed".into(),
823                                        }
824                                    };
825                                    states.write().await.insert(svc, state);
826                                });
827                            }
828
829                            // Update proxy backend health only when a proxy backend was
830                            // registered (proxy manager + reachable overlay IP present).
831                            if let Some((proxy, backend_addr)) = proxy_backend.clone() {
832                                let svc = svc_name_for_proxy.clone();
833                                tokio::spawn(async move {
834                                    proxy
835                                        .update_backend_health(&svc, backend_addr, is_healthy)
836                                        .await;
837                                });
838                            }
839                        });
840
841                    monitor = monitor.with_callback(health_callback);
842
843                    monitor.start()
844                };
845
846                // Update state (short write lock)
847                {
848                    let mut containers = self.containers.write().await;
849                    containers.insert(
850                        id.clone(),
851                        Container {
852                            id: id.clone(),
853                            image: self.spec.image.name.to_string(),
854                            state: ContainerState::Running,
855                            pid: None,
856                            task: None,
857                            overlay_ip: effective_ip,
858                            health_monitor: Some(health_monitor_handle),
859                            port_override,
860                        },
861                    );
862                } // Lock released here
863            }
864        }
865
866        // Phase 3: Scale down - remove containers (short write lock per removal)
867        //
868        // Containers were created with `with_role_and_node(role, local_node_id)`
869        // on scale-up, so we must reconstruct the same identity on scale-down
870        // — the role is derived from `replica_groups` via `role_for_replica`
871        // and the node id is the local cluster node. Mismatched ids would miss
872        // the live entry in `self.containers` and leak the container.
873        if replicas < current_replicas {
874            for i in replicas..current_replicas {
875                let replica_idx = i + 1;
876                let id = ContainerId::with_role_and_node(
877                    self.service_name.clone(),
878                    replica_idx,
879                    self.role_for_replica(replica_idx),
880                    local_node_id,
881                );
882
883                // Remove from state first and get the container to abort health monitor (short write lock)
884                let removed_container = {
885                    let mut containers = self.containers.write().await;
886                    containers.remove(&id)
887                }; // Lock released here
888
889                // Then perform cleanup (no lock held - I/O operations)
890                if let Some(container) = removed_container {
891                    // Abort the health monitor task if it exists
892                    if let Some(handle) = container.health_monitor {
893                        handle.abort();
894                    }
895
896                    // Unpublish this container's node-loopback ports (mirror of
897                    // the publish in the start path above). Recomputes the same
898                    // backend from the container's stored runtime-resolved IP and
899                    // port override; the last replica's removal frees the
900                    // loopback listener. Gated identically to publish.
901                    if self.spec.publish_to_node_loopback() {
902                        if let (Some(proxy), Some(ip)) = (&self.proxy_manager, container.overlay_ip)
903                        {
904                            proxy
905                                .unpublish_loopback_for_container(
906                                    &self.spec,
907                                    ip,
908                                    container.port_override,
909                                )
910                                .await;
911                        }
912                    }
913
914                    // Remove DNS records for this container
915                    if let Some(dns) = &self.dns_server {
916                        // Remove replica-specific DNS entry
917                        let replica_hostname =
918                            format!("{}.{}.service.local", id.replica, self.service_name);
919                        if let Err(e) = dns.remove_record(&replica_hostname).await {
920                            tracing::warn!(
921                                hostname = %replica_hostname,
922                                error = %e,
923                                "failed to remove replica DNS record"
924                            );
925                        } else {
926                            tracing::debug!(
927                                hostname = %replica_hostname,
928                                "removed replica DNS record"
929                            );
930                        }
931
932                        // Remove per-role DNS entry if this was a non-default group.
933                        // Note: this is best-effort and removes the record even if
934                        // other replicas in the same role still need it — the DNS
935                        // server's add/remove API is single-record so we can't keep
936                        // it alive for siblings. P2.3-bis (round-robin per-role)
937                        // can fix this later via a per-role refcount; for now the
938                        // service-level hostname keeps cluster-internal clients
939                        // working even when the role-specific record briefly
940                        // disappears.
941                        if id.role != "default" {
942                            let role_hostname =
943                                format!("{}.{}.service.local", id.role, self.service_name);
944                            if let Err(e) = dns.remove_record(&role_hostname).await {
945                                tracing::warn!(
946                                    hostname = %role_hostname,
947                                    error = %e,
948                                    "failed to remove role DNS record"
949                                );
950                            } else {
951                                tracing::debug!(
952                                    hostname = %role_hostname,
953                                    "removed role DNS record"
954                                );
955                            }
956                        }
957
958                        // Note: We don't remove the service-level hostname here because
959                        // other replicas may still be using it. The service-level record
960                        // should be cleaned up when the entire service is removed.
961                    }
962
963                    // Detach from overlay network if manager available.
964                    //
965                    // Done BEFORE stop_container because:
966                    //   - The container init process must still be in
967                    //     /proc to look up its PID via `get_container_pid`.
968                    //   - `OverlayManager::detach_container` deletes host-side
969                    //     veth interfaces by name (`veth-<pid>-*`) and
970                    //     releases the allocated overlay IPs back to the
971                    //     per-node slice. Without this the IPs leak across
972                    //     container churn and the slice exhausts.
973                    //
974                    // Best-effort: failures are logged but never abort the
975                    // scale-down. The periodic orphan sweep
976                    // (`start_periodic_orphan_sweep`) catches anything we
977                    // missed.
978                    if let Some(overlay) = &self.overlay_manager {
979                        // VM guests have no host veth/PID — release the overlayd
980                        // allocation (IP + registered mesh peer) by container id
981                        // instead of by PID.
982                        if self.runtime.overlay_attach_kind()
983                            == crate::runtime::OverlayAttachKind::InGuestVsock
984                        {
985                            let overlay_guard = overlay.read().await;
986                            if let Err(e) =
987                                overlay_guard.detach_container_guest(&id.to_string()).await
988                            {
989                                tracing::warn!(
990                                    container = %id,
991                                    error = %e,
992                                    "overlay detach_container_guest failed; relying on orphan sweep"
993                                );
994                            }
995                        } else {
996                            match self.runtime.get_container_pid(&id).await {
997                                Ok(Some(pid)) => {
998                                    let overlay_guard = overlay.read().await;
999                                    if let Err(e) = overlay_guard.detach_container(pid).await {
1000                                        tracing::warn!(
1001                                            container = %id,
1002                                            pid,
1003                                            error = %e,
1004                                            "overlay detach_container failed; relying on orphan sweep"
1005                                        );
1006                                    }
1007                                }
1008                                Ok(None) => {
1009                                    tracing::debug!(
1010                                        container = %id,
1011                                        "no PID available for overlay detach (already exited or non-Linux runtime)"
1012                                    );
1013                                }
1014                                Err(e) => {
1015                                    tracing::warn!(
1016                                        container = %id,
1017                                        error = %e,
1018                                        "failed to query container PID for overlay detach"
1019                                    );
1020                                }
1021                            }
1022                        }
1023                    }
1024
1025                    // Stop container
1026                    self.runtime
1027                        .stop_container(&id, Duration::from_secs(30))
1028                        .await?;
1029
1030                    // Sync volumes to S3 before removal (no-op if not configured)
1031                    if let Err(e) = self.runtime.sync_container_volumes(&id).await {
1032                        tracing::warn!(
1033                            container = %id,
1034                            error = %e,
1035                            "failed to sync volumes before removal"
1036                        );
1037                    }
1038
1039                    // Remove container
1040                    self.runtime.remove_container(&id).await?;
1041                }
1042            }
1043        }
1044
1045        Ok(())
1046    }
1047
1048    /// Get current number of replicas
1049    pub async fn replica_count(&self) -> usize {
1050        self.containers.read().await.len()
1051    }
1052
1053    /// Get all container IDs
1054    pub async fn container_ids(&self) -> Vec<ContainerId> {
1055        self.containers.read().await.keys().cloned().collect()
1056    }
1057
1058    /// Get per-container info (id, image, state, pid, overlay IP) for every
1059    /// live container in this instance.
1060    ///
1061    /// Surfaces the REAL image reference each container was created from and its
1062    /// REAL lifecycle state (lowercased via [`ContainerState::as_str`]) so the
1063    /// API/`ps` no longer reports a hardcoded `"running"` with no image.
1064    pub async fn container_infos(&self) -> Vec<ContainerInfo> {
1065        self.containers
1066            .read()
1067            .await
1068            .values()
1069            .map(|c| ContainerInfo {
1070                id: c.id.clone(),
1071                image: c.image.clone(),
1072                state: c.state.as_str().to_string(),
1073                pid: c.pid,
1074                overlay_ip: c.overlay_ip.map(|ip| ip.to_string()),
1075            })
1076            .collect()
1077    }
1078
1079    /// Get read access to the containers map
1080    ///
1081    /// This allows callers to access container overlay IPs and other metadata
1082    /// without copying the entire map.
1083    pub fn containers(
1084        &self,
1085    ) -> &tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>> {
1086        &self.containers
1087    }
1088
1089    /// Check if this service instance has an overlay manager configured
1090    pub fn has_overlay_manager(&self) -> bool {
1091        self.overlay_manager.is_some()
1092    }
1093
1094    /// Check if this service instance has a proxy manager configured
1095    pub fn has_proxy_manager(&self) -> bool {
1096        self.proxy_manager.is_some()
1097    }
1098
1099    /// Get the proxy manager for this instance, if configured.
1100    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
1101        self.proxy_manager.as_ref()
1102    }
1103
1104    /// Check if this service instance has a DNS server configured
1105    pub fn has_dns_server(&self) -> bool {
1106        self.dns_server.is_some()
1107    }
1108}
1109
1110/// Per-container summary surfaced to callers (API / `ps`).
1111///
1112/// Carries the REAL image reference and lifecycle state of a single live
1113/// container, replacing the previous id-only view that forced the API to
1114/// fabricate a hardcoded `"running"` state with no image.
1115#[derive(Debug, Clone)]
1116pub struct ContainerInfo {
1117    /// Container identity.
1118    pub id: ContainerId,
1119    /// Image reference the container was created from (canonical form).
1120    pub image: String,
1121    /// Lowercased lifecycle state (e.g. `"running"`, `"exited"`).
1122    pub state: String,
1123    /// Process ID, when the container is running.
1124    pub pid: Option<u32>,
1125    /// Overlay IP rendered as a string, when assigned.
1126    pub overlay_ip: Option<String>,
1127}
1128
1129/// A live deployment container enriched for Docker-compat `ps` rows and for
1130/// name resolution. Produced by [`ServiceManager::list_container_views`].
1131#[derive(Debug, Clone)]
1132pub struct DeploymentContainerView {
1133    /// Deployment (compose project) name, when known.
1134    pub deployment: Option<String>,
1135    /// Service name within the deployment.
1136    pub service: String,
1137    /// Concrete container identity.
1138    pub container_id: ContainerId,
1139    /// Compose `container_name:` (the user-facing Docker name), when set.
1140    pub container_name: Option<String>,
1141    /// Image reference the container was created from.
1142    pub image: String,
1143    /// Lowercased lifecycle state (e.g. `"running"`).
1144    pub state: String,
1145    /// Process id when running.
1146    pub pid: Option<u32>,
1147    /// The service's published port mappings.
1148    pub ports: Vec<zlayer_spec::PortMapping>,
1149}
1150
1151/// Service manager for multiple services
1152pub struct ServiceManager {
1153    runtime: Arc<dyn Runtime + Send + Sync>,
1154    services: tokio::sync::RwLock<std::collections::HashMap<String, ServiceInstance>>,
1155    scale_semaphore: Arc<Semaphore>,
1156    /// Overlay network manager for container networking
1157    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
1158    /// Stream registry for L4 proxy route registration (TCP/UDP)
1159    stream_registry: Option<Arc<StreamRegistry>>,
1160    /// Proxy manager for health-aware load balancing (hyper-based proxy)
1161    proxy_manager: Option<Arc<ProxyManager>>,
1162    /// DNS server for service discovery
1163    dns_server: Option<Arc<DnsServer>>,
1164    /// Container-injectable overlay resolver IP. When set, new service
1165    /// instances inject `<ip>` into their `spec.dns` so containers resolve
1166    /// through the overlay DNS server (bound on `<ip>:53`) rather than the
1167    /// hijacked host resolv.conf.
1168    container_dns: Option<IpAddr>,
1169    /// Deployment name (used for generating hostnames)
1170    deployment_name: Option<String>,
1171    /// Health states for dependency condition checking
1172    health_states: Arc<RwLock<HashMap<String, HealthState>>>,
1173    /// Job executor for run-to-completion workloads
1174    job_executor: Option<Arc<JobExecutor>>,
1175    /// Cron scheduler for time-based job triggers
1176    cron_scheduler: Option<Arc<CronScheduler>>,
1177    /// Container supervisor for crash/panic policy enforcement
1178    container_supervisor: Option<Arc<ContainerSupervisor>>,
1179    /// Cluster membership + dispatch handle. When `None`, scale operations
1180    /// run purely local (single-node mode). When `Some`, `scale_service`
1181    /// routes through the cluster (leader dispatches to peers; followers
1182    /// forward to the leader).
1183    cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
1184}
1185
1186// ---------------------------------------------------------------------------
1187// ServiceManagerBuilder
1188// ---------------------------------------------------------------------------
1189
1190/// Builder for constructing a [`ServiceManager`] with optional subsystems.
1191///
1192/// Prefer using `ServiceManager::builder(runtime)` to start building.
1193///
1194/// # Example
1195///
1196/// ```ignore
1197/// let manager = ServiceManager::builder(runtime)
1198///     .overlay_manager(om)
1199///     .proxy_manager(proxy)
1200///     .deployment_name("prod")
1201///     .build();
1202/// ```
1203pub struct ServiceManagerBuilder {
1204    runtime: Arc<dyn Runtime + Send + Sync>,
1205    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
1206    proxy_manager: Option<Arc<ProxyManager>>,
1207    stream_registry: Option<Arc<StreamRegistry>>,
1208    dns_server: Option<Arc<DnsServer>>,
1209    container_dns: Option<IpAddr>,
1210    deployment_name: Option<String>,
1211    job_executor: Option<Arc<JobExecutor>>,
1212    cron_scheduler: Option<Arc<CronScheduler>>,
1213    container_supervisor: Option<Arc<ContainerSupervisor>>,
1214    cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
1215}
1216
1217impl ServiceManagerBuilder {
1218    /// Create a new builder with the required runtime.
1219    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
1220        Self {
1221            runtime,
1222            overlay_manager: None,
1223            proxy_manager: None,
1224            stream_registry: None,
1225            dns_server: None,
1226            container_dns: None,
1227            deployment_name: None,
1228            job_executor: None,
1229            cron_scheduler: None,
1230            container_supervisor: None,
1231            cluster: None,
1232        }
1233    }
1234
1235    /// Set the overlay network manager for container networking.
1236    #[must_use]
1237    pub fn overlay_manager(mut self, om: Arc<RwLock<OverlayManager>>) -> Self {
1238        self.overlay_manager = Some(om);
1239        self
1240    }
1241
1242    /// Set the proxy manager for health-aware load balancing.
1243    #[must_use]
1244    pub fn proxy_manager(mut self, pm: Arc<ProxyManager>) -> Self {
1245        self.proxy_manager = Some(pm);
1246        self
1247    }
1248
1249    /// Set the stream registry for TCP/UDP L4 proxy route registration.
1250    #[must_use]
1251    pub fn stream_registry(mut self, sr: Arc<StreamRegistry>) -> Self {
1252        self.stream_registry = Some(sr);
1253        self
1254    }
1255
1256    /// Set the DNS server for service discovery.
1257    #[must_use]
1258    pub fn dns_server(mut self, dns: Arc<DnsServer>) -> Self {
1259        self.dns_server = Some(dns);
1260        self
1261    }
1262
1263    /// Set the container-injectable overlay resolver IP.
1264    ///
1265    /// The daemon passes the IP it bound the overlay DNS server on at port 53
1266    /// (see `daemon.rs` Phase 4). New service instances inject it into
1267    /// `spec.dns` so containers resolve through the overlay instead of the
1268    /// hijacked host resolv.conf. The port is implicitly 53 (resolv.conf has no
1269    /// port syntax), which is why only the bare IP is threaded here.
1270    #[must_use]
1271    pub fn container_dns(mut self, ip: IpAddr) -> Self {
1272        self.container_dns = Some(ip);
1273        self
1274    }
1275
1276    /// Set the deployment name (used for hostname generation).
1277    #[must_use]
1278    pub fn deployment_name(mut self, name: impl Into<String>) -> Self {
1279        self.deployment_name = Some(name.into());
1280        self
1281    }
1282
1283    /// Set the job executor for run-to-completion workloads.
1284    #[must_use]
1285    pub fn job_executor(mut self, je: Arc<JobExecutor>) -> Self {
1286        self.job_executor = Some(je);
1287        self
1288    }
1289
1290    /// Set the cron scheduler for time-based job triggers.
1291    #[must_use]
1292    pub fn cron_scheduler(mut self, cs: Arc<CronScheduler>) -> Self {
1293        self.cron_scheduler = Some(cs);
1294        self
1295    }
1296
1297    /// Set the container supervisor for crash/panic policy enforcement.
1298    #[must_use]
1299    pub fn container_supervisor(mut self, cs: Arc<ContainerSupervisor>) -> Self {
1300        self.container_supervisor = Some(cs);
1301        self
1302    }
1303
1304    /// Set the cluster membership + dispatch handle. When set,
1305    /// [`ServiceManager::scale_service`] will route through the cluster
1306    /// (leader dispatches to peers; followers forward to the leader).
1307    /// When unset (the default), scale operations remain local-only.
1308    #[must_use]
1309    pub fn cluster(mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) -> Self {
1310        self.cluster = Some(cluster);
1311        self
1312    }
1313
1314    /// Consume the builder and produce a fully-wired [`ServiceManager`].
1315    ///
1316    /// Logs warnings for missing recommended subsystems (proxy,
1317    /// `stream_registry`, `container_supervisor`, `deployment_name`).
1318    pub fn build(self) -> ServiceManager {
1319        if self.proxy_manager.is_none() {
1320            tracing::warn!("ServiceManager built without proxy_manager");
1321        }
1322        if self.stream_registry.is_none() {
1323            tracing::warn!("ServiceManager built without stream_registry");
1324        }
1325        if self.container_supervisor.is_none() {
1326            tracing::warn!("ServiceManager built without container_supervisor");
1327        }
1328        if self.deployment_name.is_none() {
1329            tracing::warn!("ServiceManager built without deployment_name");
1330        }
1331
1332        ServiceManager {
1333            runtime: self.runtime,
1334            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1335            scale_semaphore: Arc::new(Semaphore::new(10)),
1336            overlay_manager: self.overlay_manager,
1337            stream_registry: self.stream_registry,
1338            proxy_manager: self.proxy_manager,
1339            dns_server: self.dns_server,
1340            container_dns: self.container_dns,
1341            deployment_name: self.deployment_name,
1342            health_states: Arc::new(RwLock::new(HashMap::new())),
1343            job_executor: self.job_executor,
1344            cron_scheduler: self.cron_scheduler,
1345            container_supervisor: self.container_supervisor,
1346            cluster: self.cluster,
1347        }
1348    }
1349}
1350
1351impl ServiceManager {
1352    /// Create a [`ServiceManagerBuilder`] for constructing a `ServiceManager`.
1353    ///
1354    /// This is the preferred way to construct a `ServiceManager` since v0.2.0.
1355    ///
1356    /// # Example
1357    ///
1358    /// ```ignore
1359    /// let manager = ServiceManager::builder(runtime)
1360    ///     .overlay_manager(om)
1361    ///     .proxy_manager(proxy)
1362    ///     .build();
1363    /// ```
1364    pub fn builder(runtime: Arc<dyn Runtime + Send + Sync>) -> ServiceManagerBuilder {
1365        ServiceManagerBuilder::new(runtime)
1366    }
1367
1368    /// Create a new service manager
1369    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1370    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
1371        Self {
1372            runtime,
1373            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1374            scale_semaphore: Arc::new(Semaphore::new(10)), // Max 10 concurrent scaling operations
1375            overlay_manager: None,
1376            stream_registry: None,
1377            proxy_manager: None,
1378            dns_server: None,
1379            container_dns: None,
1380            deployment_name: None,
1381            health_states: Arc::new(RwLock::new(HashMap::new())),
1382            job_executor: None,
1383            cron_scheduler: None,
1384            container_supervisor: None,
1385            cluster: None,
1386        }
1387    }
1388
1389    /// Create a service manager with overlay network support
1390    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1391    pub fn with_overlay(
1392        runtime: Arc<dyn Runtime + Send + Sync>,
1393        overlay_manager: Arc<RwLock<OverlayManager>>,
1394    ) -> Self {
1395        Self {
1396            runtime,
1397            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1398            scale_semaphore: Arc::new(Semaphore::new(10)),
1399            overlay_manager: Some(overlay_manager),
1400            stream_registry: None,
1401            proxy_manager: None,
1402            dns_server: None,
1403            container_dns: None,
1404            deployment_name: None,
1405            health_states: Arc::new(RwLock::new(HashMap::new())),
1406            job_executor: None,
1407            cron_scheduler: None,
1408            container_supervisor: None,
1409            cluster: None,
1410        }
1411    }
1412
1413    /// Create a fully-configured service manager with overlay and proxy support
1414    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1415    pub fn with_full_config(
1416        runtime: Arc<dyn Runtime + Send + Sync>,
1417        overlay_manager: Arc<RwLock<OverlayManager>>,
1418        deployment_name: String,
1419    ) -> Self {
1420        Self {
1421            runtime,
1422            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1423            scale_semaphore: Arc::new(Semaphore::new(10)),
1424            overlay_manager: Some(overlay_manager),
1425            stream_registry: None,
1426            proxy_manager: None,
1427            dns_server: None,
1428            container_dns: None,
1429            deployment_name: Some(deployment_name),
1430            health_states: Arc::new(RwLock::new(HashMap::new())),
1431            job_executor: None,
1432            cron_scheduler: None,
1433            container_supervisor: None,
1434            cluster: None,
1435        }
1436    }
1437
1438    /// Get the health states map for external monitoring
1439    pub fn health_states(&self) -> Arc<RwLock<HashMap<String, HealthState>>> {
1440        Arc::clone(&self.health_states)
1441    }
1442
1443    /// Update health state for a service
1444    pub async fn update_health_state(&self, service_name: &str, state: HealthState) {
1445        let mut states = self.health_states.write().await;
1446        states.insert(service_name.to_string(), state);
1447    }
1448
1449    /// Set the deployment name (used for generating hostnames)
1450    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1451    pub fn set_deployment_name(&mut self, name: String) {
1452        self.deployment_name = Some(name);
1453    }
1454
1455    /// Set the stream registry for L4 proxy integration (TCP/UDP)
1456    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1457    pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
1458        self.stream_registry = Some(registry);
1459    }
1460
1461    /// Builder pattern: add stream registry for L4 proxy integration
1462    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1463    #[must_use]
1464    pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
1465        self.stream_registry = Some(registry);
1466        self
1467    }
1468
1469    /// Get the stream registry (if configured)
1470    pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
1471        self.stream_registry.as_ref()
1472    }
1473
1474    /// Set the overlay manager for container networking
1475    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1476    pub fn set_overlay_manager(&mut self, manager: Arc<RwLock<OverlayManager>>) {
1477        self.overlay_manager = Some(manager);
1478    }
1479
1480    /// Set the proxy manager for health-aware load balancing
1481    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1482    pub fn set_proxy_manager(&mut self, proxy: Arc<ProxyManager>) {
1483        self.proxy_manager = Some(proxy);
1484    }
1485
1486    /// Builder pattern: add proxy manager for health-aware load balancing
1487    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1488    #[must_use]
1489    pub fn with_proxy_manager(mut self, proxy: Arc<ProxyManager>) -> Self {
1490        self.proxy_manager = Some(proxy);
1491        self
1492    }
1493
1494    /// Get the proxy manager (if configured)
1495    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
1496        self.proxy_manager.as_ref()
1497    }
1498
1499    /// Set the DNS server for service discovery
1500    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1501    pub fn set_dns_server(&mut self, dns: Arc<DnsServer>) {
1502        self.dns_server = Some(dns);
1503    }
1504
1505    /// Builder pattern: add DNS server for service discovery
1506    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1507    #[must_use]
1508    pub fn with_dns_server(mut self, dns: Arc<DnsServer>) -> Self {
1509        self.dns_server = Some(dns);
1510        self
1511    }
1512
1513    /// Get the DNS server (if configured)
1514    pub fn dns_server(&self) -> Option<&Arc<DnsServer>> {
1515        self.dns_server.as_ref()
1516    }
1517
1518    /// Set the job executor for run-to-completion workloads
1519    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1520    pub fn set_job_executor(&mut self, executor: Arc<JobExecutor>) {
1521        self.job_executor = Some(executor);
1522    }
1523
1524    /// Set the cron scheduler for time-based job triggers
1525    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1526    pub fn set_cron_scheduler(&mut self, scheduler: Arc<CronScheduler>) {
1527        self.cron_scheduler = Some(scheduler);
1528    }
1529
1530    /// Builder pattern: add job executor
1531    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1532    #[must_use]
1533    pub fn with_job_executor(mut self, executor: Arc<JobExecutor>) -> Self {
1534        self.job_executor = Some(executor);
1535        self
1536    }
1537
1538    /// Builder pattern: add cron scheduler
1539    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1540    #[must_use]
1541    pub fn with_cron_scheduler(mut self, scheduler: Arc<CronScheduler>) -> Self {
1542        self.cron_scheduler = Some(scheduler);
1543        self
1544    }
1545
1546    /// Set the cluster handle for cluster-aware scaling.
1547    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1548    pub fn set_cluster(&mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) {
1549        self.cluster = Some(cluster);
1550    }
1551
1552    /// Builder pattern: add a cluster handle for cluster-aware scaling.
1553    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1554    #[must_use]
1555    pub fn with_cluster(mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) -> Self {
1556        self.cluster = Some(cluster);
1557        self
1558    }
1559
1560    /// Get the cluster handle (if configured).
1561    pub fn cluster(&self) -> Option<&Arc<dyn zlayer_scheduler::cluster::Cluster>> {
1562        self.cluster.as_ref()
1563    }
1564
1565    /// Get the job executor (if configured)
1566    pub fn job_executor(&self) -> Option<&Arc<JobExecutor>> {
1567        self.job_executor.as_ref()
1568    }
1569
1570    /// Get the cron scheduler (if configured)
1571    pub fn cron_scheduler(&self) -> Option<&Arc<CronScheduler>> {
1572        self.cron_scheduler.as_ref()
1573    }
1574
1575    /// Set the container supervisor for crash/panic policy enforcement
1576    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1577    pub fn set_container_supervisor(&mut self, supervisor: Arc<ContainerSupervisor>) {
1578        self.container_supervisor = Some(supervisor);
1579    }
1580
1581    /// Builder pattern: add container supervisor
1582    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1583    #[must_use]
1584    pub fn with_container_supervisor(mut self, supervisor: Arc<ContainerSupervisor>) -> Self {
1585        self.container_supervisor = Some(supervisor);
1586        self
1587    }
1588
1589    /// Get the container supervisor (if configured)
1590    pub fn container_supervisor(&self) -> Option<&Arc<ContainerSupervisor>> {
1591        self.container_supervisor.as_ref()
1592    }
1593
1594    /// Start the container supervisor background task
1595    ///
1596    /// This spawns a background task that monitors containers for crashes
1597    /// and enforces the `on_panic` error policy.
1598    ///
1599    /// # Errors
1600    /// Returns an error if no container supervisor is configured.
1601    ///
1602    /// # Returns
1603    /// A `JoinHandle` for the supervisor task.
1604    pub fn start_container_supervisor(&self) -> Result<tokio::task::JoinHandle<()>> {
1605        let supervisor = self.container_supervisor.as_ref().ok_or_else(|| {
1606            AgentError::Configuration("Container supervisor not configured".to_string())
1607        })?;
1608
1609        let supervisor = Arc::clone(supervisor);
1610        Ok(tokio::spawn(async move {
1611            supervisor.run_loop().await;
1612        }))
1613    }
1614
1615    /// Shutdown the container supervisor
1616    pub fn shutdown_container_supervisor(&self) {
1617        if let Some(supervisor) = &self.container_supervisor {
1618            supervisor.shutdown();
1619        }
1620    }
1621
1622    /// Get the supervised state of a container
1623    pub async fn get_container_supervised_state(
1624        &self,
1625        container_id: &ContainerId,
1626    ) -> Option<SupervisedState> {
1627        if let Some(supervisor) = &self.container_supervisor {
1628            supervisor.get_state(container_id).await
1629        } else {
1630            None
1631        }
1632    }
1633
1634    /// Get supervisor events receiver
1635    ///
1636    /// Note: This can only be called once; the receiver is moved to the caller.
1637    pub async fn take_supervisor_events(
1638        &self,
1639    ) -> Option<tokio::sync::mpsc::Receiver<SupervisorEvent>> {
1640        if let Some(supervisor) = &self.container_supervisor {
1641            supervisor.take_event_receiver().await
1642        } else {
1643            None
1644        }
1645    }
1646
1647    // ==================== Dependency Orchestration ====================
1648
1649    /// Deploy multiple services respecting their dependency order
1650    ///
1651    /// This method:
1652    /// 1. Builds a dependency graph from the services
1653    /// 2. Validates no cycles exist
1654    /// 3. Computes topological order (services with no deps first)
1655    /// 4. For each service in order, waits for dependencies then starts the service
1656    ///
1657    /// # Arguments
1658    /// * `services` - Map of service name to service specification
1659    ///
1660    /// # Errors
1661    /// - Returns `AgentError::InvalidSpec` if there are cyclic dependencies
1662    /// - Returns `AgentError::DependencyTimeout` if a dependency times out with `on_timeout`: fail
1663    pub async fn deploy_with_dependencies(
1664        &self,
1665        services: HashMap<String, ServiceSpec>,
1666    ) -> Result<()> {
1667        if services.is_empty() {
1668            return Ok(());
1669        }
1670
1671        // Build dependency graph
1672        let graph = DependencyGraph::build(&services)?;
1673
1674        tracing::info!(
1675            service_count = services.len(),
1676            "Starting deployment with dependency ordering"
1677        );
1678
1679        // Get startup order
1680        let order = graph.startup_order();
1681        tracing::debug!(order = ?order, "Computed startup order");
1682
1683        // Start services in dependency order
1684        for service_name in order {
1685            let service_spec = services
1686                .get(service_name)
1687                .ok_or_else(|| AgentError::Internal(format!("Service {service_name} not found")))?;
1688
1689            // Wait for dependencies first
1690            if !service_spec.depends.is_empty() {
1691                tracing::info!(
1692                    service = %service_name,
1693                    dependency_count = service_spec.depends.len(),
1694                    "Waiting for dependencies"
1695                );
1696                self.wait_for_dependencies(service_name, &service_spec.depends)
1697                    .await?;
1698            }
1699
1700            // Register and start service
1701            tracing::info!(service = %service_name, "Starting service");
1702            Box::pin(self.upsert_service(service_name.clone(), service_spec.clone())).await?;
1703
1704            // Get the desired replica count from scale config
1705            let replicas = match &service_spec.scale {
1706                zlayer_spec::ScaleSpec::Fixed { replicas } => *replicas,
1707                zlayer_spec::ScaleSpec::Adaptive { min, .. } => *min, // Start with min replicas
1708                zlayer_spec::ScaleSpec::Manual => 1, // Default to 1 for manual scaling
1709            };
1710            self.scale_service(service_name, replicas).await?;
1711
1712            // Mark service as started in health states (Unknown until health check runs)
1713            self.update_health_state(service_name, HealthState::Unknown)
1714                .await;
1715
1716            tracing::info!(
1717                service = %service_name,
1718                replicas = replicas,
1719                "Service started"
1720            );
1721        }
1722
1723        tracing::info!(service_count = services.len(), "Deployment complete");
1724
1725        Ok(())
1726    }
1727
1728    /// Wait for all dependencies of a service to be satisfied
1729    ///
1730    /// # Arguments
1731    /// * `service` - Name of the service waiting for dependencies
1732    /// * `deps` - Slice of dependency specifications
1733    ///
1734    /// # Errors
1735    /// Returns `AgentError::DependencyTimeout` if any dependency with `on_timeout`: fail times out
1736    async fn wait_for_dependencies(&self, service: &str, deps: &[DependsSpec]) -> Result<()> {
1737        let condition_checker = DependencyConditionChecker::new(
1738            Arc::clone(&self.runtime),
1739            Arc::clone(&self.health_states),
1740            None,
1741        );
1742
1743        let waiter = DependencyWaiter::new(condition_checker);
1744        let results = waiter.wait_for_all(deps).await?;
1745
1746        // Check results for failures
1747        for result in results {
1748            match result {
1749                WaitResult::TimedOutFail {
1750                    service: dep_service,
1751                    condition,
1752                    timeout,
1753                } => {
1754                    return Err(AgentError::DependencyTimeout {
1755                        service: service.to_string(),
1756                        dependency: dep_service,
1757                        condition: format!("{condition:?}"),
1758                        timeout,
1759                    });
1760                }
1761                WaitResult::TimedOutWarn {
1762                    service: dep_service,
1763                    condition,
1764                } => {
1765                    tracing::warn!(
1766                        service = %service,
1767                        dependency = %dep_service,
1768                        condition = ?condition,
1769                        "Dependency timed out but continuing"
1770                    );
1771                }
1772                WaitResult::TimedOutContinue | WaitResult::Satisfied => {
1773                    // Continue silently
1774                }
1775            }
1776        }
1777
1778        Ok(())
1779    }
1780
1781    /// Check if all dependencies for a service are currently satisfied
1782    ///
1783    /// This is a one-shot check (no waiting). Useful for pre-flight validation.
1784    ///
1785    /// # Errors
1786    /// Returns an error if a dependency check fails unexpectedly.
1787    pub async fn check_dependencies(&self, deps: &[DependsSpec]) -> Result<bool> {
1788        let condition_checker = DependencyConditionChecker::new(
1789            Arc::clone(&self.runtime),
1790            Arc::clone(&self.health_states),
1791            None,
1792        );
1793
1794        for dep in deps {
1795            if !condition_checker.check(dep).await? {
1796                return Ok(false);
1797            }
1798        }
1799
1800        Ok(true)
1801    }
1802
1803    /// Add or update a workload (service, job, or cron)
1804    ///
1805    /// This method handles different resource types appropriately:
1806    /// - **Service**: Traditional long-running containers with scaling and health checks
1807    /// - **Job**: Run-to-completion workloads triggered on-demand (stores spec for later)
1808    /// - **Cron**: Scheduled run-to-completion workloads (registers with cron scheduler)
1809    ///
1810    /// # Errors
1811    /// Returns an error if service creation, scaling, or cron registration fails.
1812    #[allow(clippy::too_many_lines)]
1813    pub async fn upsert_service(&self, name: String, spec: ServiceSpec) -> Result<()> {
1814        match spec.rtype {
1815            ResourceType::Service => {
1816                // Long-running service: create/update instance
1817                let mut services = self.services.write().await;
1818
1819                if let Some(instance) = services.get_mut(&name) {
1820                    // Update existing service. We need to:
1821                    //   1. Update the in-memory spec (so future scale-ups use the new image).
1822                    //   2. Recreate the local replicas when the image actually changed —
1823                    //      either a different image *reference* (e.g. tag bump
1824                    //      1.28 -> 1.29), which is a new image regardless of pull
1825                    //      policy, or, under Always/Newer, observed *digest* drift on
1826                    //      the same reference.
1827                    // The recreate is LOCAL (`scale_service_local`): `upsert_service`
1828                    // runs on whichever node owns the replicas (the leader for its
1829                    // own share, each worker via the `/internal/scale` handler). Using
1830                    // the cluster-routed `scale_service` here would bounce a worker's
1831                    // recreate back to the leader and re-enter dispatch. Cluster-wide
1832                    // distribution is the caller's job (orchestrate_deployment + the
1833                    // scale dispatch that carries this spec to every node).
1834                    let image_changed = instance.spec.image.name != spec.image.name;
1835                    instance.spec = spec.clone();
1836                    if let Some(dns) = &self.dns_server {
1837                        instance.set_dns_server(Arc::clone(dns));
1838                    }
1839                    // Re-apply overlay resolver injection: the spec was just
1840                    // replaced wholesale, so any prior injection on the old
1841                    // spec is gone. Honors host_network / user-supplied dns.
1842                    if let Some(ip) = self.container_dns {
1843                        instance.set_container_dns(ip);
1844                    }
1845
1846                    let effective = spec.image.pull_policy;
1847                    let old_digest = instance.last_pulled_digest().await;
1848                    let current_replicas =
1849                        u32::try_from(instance.replica_count().await).unwrap_or(u32::MAX);
1850                    drop(services); // Release write lock before pull / scale (which take their own locks).
1851
1852                    // A changed image reference always recreates. Same-reference
1853                    // refreshes are governed by pull policy + digest drift.
1854                    let mut should_recreate = image_changed;
1855                    let mut new_digest = old_digest.clone();
1856
1857                    match effective {
1858                        PullPolicy::Never | PullPolicy::IfNotPresent => {
1859                            // No proactive pull. If the reference changed we still
1860                            // recreate below; the scale-up path pulls the (absent) new
1861                            // image per IfNotPresent. A same-reference redeploy under
1862                            // these policies is a genuine no-op.
1863                            tracing::debug!(
1864                                service = %name,
1865                                policy = ?effective,
1866                                image_changed,
1867                                "re-deploy under no-refresh pull policy"
1868                            );
1869                        }
1870                        PullPolicy::Always | PullPolicy::Newer => {
1871                            // Pull (this updates the cached digest as a side-effect).
1872                            // We need a read guard to keep the instance alive while
1873                            // calling its &self method.
1874                            let services_ro = self.services.read().await;
1875                            new_digest = if let Some(inst) = services_ro.get(&name) {
1876                                inst.pull_and_refresh_digest().await?
1877                            } else {
1878                                // The service vanished between our write-lock release
1879                                // and read-lock acquisition (race with remove_service).
1880                                // Treat this as a no-op; the caller will see the removal.
1881                                tracing::warn!(
1882                                    service = %name,
1883                                    "service removed during upsert; skipping drift recreate"
1884                                );
1885                                return Ok(());
1886                            };
1887                            drop(services_ro);
1888
1889                            // Always forces a recreate. Newer recreates on digest
1890                            // drift. When digests are unknown (runtime doesn't expose
1891                            // them), we can't observe drift safely under Newer, so the
1892                            // reference check above is the only trigger.
1893                            should_recreate = should_recreate
1894                                || match effective {
1895                                    PullPolicy::Always => true,
1896                                    PullPolicy::Newer => match (&old_digest, &new_digest) {
1897                                        (Some(old), Some(new)) => old != new,
1898                                        _ => false,
1899                                    },
1900                                    _ => false,
1901                                };
1902                        }
1903                    }
1904
1905                    if should_recreate && current_replicas > 0 {
1906                        tracing::info!(
1907                            service = %name,
1908                            policy = ?effective,
1909                            image_changed,
1910                            old_digest = ?old_digest,
1911                            new_digest = ?new_digest,
1912                            replicas = current_replicas,
1913                            "image changed; performing local rolling recreate"
1914                        );
1915                        self.scale_service_local(&name, 0).await?;
1916                        self.scale_service_local(&name, current_replicas).await?;
1917                        tracing::info!(
1918                            service = %name,
1919                            new_digest = ?new_digest,
1920                            "service recreated with refreshed image"
1921                        );
1922                    } else {
1923                        tracing::debug!(
1924                            service = %name,
1925                            policy = ?effective,
1926                            old_digest = ?old_digest,
1927                            new_digest = ?new_digest,
1928                            "service up to date; no recreate required"
1929                        );
1930                    }
1931                    return Ok(());
1932                }
1933                // Create new service with proxy manager for health-aware load balancing
1934                let overlay = self.overlay_manager.as_ref().map(Arc::clone);
1935                let mut instance = if let Some(proxy) = &self.proxy_manager {
1936                    ServiceInstance::with_proxy(
1937                        name.clone(),
1938                        spec,
1939                        self.runtime.clone(),
1940                        overlay,
1941                        Arc::clone(proxy),
1942                    )
1943                } else {
1944                    ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
1945                };
1946                // Thread the local cluster node id so new `ContainerId`s carry
1947                // owning-node identity. Defaults to `0` in single-node mode.
1948                instance.set_node_id(self.cluster.as_ref().map_or(0, |c| c.node_id()));
1949                // Set DNS server if configured
1950                if let Some(dns) = &self.dns_server {
1951                    instance.set_dns_server(Arc::clone(dns));
1952                }
1953                // Inject the overlay resolver into the spec so containers use it
1954                // instead of the hijacked host resolv.conf (no-op for
1955                // host_network / user-supplied dns).
1956                if let Some(ip) = self.container_dns {
1957                    instance.set_container_dns(ip);
1958                }
1959                // Wire shared health states so callbacks bridge back to ServiceManager
1960                instance.set_health_states(Arc::clone(&self.health_states));
1961                // Register HTTP routes via proxy manager
1962                if let Some(proxy) = &self.proxy_manager {
1963                    proxy.add_service(&name, &instance.spec).await;
1964                }
1965                // Register TCP/UDP endpoints in stream registry
1966                if let Some(stream_registry) = &self.stream_registry {
1967                    for endpoint in &instance.spec.endpoints {
1968                        let svc = StreamService::new(
1969                            name.clone(),
1970                            Vec::new(), // No backends yet; added on scale-up
1971                        );
1972                        match endpoint.protocol {
1973                            Protocol::Tcp => {
1974                                stream_registry.register_tcp(endpoint.port, svc);
1975                                tracing::debug!(
1976                                    service = %name,
1977                                    port = endpoint.port,
1978                                    "Registered TCP stream route"
1979                                );
1980                            }
1981                            Protocol::Udp => {
1982                                stream_registry.register_udp(endpoint.port, svc);
1983                                tracing::debug!(
1984                                    service = %name,
1985                                    port = endpoint.port,
1986                                    "Registered UDP stream route"
1987                                );
1988                            }
1989                            _ => {} // HTTP routes handled by proxy manager
1990                        }
1991                    }
1992                }
1993                services.insert(name, instance);
1994            }
1995            ResourceType::Job => {
1996                // Job: Just store the spec for later triggering
1997                // Jobs don't start containers immediately; they're triggered on-demand
1998                if let Some(executor) = &self.job_executor {
1999                    executor.register_job(&name, spec).await;
2000                    tracing::info!(job = %name, "Registered job spec");
2001                } else {
2002                    tracing::warn!(
2003                        job = %name,
2004                        "Job executor not configured, storing as service for reference"
2005                    );
2006                    // Fallback: store as service instance for reference
2007                    let mut services = self.services.write().await;
2008                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
2009                    let mut instance = if let Some(proxy) = &self.proxy_manager {
2010                        ServiceInstance::with_proxy(
2011                            name.clone(),
2012                            spec,
2013                            self.runtime.clone(),
2014                            overlay,
2015                            Arc::clone(proxy),
2016                        )
2017                    } else {
2018                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
2019                    };
2020                    // Thread the local cluster node id (same as the Service
2021                    // branch above) so the fallback-as-service Job entry also
2022                    // carries owning-node identity.
2023                    instance.set_node_id(self.cluster.as_ref().map_or(0, |c| c.node_id()));
2024                    // Set DNS server if configured
2025                    if let Some(dns) = &self.dns_server {
2026                        instance.set_dns_server(Arc::clone(dns));
2027                    }
2028                    // Inject the overlay resolver (no-op for host_network /
2029                    // user-supplied dns).
2030                    if let Some(ip) = self.container_dns {
2031                        instance.set_container_dns(ip);
2032                    }
2033                    services.insert(name, instance);
2034                }
2035            }
2036            ResourceType::Cron => {
2037                // Cron: Register with the cron scheduler
2038                if let Some(scheduler) = &self.cron_scheduler {
2039                    scheduler.register(&name, &spec).await?;
2040                    tracing::info!(cron = %name, "Registered cron job with scheduler");
2041                } else {
2042                    return Err(AgentError::Configuration(format!(
2043                        "Cron scheduler not configured for cron job '{name}'"
2044                    )));
2045                }
2046            }
2047        }
2048
2049        Ok(())
2050    }
2051
2052    /// Update backend addresses via `ProxyManager` after scaling, applying
2053    /// per-endpoint `target_role` filtering.
2054    ///
2055    /// For each L7 endpoint of the service, this collects the subset of
2056    /// containers whose `ContainerId.role` matches `endpoint.target_role`
2057    /// (or all containers when `target_role` is `None`) and updates the
2058    /// proxy's backend pool for that specific endpoint via
2059    /// [`ProxyManager::update_endpoint_backends`].
2060    async fn update_proxy_backends(&self, instance: &ServiceInstance) {
2061        let Some(proxy) = &self.proxy_manager else {
2062            return;
2063        };
2064        for endpoint in &instance.spec.endpoints {
2065            // Only L7 endpoints flow through the proxy (HTTP/HTTPS/WS).
2066            if !matches!(
2067                endpoint.protocol,
2068                Protocol::Http | Protocol::Https | Protocol::Websocket
2069            ) {
2070                continue;
2071            }
2072            let addrs = self.collect_endpoint_backends(instance, endpoint).await;
2073            proxy
2074                .update_endpoint_backends(&instance.service_name, &endpoint.name, addrs)
2075                .await;
2076        }
2077    }
2078
2079    /// Update backend addresses in the `StreamRegistry` for TCP/UDP endpoints after scaling
2080    ///
2081    /// For containers with a port override (macOS sandbox), the addresses already
2082    /// carry the runtime-assigned port. In that case, the container listens on the
2083    /// override port for all traffic, so we use the address port directly. For
2084    /// containers without a port override (Linux, VMs), we reconstruct addresses
2085    /// using the endpoint's declared port, since each container has its own IP
2086    /// and can bind any port independently.
2087    async fn update_stream_backends(&self, instance: &ServiceInstance) {
2088        let Some(stream_registry) = &self.stream_registry else {
2089            return;
2090        };
2091
2092        for endpoint in &instance.spec.endpoints {
2093            match endpoint.protocol {
2094                Protocol::Tcp => {
2095                    let tcp_backends = self.collect_endpoint_backends(instance, endpoint).await;
2096                    let backend_count = tcp_backends.len();
2097                    stream_registry.update_tcp_backends(endpoint.port, tcp_backends);
2098                    tracing::debug!(
2099                        endpoint = %endpoint.name,
2100                        port = endpoint.port,
2101                        backend_count = backend_count,
2102                        target_role = ?endpoint.target_role,
2103                        "Updated TCP stream backends"
2104                    );
2105                }
2106                Protocol::Udp => {
2107                    let udp_backends = self.collect_endpoint_backends(instance, endpoint).await;
2108                    let backend_count = udp_backends.len();
2109                    stream_registry.update_udp_backends(endpoint.port, udp_backends);
2110                    tracing::debug!(
2111                        endpoint = %endpoint.name,
2112                        port = endpoint.port,
2113                        backend_count = backend_count,
2114                        target_role = ?endpoint.target_role,
2115                        "Updated UDP stream backends"
2116                    );
2117                }
2118                _ => {} // HTTP endpoints handled by update_proxy_backends
2119            }
2120        }
2121    }
2122
2123    /// Scale a service. Cluster-aware: if this node has a `Cluster` handle
2124    /// and we're not the leader, forward to the leader; if leader, compute
2125    /// affinity-aware placement and dispatch each node its share via
2126    /// `dispatch_scale_distributed`; else (single-node) just scale locally.
2127    ///
2128    /// # Errors
2129    /// Returns an error if scaling fails on any participating node.
2130    #[allow(clippy::cast_possible_truncation)]
2131    pub async fn scale_service(&self, name: &str, replicas: u32) -> Result<()> {
2132        use zlayer_scheduler::cluster::InternalScaleRequest;
2133
2134        tracing::info!(
2135            target: "zlayer::scale_distribute",
2136            service = name,
2137            replicas,
2138            has_cluster = self.cluster.is_some(),
2139            "scale_service ENTER"
2140        );
2141
2142        // Attach the current spec so every receiving node can register/update
2143        // the service before scaling. This is what propagates an image change
2144        // to worker containers and lets a fresh worker run a replica it has
2145        // never seen. `None` if the service isn't registered locally (the
2146        // receiver then falls back to its own cached spec).
2147        let spec = self
2148            .services
2149            .read()
2150            .await
2151            .get(name)
2152            .map(|inst| inst.spec.clone());
2153        let build_req = |replicas: u32| {
2154            let req = InternalScaleRequest::new(name, replicas);
2155            match spec.clone() {
2156                Some(s) => req.with_spec(s),
2157                None => req,
2158            }
2159        };
2160
2161        if let Some(cluster) = &self.cluster {
2162            let is_leader = cluster.is_leader().await;
2163            tracing::info!(
2164                target: "zlayer::scale_distribute",
2165                service = name,
2166                replicas,
2167                is_leader,
2168                spec_affinity = ?spec.as_ref().and_then(|s| s.affinity.clone()),
2169                "scale_service: cluster path"
2170            );
2171            if !is_leader {
2172                // Follower: forward to the leader and let it dispatch.
2173                return cluster
2174                    .forward_scale(build_req(replicas))
2175                    .await
2176                    .map_err(|e| AgentError::CreateFailed {
2177                        id: name.to_string(),
2178                        reason: format!("cluster forward: {e}"),
2179                    });
2180            }
2181
2182            // Leader path. Compute affinity-aware placement across the Ready
2183            // node set and dispatch each node its share. `dispatch_scale_distributed`
2184            // reuses the same placement machinery as one-off container placement
2185            // (`cluster_nodes_to_node_states` + `place_service_replicas`), honoring
2186            // `ServiceSpec.affinity` (`spread`/`pack`/`pin`). The leader's own
2187            // share short-circuits to a local call (no localhost HTTP round-trip),
2188            // and the attached spec lets fresh workers register the service before
2189            // scaling. Single-node clusters fall through the default impl, which
2190            // dispatches everything to this node (unchanged behavior).
2191            return cluster
2192                .dispatch_scale_distributed(build_req(replicas))
2193                .await
2194                .map_err(|e| AgentError::CreateFailed {
2195                    id: name.to_string(),
2196                    reason: format!("cluster dispatch: {e}"),
2197                });
2198        }
2199
2200        // No cluster handle — single-node mode.
2201        self.scale_service_local(name, replicas).await
2202    }
2203
2204    /// Local (single-node) scale: directly creates/destroys containers on
2205    /// this node only. Called by:
2206    ///   - `scale_service` in single-node mode (when `self.cluster` is None).
2207    ///   - The `/api/v1/internal/scale` handler (which the leader's
2208    ///     `Cluster::dispatch_scale` HTTP-POSTs to, bottoming out the
2209    ///     recursive loop on each receiving node).
2210    ///   - The cluster impls' `local_dispatch` closure (for the leader's own
2211    ///     share — short-circuited to avoid a localhost round-trip).
2212    ///
2213    /// # Errors
2214    /// Returns an error if the service is not found or scaling fails.
2215    #[allow(clippy::cast_possible_truncation)]
2216    pub async fn scale_service_local(&self, name: &str, replicas: u32) -> Result<()> {
2217        tracing::info!(
2218            target: "zlayer::scale_distribute",
2219            service = name,
2220            replicas,
2221            "scale_service_local ENTER"
2222        );
2223        let _permit = self.scale_semaphore.acquire().await;
2224
2225        let services = self.services.read().await;
2226        let Some(instance) = services.get(name) else {
2227            // Draining a service this node never hosted is a no-op (e.g. the
2228            // leader fans out `count=0` to a node to drain it during a
2229            // scale-down, but that node never ran the service).
2230            if replicas == 0 {
2231                return Ok(());
2232            }
2233            return Err(AgentError::NotFound {
2234                container: name.to_string(),
2235                reason: "service not found".to_string(),
2236            });
2237        };
2238
2239        // Get current replica count before scaling
2240        let current_replicas = instance.replica_count().await as u32;
2241
2242        // Perform the scaling operation
2243        instance.scale_to(replicas).await?;
2244
2245        // After scaling, update proxy and stream backends for each endpoint.
2246        // Per-endpoint collection (rather than a single service-wide list)
2247        // is what makes `EndpointSpec.target_role` filtering possible:
2248        // each endpoint receives only the containers whose
2249        // `ContainerId.role` matches its declared role.
2250        if self.proxy_manager.is_some() {
2251            self.update_proxy_backends(instance).await;
2252        }
2253        if self.stream_registry.is_some() {
2254            self.update_stream_backends(instance).await;
2255        }
2256
2257        // Register new containers with supervisor for crash monitoring.
2258        //
2259        // Container ids here must match what `ServiceInstance::scale_to`
2260        // constructed — same role (derived from `replica_groups`) and same
2261        // local node id. Otherwise supervise/unsupervise miss the live entry
2262        // and crash-restart bookkeeping leaks across scale events.
2263        let local_node_id = self.cluster.as_ref().map_or(0, |c| c.node_id());
2264        if let Some(supervisor) = &self.container_supervisor {
2265            // For scale-up, register new containers
2266            if replicas > current_replicas {
2267                for i in current_replicas..replicas {
2268                    let replica_idx = i + 1;
2269                    let container_id = ContainerId::with_role_and_node(
2270                        name.to_string(),
2271                        replica_idx,
2272                        instance.role_for_replica(replica_idx),
2273                        local_node_id,
2274                    );
2275                    supervisor.supervise(&container_id, &instance.spec).await;
2276                }
2277            }
2278            // For scale-down, unregister removed containers
2279            if replicas < current_replicas {
2280                for i in replicas..current_replicas {
2281                    let replica_idx = i + 1;
2282                    let container_id = ContainerId::with_role_and_node(
2283                        name.to_string(),
2284                        replica_idx,
2285                        instance.role_for_replica(replica_idx),
2286                        local_node_id,
2287                    );
2288                    supervisor.unsupervise(&container_id).await;
2289                }
2290            }
2291        }
2292
2293        Ok(())
2294    }
2295
2296    /// Collect backend addresses for a single endpoint of a service.
2297    ///
2298    /// This queries the service instance's containers for their overlay
2299    /// network IP addresses and constructs backend addresses using the
2300    /// endpoint's container target port.
2301    ///
2302    /// Containers are filtered by `endpoint.target_role`:
2303    /// - `None` (default): all containers of the service are eligible
2304    ///   (legacy behavior).
2305    /// - `Some(role)`: only containers whose `ContainerId.role` equals
2306    ///   `role` are included. Implements
2307    ///   [`zlayer_spec::EndpointSpec::target_role`].
2308    ///
2309    /// If a container has a `port_override` (e.g., macOS sandbox where all
2310    /// containers share the host network), that port is used instead of
2311    /// the spec-declared endpoint port. This allows multiple replicas on
2312    /// the same IP (`127.0.0.1`) to be distinguished by port.
2313    async fn collect_endpoint_backends(
2314        &self,
2315        instance: &ServiceInstance,
2316        endpoint: &zlayer_spec::EndpointSpec,
2317    ) -> Vec<SocketAddr> {
2318        let mut addrs = Vec::new();
2319        let endpoint_port = endpoint.target_port();
2320        let containers = instance.containers().read().await;
2321
2322        for (container_id, container) in containers.iter() {
2323            // target_role filter: skip containers whose role doesn't match.
2324            if let Some(required_role) = endpoint.target_role.as_ref() {
2325                if container_id.role != *required_role {
2326                    continue;
2327                }
2328            }
2329            let Some(ip) = container.overlay_ip else {
2330                continue;
2331            };
2332            // Use the runtime-assigned port override if present (macOS
2333            // sandbox), otherwise fall back to the endpoint's declared
2334            // target port.
2335            let port = container.port_override.unwrap_or(endpoint_port);
2336            addrs.push(SocketAddr::new(ip, port));
2337        }
2338
2339        // If we expected backends but found none, log a hint so operators
2340        // can debug. Distinguish "no containers" from "role filter
2341        // excluded everything" from "no overlay IPs".
2342        if addrs.is_empty() && !containers.is_empty() {
2343            tracing::warn!(
2344                service = %instance.service_name,
2345                endpoint = %endpoint.name,
2346                target_role = ?endpoint.target_role,
2347                container_count = containers.len(),
2348                "no backends collected for endpoint - either no matching role, no overlay IPs, or filtering excluded all"
2349            );
2350        }
2351
2352        addrs
2353    }
2354
2355    /// Get service replica count
2356    ///
2357    /// # Errors
2358    /// Returns an error if the service is not found.
2359    pub async fn service_replica_count(&self, name: &str) -> Result<usize> {
2360        let services = self.services.read().await;
2361        let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
2362            container: name.to_string(),
2363            reason: "service not found".to_string(),
2364        })?;
2365
2366        Ok(instance.replica_count().await)
2367    }
2368
2369    /// Remove a workload (service, job, or cron)
2370    ///
2371    /// This method handles cleanup for different resource types:
2372    /// - **Service**: Unregisters proxy routes, supervisor, and removes from service map
2373    /// - **Job**: Unregisters from job executor
2374    /// - **Cron**: Unregisters from cron scheduler
2375    ///
2376    /// # Errors
2377    /// Returns an error if the service cannot be removed or scale-down fails.
2378    pub async fn remove_service(&self, name: &str) -> Result<()> {
2379        // Try to unregister from cron scheduler first
2380        if let Some(scheduler) = &self.cron_scheduler {
2381            scheduler.unregister(name).await;
2382        }
2383
2384        // Try to unregister from job executor
2385        if let Some(executor) = &self.job_executor {
2386            executor.unregister_job(name).await;
2387        }
2388
2389        // Unregister stream routes (TCP/UDP) from the stream registry
2390        if let Some(stream_registry) = &self.stream_registry {
2391            // Need to get the service spec to know which ports to unregister
2392            let services = self.services.read().await;
2393            if let Some(instance) = services.get(name) {
2394                for endpoint in &instance.spec.endpoints {
2395                    match endpoint.protocol {
2396                        Protocol::Tcp => {
2397                            let _ = stream_registry.unregister_tcp(endpoint.port);
2398                            tracing::debug!(
2399                                service = %name,
2400                                port = endpoint.port,
2401                                "Unregistered TCP stream route"
2402                            );
2403                        }
2404                        Protocol::Udp => {
2405                            let _ = stream_registry.unregister_udp(endpoint.port);
2406                            tracing::debug!(
2407                                service = %name,
2408                                port = endpoint.port,
2409                                "Unregistered UDP stream route"
2410                            );
2411                        }
2412                        _ => {} // HTTP routes handled above
2413                    }
2414                }
2415            }
2416            drop(services); // Release read lock
2417        }
2418
2419        // Unpublish node-loopback ports for every live replica of this
2420        // service so the loopback listeners are freed (mirror of the
2421        // per-replica unpublish in `ServiceInstance::scale_to`). Gated on the
2422        // spec's policy; recomputes each backend from the container's stored
2423        // runtime-resolved IP and port override.
2424        {
2425            let services = self.services.read().await;
2426            if let Some(instance) = services.get(name) {
2427                if instance.spec.publish_to_node_loopback() {
2428                    if let Some(proxy) = instance.proxy_manager() {
2429                        let containers = instance.containers().read().await;
2430                        for container in containers.values() {
2431                            if let Some(ip) = container.overlay_ip {
2432                                proxy
2433                                    .unpublish_loopback_for_container(
2434                                        &instance.spec,
2435                                        ip,
2436                                        container.port_override,
2437                                    )
2438                                    .await;
2439                            }
2440                        }
2441                    }
2442                }
2443            }
2444            drop(services); // Release read lock
2445        }
2446
2447        // Unregister containers from the supervisor
2448        if let Some(supervisor) = &self.container_supervisor {
2449            let containers = self.get_service_containers(name).await;
2450            for container_id in containers {
2451                supervisor.unsupervise(&container_id).await;
2452            }
2453            tracing::debug!(service = %name, "Unregistered containers from supervisor");
2454        }
2455
2456        // Clean up DNS records for the service (bare name + FQDNs).
2457        self.cleanup_service_dns(name).await;
2458
2459        // Remove from services map (may or may not exist depending on rtype)
2460        let mut services = self.services.write().await;
2461        if services.remove(name).is_some() {
2462            tracing::debug!(service = %name, "Removed service from manager");
2463        }
2464
2465        Ok(())
2466    }
2467
2468    /// Remove every DNS record this service registered on attach: the bare
2469    /// compose service name (`{service}`), the service-level FQDN
2470    /// (`{service}.service.local`), and each replica's FQDN
2471    /// (`{replica}.{service}.service.local`). Best-effort; failures are logged.
2472    async fn cleanup_service_dns(&self, name: &str) {
2473        let Some(dns) = &self.dns_server else {
2474            return;
2475        };
2476
2477        // Bare compose service-name record (compose discovery).
2478        if let Err(e) = dns.remove_record(name).await {
2479            tracing::warn!(
2480                hostname = %name,
2481                error = %e,
2482                "failed to remove bare service-name DNS record"
2483            );
2484        }
2485
2486        // Service-level FQDN.
2487        let service_hostname = format!("{name}.service.local");
2488        if let Err(e) = dns.remove_record(&service_hostname).await {
2489            tracing::warn!(
2490                hostname = %service_hostname,
2491                error = %e,
2492                "failed to remove service DNS record"
2493            );
2494        } else {
2495            tracing::debug!(hostname = %service_hostname, "removed service DNS record");
2496        }
2497
2498        // Any remaining replica-specific FQDNs.
2499        let services = self.services.read().await;
2500        if let Some(instance) = services.get(name) {
2501            let containers = instance.containers().read().await;
2502            for (id, _) in containers.iter() {
2503                let replica_hostname = format!("{}.{}.service.local", id.replica, name);
2504                if let Err(e) = dns.remove_record(&replica_hostname).await {
2505                    tracing::warn!(
2506                        hostname = %replica_hostname,
2507                        error = %e,
2508                        "failed to remove replica DNS record during service removal"
2509                    );
2510                }
2511            }
2512        }
2513    }
2514
2515    /// Introspect service infrastructure wiring.
2516    /// Returns (`has_overlay`, `has_proxy`, `has_dns`), or None if service not found.
2517    pub async fn service_infrastructure(&self, name: &str) -> Option<(bool, bool, bool)> {
2518        let services = self.services.read().await;
2519        services.get(name).map(|i| {
2520            (
2521                i.has_overlay_manager(),
2522                i.has_proxy_manager(),
2523                i.has_dns_server(),
2524            )
2525        })
2526    }
2527
2528    /// List all services
2529    pub async fn list_services(&self) -> Vec<String> {
2530        self.services.read().await.keys().cloned().collect()
2531    }
2532
2533    /// Get logs for a service, aggregated from all container replicas.
2534    ///
2535    /// # Arguments
2536    /// * `service_name` - Name of the service to fetch logs for
2537    /// * `tail` - Number of lines to return per container (0 = all)
2538    /// * `instance` - Optional specific instance (container ID suffix like "1", "2")
2539    ///
2540    /// # Errors
2541    /// Returns an error if the service or instance is not found.
2542    ///
2543    /// # Returns
2544    /// Structured log entries from all (or specific) container replicas. Each
2545    /// entry has its `service` and `deployment` fields populated when available.
2546    pub async fn get_service_logs(
2547        &self,
2548        service_name: &str,
2549        tail: usize,
2550        instance: Option<&str>,
2551    ) -> Result<Vec<LogEntry>> {
2552        let container_ids = self.get_service_containers(service_name).await;
2553
2554        if container_ids.is_empty() {
2555            return Err(AgentError::NotFound {
2556                container: service_name.to_string(),
2557                reason: "no containers found for service".to_string(),
2558            });
2559        }
2560
2561        // If a specific instance is requested, filter to just that one
2562        let target_ids: Vec<&ContainerId> = if let Some(inst) = instance {
2563            if let Ok(replica_num) = inst.parse::<u32>() {
2564                container_ids
2565                    .iter()
2566                    .filter(|id| id.replica == replica_num)
2567                    .collect()
2568            } else {
2569                // Try matching by full container ID string suffix
2570                container_ids
2571                    .iter()
2572                    .filter(|id| id.to_string().contains(inst))
2573                    .collect()
2574            }
2575        } else {
2576            container_ids.iter().collect()
2577        };
2578
2579        if target_ids.is_empty() {
2580            return Err(AgentError::NotFound {
2581                container: format!("{}/{}", service_name, instance.unwrap_or("?")),
2582                reason: "instance not found".to_string(),
2583            });
2584        }
2585
2586        let mut all_entries: Vec<LogEntry> = Vec::new();
2587
2588        for id in &target_ids {
2589            match self.runtime.container_logs(id, tail).await {
2590                Ok(mut entries) => {
2591                    // Populate service and deployment metadata on each entry
2592                    for entry in &mut entries {
2593                        if entry.service.is_none() {
2594                            entry.service = Some(service_name.to_string());
2595                        }
2596                        if entry.deployment.is_none() {
2597                            entry.deployment.clone_from(&self.deployment_name);
2598                        }
2599                    }
2600                    all_entries.extend(entries);
2601                }
2602                Err(e) => {
2603                    tracing::warn!(
2604                        service = service_name,
2605                        container = %id,
2606                        error = %e,
2607                        "Failed to read container logs"
2608                    );
2609                }
2610            }
2611        }
2612
2613        Ok(all_entries)
2614    }
2615
2616    /// Get all container IDs for a specific service
2617    ///
2618    /// Returns an empty vector if the service doesn't exist.
2619    ///
2620    /// # Arguments
2621    /// * `service_name` - Name of the service to query
2622    ///
2623    /// # Returns
2624    /// Vector of `ContainerIds` for all replicas of the service
2625    pub async fn get_service_containers(&self, service_name: &str) -> Vec<ContainerId> {
2626        let services = self.services.read().await;
2627        if let Some(instance) = services.get(service_name) {
2628            instance.container_ids().await
2629        } else {
2630            Vec::new()
2631        }
2632    }
2633
2634    /// Get per-container info (id, image, real state, pid, overlay IP) for a
2635    /// specific service.
2636    ///
2637    /// Unlike [`get_service_containers`](Self::get_service_containers) (which
2638    /// returns ids only), this surfaces the REAL image reference and lifecycle
2639    /// state of each live container so the API/`ps` can report them accurately.
2640    ///
2641    /// Returns an empty vector if the service doesn't exist.
2642    pub async fn get_service_container_infos(&self, service_name: &str) -> Vec<ContainerInfo> {
2643        let services = self.services.read().await;
2644        if let Some(instance) = services.get(service_name) {
2645            instance.container_infos().await
2646        } else {
2647            Vec::new()
2648        }
2649    }
2650
2651    /// This node's **local** view of `service` (running replica count, health,
2652    /// containers), used for cluster-wide aggregation. Served by the internal
2653    /// `/api/v1/internal/services/{svc}/state` endpoint and used as the local
2654    /// part of [`Self::cluster_service_states`].
2655    pub async fn local_service_state(
2656        &self,
2657        service: &str,
2658    ) -> zlayer_types::cluster::NodeServiceState {
2659        use zlayer_types::cluster::{ClusterContainerSummary, NodeServiceState};
2660        let node_id = self.cluster.as_ref().map_or(0, |c| c.node_id());
2661        let infos = self.get_service_container_infos(service).await;
2662        #[allow(clippy::cast_possible_truncation)]
2663        let running = infos
2664            .iter()
2665            .filter(|i| i.state.eq_ignore_ascii_case("running"))
2666            .count() as u32;
2667        // A node running 0 replicas is trivially healthy (it can't drag the
2668        // cluster-wide aggregate). Otherwise require a Healthy health state.
2669        let healthy = if running == 0 {
2670            true
2671        } else {
2672            let states = self.health_states();
2673            let guard = states.read().await;
2674            matches!(guard.get(service), Some(HealthState::Healthy))
2675        };
2676        let containers = infos
2677            .into_iter()
2678            .map(|i| ClusterContainerSummary {
2679                node_id,
2680                id: i.id.to_string(),
2681                service: i.id.service.clone(),
2682                replica: i.id.replica,
2683                image: i.image,
2684                state: i.state,
2685                pid: i.pid,
2686                overlay_ip: i.overlay_ip,
2687            })
2688            .collect();
2689        NodeServiceState {
2690            node_id,
2691            running,
2692            healthy,
2693            containers,
2694        }
2695    }
2696
2697    /// Cluster-wide per-node states for `service`: this node's local view plus
2698    /// every other node's (fetched via the cluster handle's
2699    /// `fetch_remote_service_states`). When not clustered, returns just the
2700    /// local view. This is the source of truth for distributed-service replica
2701    /// counts, health, and the `ps` container listing on the leader.
2702    pub async fn cluster_service_states(
2703        &self,
2704        service: &str,
2705    ) -> Vec<zlayer_types::cluster::NodeServiceState> {
2706        let mut states = vec![self.local_service_state(service).await];
2707        if let Some(cluster) = &self.cluster {
2708            states.extend(cluster.fetch_remote_service_states(service).await);
2709        }
2710        states
2711    }
2712
2713    /// Execute a command inside a running container for a service
2714    ///
2715    /// Picks a specific replica if provided, otherwise uses the first available container.
2716    ///
2717    /// # Arguments
2718    /// * `service_name` - Name of the service
2719    /// * `replica` - Optional replica number to target
2720    /// * `cmd` - Command and arguments to execute
2721    ///
2722    /// # Errors
2723    /// Returns an error if the service or replica is not found, or if exec fails.
2724    ///
2725    /// # Panics
2726    /// Panics if no replica is specified and the container list is unexpectedly empty
2727    /// after the emptiness check (should not happen in practice).
2728    ///
2729    /// # Returns
2730    /// Tuple of (`exit_code`, stdout, stderr)
2731    pub async fn exec_in_container(
2732        &self,
2733        service_name: &str,
2734        replica: Option<u32>,
2735        cmd: &[String],
2736    ) -> Result<(i32, String, String)> {
2737        let container_ids = self.get_service_containers(service_name).await;
2738
2739        if container_ids.is_empty() {
2740            return Err(AgentError::NotFound {
2741                container: service_name.to_string(),
2742                reason: "no containers found for service".to_string(),
2743            });
2744        }
2745
2746        // Pick the target container
2747        let target = if let Some(rep) = replica {
2748            container_ids
2749                .into_iter()
2750                .find(|cid| cid.replica == rep)
2751                .ok_or_else(|| AgentError::NotFound {
2752                    container: format!("{service_name}-rep-{rep}"),
2753                    reason: format!("replica {rep} not found for service"),
2754                })?
2755        } else {
2756            // Use the first container (lowest replica number)
2757            container_ids.into_iter().next().unwrap()
2758        };
2759
2760        self.runtime.exec(&target, cmd).await
2761    }
2762
2763    /// List every live container across all services, enriched with the data a
2764    /// Docker `ps` row needs and the data the Docker-name resolver needs.
2765    ///
2766    /// For each running container this surfaces the deployment name, the service
2767    /// name, the concrete [`ContainerId`], the compose `container_name:` label
2768    /// (when set, the user-facing Docker name), the real image, the lifecycle
2769    /// state, and the service's published port mappings. Used by the unified
2770    /// container-name resolver and by `docker ps` so compose deployments show up
2771    /// and resolve by their `container_name`.
2772    pub async fn list_container_views(&self) -> Vec<DeploymentContainerView> {
2773        let deployment = self.deployment_name.clone();
2774        let services = self.services.read().await;
2775        let mut out = Vec::new();
2776        for (service_name, instance) in services.iter() {
2777            let container_name = instance
2778                .spec
2779                .labels
2780                .get("com.docker.compose.container_name")
2781                .cloned();
2782            let ports = instance.spec.port_mappings.clone();
2783            for info in instance.container_infos().await {
2784                out.push(DeploymentContainerView {
2785                    deployment: deployment.clone(),
2786                    service: service_name.clone(),
2787                    container_id: info.id,
2788                    container_name: container_name.clone(),
2789                    image: info.image,
2790                    state: info.state,
2791                    pid: info.pid,
2792                    ports: ports.clone(),
2793                });
2794            }
2795        }
2796        out
2797    }
2798
2799    /// Resolve a Docker-style container name/id to a live deployment
2800    /// [`ContainerId`].
2801    ///
2802    /// Matching precedence (first hit wins):
2803    /// 1. The compose `container_name:` label (e.g. `forgejo-e2e`).
2804    /// 2. The conventional compose names `{deployment}-{service}-{replica}` and
2805    ///    `{deployment}_{service}_{replica}` (replica is 1-based, mirroring
2806    ///    Docker Compose; `ContainerId.replica` is 0-based so we add 1).
2807    /// 3. The bare service name (`{service}`), targeting its first replica.
2808    /// 4. The [`ContainerId`] `Display` form.
2809    ///
2810    /// Returns `None` when nothing matches a *running* container.
2811    pub async fn resolve_container_name(&self, name: &str) -> Option<ContainerId> {
2812        let views = self.list_container_views().await;
2813        // 1. explicit container_name label.
2814        if let Some(v) = views
2815            .iter()
2816            .find(|v| v.container_name.as_deref() == Some(name))
2817        {
2818            return Some(v.container_id.clone());
2819        }
2820        // 2 & 3. conventional names + bare service name.
2821        for v in &views {
2822            let dep = v.deployment.as_deref().unwrap_or("");
2823            let svc = &v.service;
2824            let rep1 = v.container_id.replica + 1;
2825            let candidates = [
2826                format!("{dep}-{svc}-{rep1}"),
2827                format!("{dep}_{svc}_{rep1}"),
2828                svc.clone(),
2829            ];
2830            if candidates.iter().any(|c| c == name) {
2831                return Some(v.container_id.clone());
2832            }
2833        }
2834        // 4. ContainerId Display form.
2835        for v in &views {
2836            if v.container_id.to_string() == name {
2837                return Some(v.container_id.clone());
2838            }
2839        }
2840        None
2841    }
2842
2843    /// Execute a command in a specific deployment container (by its concrete
2844    /// [`ContainerId`]) honouring Docker `exec` options (`--user`, `-w`, `-e`).
2845    ///
2846    /// Routes through [`Runtime::exec_with_opts`] so runtimes that support
2847    /// dropping to a uid/gid + chdir + env injection (macOS VZ-Linux) apply
2848    /// them; others fall back to a plain buffered exec.
2849    ///
2850    /// # Errors
2851    /// Propagates the runtime's exec error.
2852    pub async fn exec_in_container_id_with_opts(
2853        &self,
2854        id: &ContainerId,
2855        opts: &crate::runtime::ExecOptions,
2856    ) -> Result<(i32, String, String)> {
2857        self.runtime.exec_with_opts(id, opts).await
2858    }
2859
2860    // ==================== Job Management ====================
2861
2862    /// Trigger a job execution
2863    ///
2864    /// # Arguments
2865    /// * `name` - Name of the registered job
2866    /// * `trigger` - How the job was triggered (endpoint, cli, etc.)
2867    ///
2868    /// # Returns
2869    /// The execution ID for tracking the job
2870    ///
2871    /// # Errors
2872    /// - Returns error if job executor is not configured
2873    /// - Returns error if the job is not registered
2874    pub async fn trigger_job(&self, name: &str, trigger: JobTrigger) -> Result<JobExecutionId> {
2875        let executor = self
2876            .job_executor
2877            .as_ref()
2878            .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
2879
2880        let spec = executor
2881            .get_job_spec(name)
2882            .await
2883            .ok_or_else(|| AgentError::NotFound {
2884                container: name.to_string(),
2885                reason: "job not registered".to_string(),
2886            })?;
2887
2888        executor.trigger(name, &spec, trigger).await
2889    }
2890
2891    /// Get the status of a job execution
2892    ///
2893    /// # Arguments
2894    /// * `id` - The execution ID returned from `trigger_job`
2895    ///
2896    /// # Returns
2897    /// The job execution details, or None if not found
2898    pub async fn get_job_execution(&self, id: &JobExecutionId) -> Option<JobExecution> {
2899        if let Some(executor) = &self.job_executor {
2900            executor.get_execution(id).await
2901        } else {
2902            None
2903        }
2904    }
2905
2906    /// List all executions for a specific job
2907    ///
2908    /// # Arguments
2909    /// * `name` - Name of the job
2910    ///
2911    /// # Returns
2912    /// Vector of job executions for the specified job
2913    pub async fn list_job_executions(&self, name: &str) -> Vec<JobExecution> {
2914        if let Some(executor) = &self.job_executor {
2915            executor.list_executions(name).await
2916        } else {
2917            Vec::new()
2918        }
2919    }
2920
2921    /// Cancel a running job execution
2922    ///
2923    /// # Arguments
2924    /// * `id` - The execution ID to cancel
2925    ///
2926    /// # Errors
2927    /// Returns error if job executor is not configured or if cancellation fails
2928    pub async fn cancel_job(&self, id: &JobExecutionId) -> Result<()> {
2929        let executor = self
2930            .job_executor
2931            .as_ref()
2932            .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
2933
2934        executor.cancel(id).await
2935    }
2936
2937    // ==================== Cron Management ====================
2938
2939    /// Manually trigger a cron job (outside of its schedule)
2940    ///
2941    /// # Arguments
2942    /// * `name` - Name of the cron job
2943    ///
2944    /// # Returns
2945    /// The execution ID for tracking the triggered job
2946    ///
2947    /// # Errors
2948    /// Returns error if cron scheduler is not configured or job not found
2949    pub async fn trigger_cron(&self, name: &str) -> Result<JobExecutionId> {
2950        let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2951            AgentError::Configuration("Cron scheduler not configured".to_string())
2952        })?;
2953
2954        scheduler.trigger_now(name).await
2955    }
2956
2957    /// Enable or disable a cron job
2958    ///
2959    /// # Arguments
2960    /// * `name` - Name of the cron job
2961    /// * `enabled` - Whether to enable or disable the job
2962    pub async fn set_cron_enabled(&self, name: &str, enabled: bool) {
2963        if let Some(scheduler) = &self.cron_scheduler {
2964            scheduler.set_enabled(name, enabled).await;
2965        }
2966    }
2967
2968    /// List all registered cron jobs
2969    pub async fn list_cron_jobs(&self) -> Vec<crate::cron_scheduler::CronJobInfo> {
2970        if let Some(scheduler) = &self.cron_scheduler {
2971            scheduler.list_jobs().await
2972        } else {
2973            Vec::new()
2974        }
2975    }
2976
2977    /// Start the cron scheduler background task
2978    ///
2979    /// This spawns a background task that checks for due cron jobs every second.
2980    /// Returns a `JoinHandle` that can be used to wait for the scheduler to stop.
2981    ///
2982    /// # Errors
2983    /// Returns error if cron scheduler is not configured
2984    pub fn start_cron_scheduler(&self) -> Result<tokio::task::JoinHandle<()>> {
2985        let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2986            AgentError::Configuration("Cron scheduler not configured".to_string())
2987        })?;
2988
2989        let scheduler: Arc<CronScheduler> = Arc::clone(scheduler);
2990        Ok(tokio::spawn(async move {
2991            scheduler.run_loop().await;
2992        }))
2993    }
2994
2995    /// Shutdown the cron scheduler
2996    pub fn shutdown_cron(&self) {
2997        if let Some(scheduler) = &self.cron_scheduler {
2998            scheduler.shutdown();
2999        }
3000    }
3001}
3002
3003#[cfg(test)]
3004#[allow(deprecated)]
3005mod tests {
3006    use super::*;
3007    use crate::runtime::MockRuntime;
3008
3009    #[tokio::test]
3010    async fn test_service_manager() {
3011        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3012        let manager = ServiceManager::new(runtime);
3013
3014        // Add service
3015        let spec = mock_spec();
3016        Box::pin(manager.upsert_service("test".to_string(), spec))
3017            .await
3018            .unwrap();
3019
3020        // Scale up
3021        manager.scale_service("test", 3).await.unwrap();
3022
3023        // Check count
3024        let count = manager.service_replica_count("test").await.unwrap();
3025        assert_eq!(count, 3);
3026
3027        // List services
3028        let services = manager.list_services().await;
3029        assert_eq!(services, vec!["test".to_string()]);
3030    }
3031
3032    #[tokio::test]
3033    async fn test_service_manager_basic_lifecycle() {
3034        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3035        let manager = ServiceManager::new(runtime);
3036
3037        // Add service with HTTP endpoint
3038        let spec = mock_spec();
3039        Box::pin(manager.upsert_service("api".to_string(), spec))
3040            .await
3041            .unwrap();
3042
3043        // Scale up
3044        manager.scale_service("api", 2).await.unwrap();
3045
3046        // Check count
3047        let count = manager.service_replica_count("api").await.unwrap();
3048        assert_eq!(count, 2);
3049
3050        // Remove service
3051        manager.remove_service("api").await.unwrap();
3052
3053        // Verify service is gone
3054        let services = manager.list_services().await;
3055        assert!(!services.contains(&"api".to_string()));
3056    }
3057
3058    #[tokio::test]
3059    async fn test_service_manager_with_full_config() {
3060        use tokio::sync::RwLock;
3061
3062        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3063
3064        // Create a mock overlay manager (skip actual network setup)
3065        let overlay_manager = Arc::new(RwLock::new(
3066            OverlayManager::new("test-deployment".to_string(), "test".to_string())
3067                .await
3068                .unwrap(),
3069        ));
3070
3071        let manager =
3072            ServiceManager::with_full_config(runtime, overlay_manager, "prod".to_string());
3073
3074        // Add service
3075        let spec = mock_spec();
3076        Box::pin(manager.upsert_service("web".to_string(), spec))
3077            .await
3078            .unwrap();
3079
3080        // Verify service is registered
3081        let services = manager.list_services().await;
3082        assert!(services.contains(&"web".to_string()));
3083    }
3084
/// Every `ContainerState` variant maps to its lowercase label, and `Display`
/// yields the same text as `as_str`.
#[test]
fn test_container_state_as_str() {
    use crate::runtime::ContainerState;

    let expectations = [
        (ContainerState::Pending, "pending"),
        (ContainerState::Initializing, "initializing"),
        (ContainerState::Running, "running"),
        (ContainerState::Stopping, "stopping"),
        (ContainerState::Exited { code: 0 }, "exited"),
        (
            ContainerState::Failed {
                reason: "boom".to_string(),
            },
            "failed",
        ),
    ];
    for (state, label) in &expectations {
        assert_eq!(state.as_str(), *label);
    }

    // Display delegates to as_str.
    assert_eq!(ContainerState::Running.to_string(), "running");
}
3103
3104    /// A container created from image X must report image X and its real
3105    /// lifecycle state through the new `container_infos` accessor, replacing
3106    /// the previously hardcoded `"running"` / empty-image behavior.
3107    #[tokio::test]
3108    async fn test_container_infos_surfaces_image_and_state() {
3109        use crate::runtime::{Container, ContainerState};
3110
3111        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3112        let manager = ServiceManager::new(runtime);
3113
3114        let spec = mock_spec(); // image name = "test:latest"
3115        let image = spec.image.name.to_string();
3116        Box::pin(manager.upsert_service("web".to_string(), spec))
3117            .await
3118            .unwrap();
3119
3120        // Inject containers directly with distinct states.
3121        {
3122            let services = manager.services.read().await;
3123            let instance = services.get("web").unwrap();
3124            let mut containers = instance.containers().write().await;
3125
3126            let running_id = ContainerId::new("web", 0);
3127            containers.insert(
3128                running_id.clone(),
3129                Container {
3130                    id: running_id,
3131                    image: image.clone(),
3132                    state: ContainerState::Running,
3133                    pid: Some(4242),
3134                    task: None,
3135                    overlay_ip: None,
3136                    health_monitor: None,
3137                    port_override: None,
3138                },
3139            );
3140
3141            let exited_id = ContainerId::new("web", 1);
3142            containers.insert(
3143                exited_id.clone(),
3144                Container {
3145                    id: exited_id,
3146                    image: image.clone(),
3147                    state: ContainerState::Exited { code: 1 },
3148                    pid: None,
3149                    task: None,
3150                    overlay_ip: None,
3151                    health_monitor: None,
3152                    port_override: None,
3153                },
3154            );
3155        }
3156
3157        let mut infos = manager.get_service_container_infos("web").await;
3158        infos.sort_by_key(|i| i.id.replica);
3159        assert_eq!(infos.len(), 2);
3160
3161        // Every container reports the real image it was created from.
3162        assert!(infos.iter().all(|i| i.image == image));
3163        assert!(infos.iter().all(|i| i.image == "test:latest"));
3164
3165        // Real per-container state is surfaced (not a hardcoded "running").
3166        assert_eq!(infos[0].state, "running");
3167        assert_eq!(infos[0].pid, Some(4242));
3168        assert_eq!(infos[1].state, "exited");
3169        assert_eq!(infos[1].pid, None);
3170
3171        // Unknown service yields an empty list.
3172        assert!(manager
3173            .get_service_container_infos("missing")
3174            .await
3175            .is_empty());
3176    }
3177
3178    /// Bug 2 (`cluster_upgrade`): a changed image *reference* (tag bump) under
3179    /// `if_not_present` must still recreate the local replicas. Previously the
3180    /// recreate only fired on digest drift under `Always`/`Newer`, so a tag
3181    /// change was silently ignored and containers stayed on the old image.
3182    #[tokio::test]
3183    async fn upsert_recreates_local_replicas_on_image_reference_change() {
3184        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3185        let manager = ServiceManager::new(runtime);
3186
3187        // Deploy v1 with the e2e's pull policy (if_not_present) and scale up.
3188        let mut spec = mock_spec();
3189        spec.image.name = "docker.io/library/nginx:1.28-alpine".parse().unwrap();
3190        spec.image.pull_policy = zlayer_spec::PullPolicy::IfNotPresent;
3191        Box::pin(manager.upsert_service("web".to_string(), spec.clone()))
3192            .await
3193            .unwrap();
3194        manager.scale_service_local("web", 2).await.unwrap();
3195
3196        let v1: Vec<String> = manager
3197            .get_service_container_infos("web")
3198            .await
3199            .into_iter()
3200            .map(|i| i.image)
3201            .collect();
3202        assert_eq!(v1.len(), 2);
3203        assert!(
3204            v1.iter().all(|img| img.contains("1.28")),
3205            "expected v1 images, got {v1:?}"
3206        );
3207
3208        // Upgrade to v2 under the SAME if_not_present policy.
3209        let mut spec_v2 = spec;
3210        spec_v2.image.name = "docker.io/library/nginx:1.29-alpine".parse().unwrap();
3211        Box::pin(manager.upsert_service("web".to_string(), spec_v2))
3212            .await
3213            .unwrap();
3214
3215        let v2: Vec<String> = manager
3216            .get_service_container_infos("web")
3217            .await
3218            .into_iter()
3219            .map(|i| i.image)
3220            .collect();
3221        assert_eq!(v2.len(), 2, "replica count preserved across upgrade");
3222        assert!(
3223            v2.iter().all(|img| img.contains("1.29")),
3224            "containers must be recreated on the new image, got {v2:?}"
3225        );
3226    }
3227
3228    fn mock_spec() -> ServiceSpec {
3229        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3230            r"
3231version: v1
3232deployment: test
3233services:
3234  test:
3235    rtype: service
3236    image:
3237      name: test:latest
3238    endpoints:
3239      - name: http
3240        protocol: http
3241        port: 8080
3242    scale:
3243      mode: fixed
3244      replicas: 1
3245",
3246        )
3247        .unwrap()
3248        .services
3249        .remove("test")
3250        .unwrap()
3251    }
3252
/// With no user DNS and no host networking, the overlay resolver IP becomes
/// the container's only DNS entry.
#[test]
fn test_set_container_dns_injects_when_empty() {
    let mock_runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
    // spec.dns defaults to empty, host_network false
    let default_spec = mock_spec();
    let mut svc = ServiceInstance::new(
        "web".to_string(),
        default_spec,
        Arc::clone(&mock_runtime),
        None,
    );

    svc.set_container_dns("10.42.0.1".parse().unwrap());

    let expected = vec!["10.42.0.1".to_string()];
    assert_eq!(svc.spec.dns, expected);
}
3262
/// Host-networked services must not get the overlay resolver injected.
#[test]
fn test_set_container_dns_skips_host_network() {
    let mock_runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
    let mut host_spec = mock_spec();
    host_spec.host_network = true;
    let mut svc = ServiceInstance::new(
        "web".to_string(),
        host_spec,
        Arc::clone(&mock_runtime),
        None,
    );

    svc.set_container_dns("10.42.0.1".parse().unwrap());

    assert!(
        svc.spec.dns.is_empty(),
        "host_network containers must inherit the host resolv.conf"
    );
}
3276
/// A DNS list supplied by the user in the spec takes precedence over the
/// overlay resolver.
#[test]
fn test_set_container_dns_preserves_user_dns() {
    let mock_runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
    let mut user_spec = mock_spec();
    user_spec.dns = vec!["1.1.1.1".to_string()];
    let mut svc = ServiceInstance::new(
        "web".to_string(),
        user_spec,
        Arc::clone(&mock_runtime),
        None,
    );

    svc.set_container_dns("10.42.0.1".parse().unwrap());

    let expected = vec!["1.1.1.1".to_string()];
    assert_eq!(
        svc.spec.dns, expected,
        "user-supplied spec.dns must win over the overlay resolver"
    );
}
3291
3292    /// Helper to create a `ServiceSpec` with dependencies
3293    fn mock_spec_with_deps(deps: Vec<DependsSpec>) -> ServiceSpec {
3294        let mut spec = mock_spec();
3295        spec.depends = deps;
3296        spec
3297    }
3298
3299    /// Helper to create a `DependsSpec`
3300    fn dep(
3301        service: &str,
3302        condition: zlayer_spec::DependencyCondition,
3303        timeout_ms: u64,
3304        on_timeout: zlayer_spec::TimeoutAction,
3305    ) -> DependsSpec {
3306        DependsSpec {
3307            service: service.to_string(),
3308            condition,
3309            timeout: Some(Duration::from_millis(timeout_ms)),
3310            on_timeout,
3311        }
3312    }
3313
3314    #[tokio::test]
3315    async fn test_deploy_with_dependencies_no_deps() {
3316        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3317        let manager = ServiceManager::new(runtime);
3318
3319        // Services with no dependencies
3320        let mut services = HashMap::new();
3321        services.insert("a".to_string(), mock_spec());
3322        services.insert("b".to_string(), mock_spec());
3323
3324        // Should deploy both without issue
3325        Box::pin(manager.deploy_with_dependencies(services))
3326            .await
3327            .unwrap();
3328
3329        // Both services should be registered
3330        let service_list = manager.list_services().await;
3331        assert_eq!(service_list.len(), 2);
3332    }
3333
3334    #[tokio::test]
3335    async fn test_deploy_with_dependencies_linear() {
3336        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3337        let manager = ServiceManager::new(runtime);
3338
3339        // A -> B -> C (A depends on B, B depends on C)
3340        // All use "started" condition which is satisfied when container is running
3341        let mut services = HashMap::new();
3342        services.insert("c".to_string(), mock_spec());
3343        services.insert(
3344            "b".to_string(),
3345            mock_spec_with_deps(vec![dep(
3346                "c",
3347                zlayer_spec::DependencyCondition::Started,
3348                5000,
3349                zlayer_spec::TimeoutAction::Fail,
3350            )]),
3351        );
3352        services.insert(
3353            "a".to_string(),
3354            mock_spec_with_deps(vec![dep(
3355                "b",
3356                zlayer_spec::DependencyCondition::Started,
3357                5000,
3358                zlayer_spec::TimeoutAction::Fail,
3359            )]),
3360        );
3361
3362        // Should deploy in order: c, b, a
3363        Box::pin(manager.deploy_with_dependencies(services))
3364            .await
3365            .unwrap();
3366
3367        // All services should be registered
3368        let service_list = manager.list_services().await;
3369        assert_eq!(service_list.len(), 3);
3370    }
3371
3372    #[tokio::test]
3373    async fn test_deploy_with_dependencies_cycle_detection() {
3374        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3375        let manager = ServiceManager::new(runtime);
3376
3377        // A -> B -> A (cycle)
3378        let mut services = HashMap::new();
3379        services.insert(
3380            "a".to_string(),
3381            mock_spec_with_deps(vec![dep(
3382                "b",
3383                zlayer_spec::DependencyCondition::Started,
3384                5000,
3385                zlayer_spec::TimeoutAction::Fail,
3386            )]),
3387        );
3388        services.insert(
3389            "b".to_string(),
3390            mock_spec_with_deps(vec![dep(
3391                "a",
3392                zlayer_spec::DependencyCondition::Started,
3393                5000,
3394                zlayer_spec::TimeoutAction::Fail,
3395            )]),
3396        );
3397
3398        // Should fail with cycle detection
3399        let result = Box::pin(manager.deploy_with_dependencies(services)).await;
3400        assert!(result.is_err());
3401        let err = result.unwrap_err().to_string();
3402        assert!(err.contains("Cyclic dependency"));
3403    }
3404
3405    #[tokio::test]
3406    async fn test_deploy_with_dependencies_timeout_continue() {
3407        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3408        let manager = ServiceManager::new(runtime);
3409
3410        // A depends on B (healthy), but B never becomes healthy
3411        // Using continue action, so it should proceed anyway
3412        let mut services = HashMap::new();
3413        services.insert("b".to_string(), mock_spec());
3414        services.insert(
3415            "a".to_string(),
3416            mock_spec_with_deps(vec![dep(
3417                "b",
3418                zlayer_spec::DependencyCondition::Healthy, // B won't pass healthy check
3419                100,                                       // Short timeout
3420                zlayer_spec::TimeoutAction::Continue,      // But continue anyway
3421            )]),
3422        );
3423
3424        // Should deploy both despite timeout
3425        Box::pin(manager.deploy_with_dependencies(services))
3426            .await
3427            .unwrap();
3428
3429        let service_list = manager.list_services().await;
3430        assert_eq!(service_list.len(), 2);
3431    }
3432
3433    #[tokio::test]
3434    async fn test_deploy_with_dependencies_timeout_warn() {
3435        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3436        let manager = ServiceManager::new(runtime);
3437
3438        // A depends on B (healthy), but B never becomes healthy
3439        // Using warn action, so it should proceed with a warning
3440        let mut services = HashMap::new();
3441        services.insert("b".to_string(), mock_spec());
3442        services.insert(
3443            "a".to_string(),
3444            mock_spec_with_deps(vec![dep(
3445                "b",
3446                zlayer_spec::DependencyCondition::Healthy,
3447                100,
3448                zlayer_spec::TimeoutAction::Warn,
3449            )]),
3450        );
3451
3452        // Should deploy both despite timeout (with warning)
3453        Box::pin(manager.deploy_with_dependencies(services))
3454            .await
3455            .unwrap();
3456
3457        let service_list = manager.list_services().await;
3458        assert_eq!(service_list.len(), 2);
3459    }
3460
3461    #[tokio::test]
3462    async fn test_deploy_with_dependencies_timeout_fail() {
3463        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3464        let manager = ServiceManager::new(runtime);
3465
3466        // A depends on B (healthy), but B never becomes healthy
3467        // Using fail action, so deployment should fail
3468        let mut services = HashMap::new();
3469        services.insert("b".to_string(), mock_spec());
3470        services.insert(
3471            "a".to_string(),
3472            mock_spec_with_deps(vec![dep(
3473                "b",
3474                zlayer_spec::DependencyCondition::Healthy,
3475                100,
3476                zlayer_spec::TimeoutAction::Fail,
3477            )]),
3478        );
3479
3480        // Should fail after B is started but doesn't become healthy
3481        let result = Box::pin(manager.deploy_with_dependencies(services)).await;
3482        assert!(result.is_err());
3483
3484        // B should be started (it has no deps), but A should fail
3485        let err = result.unwrap_err().to_string();
3486        assert!(err.contains("Dependency timeout"));
3487    }
3488
3489    #[tokio::test]
3490    async fn test_check_dependencies_all_satisfied() {
3491        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3492        let manager = ServiceManager::new(runtime);
3493
3494        // Mark a service as healthy
3495        manager
3496            .update_health_state("db", HealthState::Healthy)
3497            .await;
3498
3499        let deps = vec![DependsSpec {
3500            service: "db".to_string(),
3501            condition: zlayer_spec::DependencyCondition::Healthy,
3502            timeout: Some(Duration::from_secs(60)),
3503            on_timeout: zlayer_spec::TimeoutAction::Fail,
3504        }];
3505
3506        let satisfied = manager.check_dependencies(&deps).await.unwrap();
3507        assert!(satisfied);
3508    }
3509
3510    #[tokio::test]
3511    async fn test_check_dependencies_not_satisfied() {
3512        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3513        let manager = ServiceManager::new(runtime);
3514
3515        // Service not healthy (no state set = Unknown)
3516        let deps = vec![DependsSpec {
3517            service: "db".to_string(),
3518            condition: zlayer_spec::DependencyCondition::Healthy,
3519            timeout: Some(Duration::from_secs(60)),
3520            on_timeout: zlayer_spec::TimeoutAction::Fail,
3521        }];
3522
3523        let satisfied = manager.check_dependencies(&deps).await.unwrap();
3524        assert!(!satisfied);
3525    }
3526
3527    #[tokio::test]
3528    async fn test_health_state_tracking() {
3529        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3530        let manager = ServiceManager::new(runtime);
3531
3532        // Update health states
3533        manager
3534            .update_health_state("db", HealthState::Healthy)
3535            .await;
3536        manager
3537            .update_health_state("cache", HealthState::Unknown)
3538            .await;
3539
3540        // Verify states
3541        let states = manager.health_states();
3542        let states_read = states.read().await;
3543
3544        assert!(matches!(states_read.get("db"), Some(HealthState::Healthy)));
3545        assert!(matches!(
3546            states_read.get("cache"),
3547            Some(HealthState::Unknown)
3548        ));
3549    }
3550
3551    /// Regression test for the stabilization timeout that blocked the raft-e2e
3552    /// `cluster_scaling` / `cluster_upgrade` suites.
3553    ///
3554    /// Previously the callback that bridges a container's health result into the
3555    /// `ServiceManager` `health_states` map was only attached when BOTH a proxy
3556    /// manager AND a reachable overlay IP existed. In degraded-overlay / no-proxy
3557    /// deployments that `if let` was false, so `health_states` was never written,
3558    /// the service stayed `healthy=false` forever, and stabilization timed out
3559    /// even though the container was running and its health check passing.
3560    ///
3561    /// This test drives the real `scale_to` create path with:
3562    ///   * NO `proxy_manager` (so `proxy_backend` resolves to None), and
3563    ///   * a `Command { command: "true" }` health check (always passes host-side),
3564    /// then asserts the shared `health_states` map receives `Healthy` for the
3565    /// service — proving the bridge fires unconditionally.
3566    ///
3567    /// Gated to `#[cfg(unix)]` because `HealthCheck::Command` is executed via
3568    /// `sh -c <command>` in `crate::health::HealthChecker::check_command`. On
3569    /// Windows hosts without `sh` on PATH (the default Windows CI image), no
3570    /// Command-based health check can ever pass, so the test would fail for
3571    /// reasons unrelated to the bridge it is regression-testing. The bridge
3572    /// behavior under test is platform-agnostic; only the test fixture's
3573    /// "always-passes command" needs a Unix shell.
3574    #[cfg(unix)]
3575    #[tokio::test]
3576    async fn test_health_states_bridge_fires_without_proxy() {
3577        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3578
3579        // Service spec with a host-side command health check that always passes.
3580        // Zero start-grace + a short interval keep the test fast.
3581        let mut spec = mock_spec();
3582        spec.health = zlayer_spec::HealthSpec {
3583            start_grace: Some(Duration::from_millis(0)),
3584            interval: Some(Duration::from_millis(50)),
3585            timeout: Some(Duration::from_secs(5)),
3586            retries: 1,
3587            check: HealthCheck::Command {
3588                command: "true".to_string(),
3589            },
3590        };
3591
3592        // Build a ServiceInstance with NO proxy_manager and NO overlay_manager,
3593        // then wire in the shared health_states map exactly as ServiceManager does.
3594        let mut instance =
3595            ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
3596        let health_states: Arc<RwLock<HashMap<String, HealthState>>> =
3597            Arc::new(RwLock::new(HashMap::new()));
3598        instance.set_health_states(Arc::clone(&health_states));
3599
3600        // Drive the real create path (no proxy, MockRuntime IP present but proxy
3601        // absent => proxy_backend is None, hitting the previously-broken branch).
3602        instance.scale_to(1).await.unwrap();
3603
3604        // Poll for the bridged Healthy state (the monitor checks asynchronously
3605        // after its start grace). Bounded so a regression fails fast.
3606        let mut bridged = false;
3607        for _ in 0..100 {
3608            if matches!(
3609                health_states.read().await.get("web"),
3610                Some(HealthState::Healthy)
3611            ) {
3612                bridged = true;
3613                break;
3614            }
3615            tokio::time::sleep(Duration::from_millis(50)).await;
3616        }
3617
3618        assert!(
3619            bridged,
3620            "health_states must receive Healthy for the service even without a \
3621             proxy or overlay IP; the bridge regressed and stabilization would time out"
3622        );
3623    }
3624
3625    // ==================== Job/Cron Integration Tests ====================
3626
3627    fn mock_job_spec() -> ServiceSpec {
3628        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3629            r"
3630version: v1
3631deployment: test
3632services:
3633  backup:
3634    rtype: job
3635    image:
3636      name: backup:latest
3637",
3638        )
3639        .unwrap()
3640        .services
3641        .remove("backup")
3642        .unwrap()
3643    }
3644
3645    fn mock_cron_spec() -> ServiceSpec {
3646        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3647            r#"
3648version: v1
3649deployment: test
3650services:
3651  cleanup:
3652    rtype: cron
3653    schedule: "0 0 * * * * *"
3654    image:
3655      name: cleanup:latest
3656"#,
3657        )
3658        .unwrap()
3659        .services
3660        .remove("cleanup")
3661        .unwrap()
3662    }
3663
3664    #[tokio::test]
3665    async fn test_service_manager_with_job_executor() {
3666        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3667        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3668
3669        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
3670
3671        // Register job
3672        let job_spec = mock_job_spec();
3673        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
3674            .await
3675            .unwrap();
3676
3677        // Trigger job
3678        let exec_id = manager
3679            .trigger_job("backup", JobTrigger::Cli)
3680            .await
3681            .unwrap();
3682
3683        // Give job time to start
3684        tokio::time::sleep(Duration::from_millis(50)).await;
3685
3686        // Check execution exists
3687        let execution = manager.get_job_execution(&exec_id).await;
3688        assert!(execution.is_some());
3689        assert_eq!(execution.unwrap().job_name, "backup");
3690    }
3691
3692    #[tokio::test]
3693    async fn test_service_manager_with_cron_scheduler() {
3694        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3695        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3696        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
3697
3698        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
3699
3700        // Register cron job
3701        let cron_spec = mock_cron_spec();
3702        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3703            .await
3704            .unwrap();
3705
3706        // List cron jobs
3707        let cron_jobs = manager.list_cron_jobs().await;
3708        assert_eq!(cron_jobs.len(), 1);
3709        assert_eq!(cron_jobs[0].name, "cleanup");
3710        assert!(cron_jobs[0].enabled);
3711    }
3712
3713    #[tokio::test]
3714    async fn test_service_manager_trigger_cron() {
3715        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3716        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3717        let cron_scheduler = Arc::new(CronScheduler::new(job_executor.clone()));
3718
3719        let manager = ServiceManager::new(runtime)
3720            .with_job_executor(job_executor)
3721            .with_cron_scheduler(cron_scheduler);
3722
3723        // Register cron job
3724        let cron_spec = mock_cron_spec();
3725        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3726            .await
3727            .unwrap();
3728
3729        // Manually trigger the cron job
3730        let exec_id = manager.trigger_cron("cleanup").await.unwrap();
3731        assert!(!exec_id.0.is_empty());
3732    }
3733
3734    #[tokio::test]
3735    async fn test_service_manager_enable_disable_cron() {
3736        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3737        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3738        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
3739
3740        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
3741
3742        // Register cron job
3743        let cron_spec = mock_cron_spec();
3744        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3745            .await
3746            .unwrap();
3747
3748        // Initially enabled
3749        let cron_jobs = manager.list_cron_jobs().await;
3750        assert!(cron_jobs[0].enabled);
3751
3752        // Disable
3753        manager.set_cron_enabled("cleanup", false).await;
3754        let cron_jobs = manager.list_cron_jobs().await;
3755        assert!(!cron_jobs[0].enabled);
3756
3757        // Re-enable
3758        manager.set_cron_enabled("cleanup", true).await;
3759        let cron_jobs = manager.list_cron_jobs().await;
3760        assert!(cron_jobs[0].enabled);
3761    }
3762
3763    #[tokio::test]
3764    async fn test_service_manager_remove_cleans_up_job() {
3765        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3766        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3767
3768        let manager = ServiceManager::new(runtime).with_job_executor(job_executor.clone());
3769
3770        // Register job
3771        let job_spec = mock_job_spec();
3772        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
3773            .await
3774            .unwrap();
3775
3776        // Verify job is registered
3777        let spec = job_executor.get_job_spec("backup").await;
3778        assert!(spec.is_some());
3779
3780        // Remove job
3781        manager.remove_service("backup").await.unwrap();
3782
3783        // Verify job is unregistered
3784        let spec = job_executor.get_job_spec("backup").await;
3785        assert!(spec.is_none());
3786    }
3787
3788    #[tokio::test]
3789    async fn test_service_manager_remove_cleans_up_cron() {
3790        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3791        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3792        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
3793
3794        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler.clone());
3795
3796        // Register cron job
3797        let cron_spec = mock_cron_spec();
3798        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3799            .await
3800            .unwrap();
3801
3802        // Verify cron job is registered
3803        assert_eq!(cron_scheduler.job_count().await, 1);
3804
3805        // Remove cron job
3806        manager.remove_service("cleanup").await.unwrap();
3807
3808        // Verify cron job is unregistered
3809        assert_eq!(cron_scheduler.job_count().await, 0);
3810    }
3811
3812    #[tokio::test]
3813    async fn test_service_manager_job_without_executor() {
3814        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3815        let manager = ServiceManager::new(runtime);
3816
3817        // Try to trigger job without executor configured
3818        let result = manager.trigger_job("nonexistent", JobTrigger::Cli).await;
3819        assert!(result.is_err());
3820        assert!(result.unwrap_err().to_string().contains("not configured"));
3821    }
3822
3823    #[tokio::test]
3824    async fn test_service_manager_cron_without_scheduler() {
3825        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3826        let manager = ServiceManager::new(runtime);
3827
3828        // Try to register cron job without scheduler configured
3829        let cron_spec = mock_cron_spec();
3830        let result = Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec)).await;
3831        assert!(result.is_err());
3832        assert!(result.unwrap_err().to_string().contains("not configured"));
3833    }
3834
3835    #[tokio::test]
3836    async fn test_service_manager_list_job_executions() {
3837        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3838        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3839
3840        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
3841
3842        // Register job
3843        let job_spec = mock_job_spec();
3844        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
3845            .await
3846            .unwrap();
3847
3848        // Trigger job twice
3849        manager
3850            .trigger_job("backup", JobTrigger::Cli)
3851            .await
3852            .unwrap();
3853        manager
3854            .trigger_job("backup", JobTrigger::Scheduler)
3855            .await
3856            .unwrap();
3857
3858        // Give jobs time to start
3859        tokio::time::sleep(Duration::from_millis(50)).await;
3860
3861        // List executions
3862        let executions = manager.list_job_executions("backup").await;
3863        assert_eq!(executions.len(), 2);
3864    }
3865
3866    // ==================== Container Supervisor Integration Tests ====================
3867
3868    #[tokio::test]
3869    async fn test_service_manager_with_supervisor() {
3870        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3871        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
3872
3873        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
3874
3875        // Add service
3876        let spec = mock_spec();
3877        Box::pin(manager.upsert_service("api".to_string(), spec))
3878            .await
3879            .unwrap();
3880
3881        // Scale up - containers should be registered with supervisor
3882        manager.scale_service("api", 2).await.unwrap();
3883
3884        // Verify containers are supervised
3885        assert_eq!(supervisor.supervised_count().await, 2);
3886
3887        // Scale down - containers should be unregistered
3888        manager.scale_service("api", 1).await.unwrap();
3889        assert_eq!(supervisor.supervised_count().await, 1);
3890
3891        // Remove service - remaining containers should be unregistered
3892        manager.remove_service("api").await.unwrap();
3893        assert_eq!(supervisor.supervised_count().await, 0);
3894    }
3895
3896    #[tokio::test]
3897    async fn test_service_manager_supervisor_state() {
3898        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3899        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
3900
3901        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor);
3902
3903        // Add and scale service
3904        let spec = mock_spec();
3905        Box::pin(manager.upsert_service("web".to_string(), spec))
3906            .await
3907            .unwrap();
3908        manager.scale_service("web", 1).await.unwrap();
3909
3910        // Check supervised state
3911        let container_id = ContainerId::new("web".to_string(), 1);
3912        let state = manager.get_container_supervised_state(&container_id).await;
3913        assert_eq!(state, Some(SupervisedState::Running));
3914    }
3915
3916    #[tokio::test]
3917    async fn test_service_manager_start_supervisor() {
3918        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3919        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
3920
3921        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
3922
3923        // Start the supervisor
3924        let handle = manager.start_container_supervisor().unwrap();
3925
3926        // Give it time to start
3927        tokio::time::sleep(Duration::from_millis(50)).await;
3928        assert!(supervisor.is_running());
3929
3930        // Shutdown
3931        manager.shutdown_container_supervisor();
3932
3933        // Wait for it to stop
3934        tokio::time::timeout(Duration::from_secs(1), handle)
3935            .await
3936            .unwrap()
3937            .unwrap();
3938
3939        assert!(!supervisor.is_running());
3940    }
3941
3942    #[tokio::test]
3943    async fn test_service_manager_supervisor_not_configured() {
3944        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3945        let manager = ServiceManager::new(runtime);
3946
3947        // Try to start supervisor without configuring it
3948        let result = manager.start_container_supervisor();
3949        assert!(result.is_err());
3950        assert!(result.unwrap_err().to_string().contains("not configured"));
3951    }
3952
3953    // ==================== Stream Registry Integration Tests ====================
3954
3955    fn mock_tcp_spec() -> ServiceSpec {
3956        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3957            r"
3958version: v1
3959deployment: test
3960services:
3961  database:
3962    rtype: service
3963    image:
3964      name: postgres:latest
3965    endpoints:
3966      - name: postgresql
3967        protocol: tcp
3968        port: 5432
3969    scale:
3970      mode: fixed
3971      replicas: 1
3972",
3973        )
3974        .unwrap()
3975        .services
3976        .remove("database")
3977        .unwrap()
3978    }
3979
3980    fn mock_udp_spec() -> ServiceSpec {
3981        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3982            r"
3983version: v1
3984deployment: test
3985services:
3986  dns:
3987    rtype: service
3988    image:
3989      name: dns:latest
3990    endpoints:
3991      - name: dns
3992        protocol: udp
3993        port: 53
3994    scale:
3995      mode: fixed
3996      replicas: 1
3997",
3998        )
3999        .unwrap()
4000        .services
4001        .remove("dns")
4002        .unwrap()
4003    }
4004
4005    fn mock_mixed_spec() -> ServiceSpec {
4006        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
4007            r"
4008version: v1
4009deployment: test
4010services:
4011  mixed:
4012    rtype: service
4013    image:
4014      name: mixed:latest
4015    endpoints:
4016      - name: http
4017        protocol: http
4018        port: 8080
4019      - name: grpc
4020        protocol: tcp
4021        port: 9000
4022      - name: metrics
4023        protocol: udp
4024        port: 8125
4025    scale:
4026      mode: fixed
4027      replicas: 1
4028",
4029        )
4030        .unwrap()
4031        .services
4032        .remove("mixed")
4033        .unwrap()
4034    }
4035
4036    #[tokio::test]
4037    async fn test_service_manager_with_stream_registry_tcp() {
4038        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4039        let stream_registry = Arc::new(StreamRegistry::new());
4040
4041        let mut manager = ServiceManager::new(runtime);
4042        manager.set_stream_registry(stream_registry.clone());
4043        manager.set_deployment_name("test".to_string());
4044
4045        // Add TCP-only service
4046        let spec = mock_tcp_spec();
4047        Box::pin(manager.upsert_service("database".to_string(), spec))
4048            .await
4049            .unwrap();
4050
4051        // Verify TCP route was registered
4052        assert_eq!(stream_registry.tcp_count(), 1);
4053        assert!(stream_registry.tcp_ports().contains(&5432));
4054
4055        // Remove service and verify cleanup
4056        manager.remove_service("database").await.unwrap();
4057        assert_eq!(stream_registry.tcp_count(), 0);
4058    }
4059
4060    #[tokio::test]
4061    async fn test_service_manager_with_stream_registry_udp() {
4062        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4063        let stream_registry = Arc::new(StreamRegistry::new());
4064
4065        let mut manager = ServiceManager::new(runtime);
4066        manager.set_stream_registry(stream_registry.clone());
4067        manager.set_deployment_name("test".to_string());
4068
4069        // Add UDP-only service
4070        let spec = mock_udp_spec();
4071        Box::pin(manager.upsert_service("dns".to_string(), spec))
4072            .await
4073            .unwrap();
4074
4075        // Verify UDP route was registered
4076        assert_eq!(stream_registry.udp_count(), 1);
4077        assert!(stream_registry.udp_ports().contains(&53));
4078
4079        // Remove service and verify cleanup
4080        manager.remove_service("dns").await.unwrap();
4081        assert_eq!(stream_registry.udp_count(), 0);
4082    }
4083
4084    #[tokio::test]
4085    async fn test_service_manager_with_stream_registry_mixed() {
4086        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4087        let stream_registry = Arc::new(StreamRegistry::new());
4088
4089        let mut manager = ServiceManager::new(runtime);
4090        manager.set_stream_registry(stream_registry.clone());
4091        manager.set_deployment_name("test".to_string());
4092
4093        // Add mixed service (HTTP + TCP + UDP)
4094        let spec = mock_mixed_spec();
4095        Box::pin(manager.upsert_service("mixed".to_string(), spec))
4096            .await
4097            .unwrap();
4098
4099        // Verify stream routes were registered
4100        assert_eq!(stream_registry.tcp_count(), 1); // TCP: 9000
4101        assert_eq!(stream_registry.udp_count(), 1); // UDP: 8125
4102
4103        assert!(stream_registry.tcp_ports().contains(&9000));
4104        assert!(stream_registry.udp_ports().contains(&8125));
4105
4106        // Remove service and verify stream cleanup
4107        manager.remove_service("mixed").await.unwrap();
4108        assert_eq!(stream_registry.tcp_count(), 0);
4109        assert_eq!(stream_registry.udp_count(), 0);
4110    }
4111
4112    #[tokio::test]
4113    async fn test_service_manager_stream_registry_builder() {
4114        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4115        let stream_registry = Arc::new(StreamRegistry::new());
4116
4117        // Test builder pattern
4118        let manager = ServiceManager::new(runtime).with_stream_registry(stream_registry.clone());
4119
4120        // Verify stream registry is accessible
4121        assert!(manager.stream_registry().is_some());
4122    }
4123
4124    #[tokio::test]
4125    async fn test_tcp_service_without_stream_registry() {
4126        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4127
4128        // Manager without stream registry
4129        let mut manager = ServiceManager::new(runtime);
4130        manager.set_deployment_name("test".to_string());
4131
4132        // Add TCP service - should log warning but not fail
4133        let spec = mock_tcp_spec();
4134        Box::pin(manager.upsert_service("database".to_string(), spec))
4135            .await
4136            .unwrap();
4137
4138        // No stream registry to check, but service should be tracked
4139        let services = manager.list_services().await;
4140        assert!(services.contains(&"database".to_string()));
4141    }
4142
4143    /// Verify `collect_endpoint_backends` filters containers by
4144    /// `EndpointSpec.target_role`.
4145    ///
4146    /// Given two replica groups (`primary` × 1, `read` × 2) and two
4147    /// endpoints — one with `target_role: primary` and one with
4148    /// `target_role: read` — each endpoint should receive only the
4149    /// matching containers' overlay addresses. The legacy no-filter
4150    /// endpoint (`target_role: None`) should receive all of them.
4151    #[tokio::test]
4152    #[allow(clippy::too_many_lines)]
4153    async fn test_collect_endpoint_backends_respects_target_role() {
4154        use crate::runtime::Container;
4155        use std::collections::HashMap as StdHashMap;
4156        use std::net::{IpAddr, Ipv4Addr};
4157        use zlayer_spec::{
4158            EndpointSpec, ExposeType, GroupAffinity, Protocol, ReplicaGroup, ScaleSpec,
4159        };
4160
4161        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4162        let manager = ServiceManager::new(runtime.clone());
4163
4164        // Build a spec with replica_groups and three endpoints:
4165        // - "write" targets role "primary"
4166        // - "read" targets role "read"
4167        // - "any" has no target_role (legacy)
4168        let mut spec = mock_spec();
4169        spec.replica_groups = Some(vec![
4170            ReplicaGroup {
4171                role: "primary".to_string(),
4172                count: 1,
4173                image: None,
4174                env: StdHashMap::new(),
4175                command: None,
4176                resources: None,
4177                affinity: GroupAffinity::default(),
4178            },
4179            ReplicaGroup {
4180                role: "read".to_string(),
4181                count: 2,
4182                image: None,
4183                env: StdHashMap::new(),
4184                command: None,
4185                resources: None,
4186                affinity: GroupAffinity::default(),
4187            },
4188        ]);
4189        spec.scale = ScaleSpec::Fixed { replicas: 3 };
4190        spec.endpoints = vec![
4191            EndpointSpec {
4192                name: "write".to_string(),
4193                protocol: Protocol::Tcp,
4194                port: 5432,
4195                target_port: Some(5432),
4196                path: None,
4197                host: None,
4198                expose: ExposeType::Internal,
4199                stream: None,
4200                tunnel: None,
4201                target_role: Some("primary".to_string()),
4202            },
4203            EndpointSpec {
4204                name: "read".to_string(),
4205                protocol: Protocol::Tcp,
4206                port: 5433,
4207                target_port: Some(5432),
4208                path: None,
4209                host: None,
4210                expose: ExposeType::Internal,
4211                stream: None,
4212                tunnel: None,
4213                target_role: Some("read".to_string()),
4214            },
4215            EndpointSpec {
4216                name: "any".to_string(),
4217                protocol: Protocol::Tcp,
4218                port: 5434,
4219                target_port: Some(5432),
4220                path: None,
4221                host: None,
4222                expose: ExposeType::Internal,
4223                stream: None,
4224                tunnel: None,
4225                target_role: None,
4226            },
4227        ];
4228
4229        let instance = ServiceInstance::new(
4230            "postgres".to_string(),
4231            spec.clone(),
4232            runtime,
4233            None, // overlay_manager — not exercised by this test
4234        );
4235
4236        // Inject three containers directly: one primary, two read replicas.
4237        let cid_primary = ContainerId::with_role_and_node("postgres", 1, "primary", 0);
4238        let cid_first_read = ContainerId::with_role_and_node("postgres", 2, "read", 0);
4239        let cid_second_read = ContainerId::with_role_and_node("postgres", 3, "read", 0);
4240
4241        let ip_primary = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 1));
4242        let ip_first_read = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 2));
4243        let ip_second_read = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 3));
4244
4245        {
4246            let mut containers = instance.containers().write().await;
4247            containers.insert(
4248                cid_primary.clone(),
4249                Container {
4250                    id: cid_primary,
4251                    image: spec.image.name.to_string(),
4252                    state: crate::runtime::ContainerState::Running,
4253                    pid: None,
4254                    task: None,
4255                    overlay_ip: Some(ip_primary),
4256                    health_monitor: None,
4257                    port_override: None,
4258                },
4259            );
4260            containers.insert(
4261                cid_first_read.clone(),
4262                Container {
4263                    id: cid_first_read,
4264                    image: spec.image.name.to_string(),
4265                    state: crate::runtime::ContainerState::Running,
4266                    pid: None,
4267                    task: None,
4268                    overlay_ip: Some(ip_first_read),
4269                    health_monitor: None,
4270                    port_override: None,
4271                },
4272            );
4273            containers.insert(
4274                cid_second_read.clone(),
4275                Container {
4276                    id: cid_second_read,
4277                    image: spec.image.name.to_string(),
4278                    state: crate::runtime::ContainerState::Running,
4279                    pid: None,
4280                    task: None,
4281                    overlay_ip: Some(ip_second_read),
4282                    health_monitor: None,
4283                    port_override: None,
4284                },
4285            );
4286        }
4287
4288        let write_ep = &spec.endpoints[0];
4289        let read_ep = &spec.endpoints[1];
4290        let any_ep = &spec.endpoints[2];
4291
4292        let write_backends = manager.collect_endpoint_backends(&instance, write_ep).await;
4293        let read_backends = manager.collect_endpoint_backends(&instance, read_ep).await;
4294        let any_backends = manager.collect_endpoint_backends(&instance, any_ep).await;
4295
4296        // write endpoint -> only the primary container
4297        assert_eq!(write_backends.len(), 1, "write should match only primary");
4298        assert!(
4299            write_backends.iter().any(|a| a.ip() == ip_primary),
4300            "write backends missing primary IP: {write_backends:?}"
4301        );
4302
4303        // read endpoint -> both read containers, no primary
4304        assert_eq!(
4305            read_backends.len(),
4306            2,
4307            "read should match both read replicas"
4308        );
4309        assert!(read_backends.iter().any(|a| a.ip() == ip_first_read));
4310        assert!(read_backends.iter().any(|a| a.ip() == ip_second_read));
4311        assert!(
4312            !read_backends.iter().any(|a| a.ip() == ip_primary),
4313            "read backends must not contain primary: {read_backends:?}"
4314        );
4315
4316        // legacy endpoint (target_role = None) -> every container
4317        assert_eq!(
4318            any_backends.len(),
4319            3,
4320            "any-role endpoint should see all containers"
4321        );
4322    }
4323}