Skip to main content

zlayer_agent/
service.rs

1//! Service-level container lifecycle management
2
3use crate::container_supervisor::{ContainerSupervisor, SupervisedState, SupervisorEvent};
4use crate::cron_scheduler::CronScheduler;
5use crate::dependency::{
6    DependencyConditionChecker, DependencyGraph, DependencyWaiter, WaitResult,
7};
8use crate::error::{AgentError, Result};
9use crate::health::{HealthCallback, HealthChecker, HealthMonitor, HealthState};
10use crate::init::InitOrchestrator;
11use crate::job::{JobExecution, JobExecutionId, JobExecutor, JobTrigger};
12use crate::overlay_manager::OverlayManager;
13use crate::proxy_manager::ProxyManager;
14use crate::runtime::{Container, ContainerId, ContainerState, Runtime};
15use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{RwLock, Semaphore};
20use zlayer_observability::logs::LogEntry;
21use zlayer_overlay::DnsServer;
22use zlayer_proxy::{StreamRegistry, StreamService};
23use zlayer_spec::{DependsSpec, HealthCheck, Protocol, PullPolicy, ResourceType, ServiceSpec};
24
25/// Service instance manages a single service's containers
26pub struct ServiceInstance {
27    pub service_name: String,
28    pub spec: ServiceSpec,
29    runtime: Arc<dyn Runtime + Send + Sync>,
30    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
31    /// Overlay network manager for container networking (optional, not needed for Docker runtime)
32    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
33    /// Proxy manager for updating backend health (optional)
34    proxy_manager: Option<Arc<ProxyManager>>,
35    /// DNS server for service discovery (optional)
36    dns_server: Option<Arc<DnsServer>>,
37    /// Shared health states map so callbacks can update ServiceManager-level health
38    health_states: Option<Arc<RwLock<HashMap<String, HealthState>>>>,
39    /// Most recently observed image digest after a successful pull. Used by
40    /// `upsert_service` to detect drift on `:latest`/`Newer` redeploys without
41    /// requiring callers to track digest state externally. Wrapped in a
42    /// `RwLock` so `&self` methods (`scale_to`) can update it.
43    last_pulled_digest: tokio::sync::RwLock<Option<String>>,
44    /// Local cluster node id used when constructing new `ContainerId`s during
45    /// scale-up. `0` in single-node deployments or when the cluster handle is
46    /// not yet wired. Populated by `ServiceManager` from `Cluster::node_id()`
47    /// at instance construction time.
48    node_id: u64,
49}
50
51impl ServiceInstance {
52    /// Create a new service instance
53    pub fn new(
54        service_name: String,
55        spec: ServiceSpec,
56        runtime: Arc<dyn Runtime + Send + Sync>,
57        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
58    ) -> Self {
59        Self {
60            service_name,
61            spec,
62            runtime,
63            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
64            overlay_manager,
65            proxy_manager: None,
66            dns_server: None,
67            health_states: None,
68            last_pulled_digest: tokio::sync::RwLock::new(None),
69            node_id: 0,
70        }
71    }
72
73    /// Create a new service instance with proxy manager for health-aware load balancing
74    pub fn with_proxy(
75        service_name: String,
76        spec: ServiceSpec,
77        runtime: Arc<dyn Runtime + Send + Sync>,
78        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
79        proxy_manager: Arc<ProxyManager>,
80    ) -> Self {
81        Self {
82            service_name,
83            spec,
84            runtime,
85            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
86            overlay_manager,
87            proxy_manager: Some(proxy_manager),
88            dns_server: None,
89            health_states: None,
90            last_pulled_digest: tokio::sync::RwLock::new(None),
91            node_id: 0,
92        }
93    }
94
95    /// Set the local cluster node id. Used by `ServiceManager` to thread
96    /// `Cluster::node_id()` down to container construction so new
97    /// `ContainerId`s carry the owning node identity. Defaults to `0` (the
98    /// single-node sentinel) when unset.
99    pub fn set_node_id(&mut self, node_id: u64) {
100        self.node_id = node_id;
101    }
102
103    /// Derive the replica group role for a 1-based `replica_idx`.
104    ///
105    /// When `spec.replica_groups` is unset, returns `"default"` (the implicit
106    /// single-group case). Otherwise walks groups in declaration order,
107    /// accumulating each group's `count` until `replica_idx` falls within the
108    /// current group's range, and returns that group's `role`.
109    ///
110    /// Replicas beyond the declared total fall back to `"default"`.
111    #[must_use]
112    pub fn role_for_replica(&self, replica_idx: u32) -> String {
113        let Some(groups) = self.spec.replica_groups.as_ref() else {
114            return "default".to_string();
115        };
116        let mut cumulative = 0u32;
117        for group in groups {
118            cumulative = cumulative.saturating_add(group.count);
119            if replica_idx <= cumulative {
120                return group.role.clone();
121            }
122        }
123        "default".to_string()
124    }
125
126    /// Builder method to add DNS server for service discovery
127    #[must_use]
128    pub fn with_dns(mut self, dns_server: Arc<DnsServer>) -> Self {
129        self.dns_server = Some(dns_server);
130        self
131    }
132
133    /// Set the DNS server for service discovery
134    pub fn set_dns_server(&mut self, dns_server: Arc<DnsServer>) {
135        self.dns_server = Some(dns_server);
136    }
137
138    /// Set the proxy manager for health-aware load balancing
139    pub fn set_proxy_manager(&mut self, proxy_manager: Arc<ProxyManager>) {
140        self.proxy_manager = Some(proxy_manager);
141    }
142
143    /// Set the shared health states map so health callbacks can bridge state back to `ServiceManager`
144    pub fn set_health_states(&mut self, states: Arc<RwLock<HashMap<String, HealthState>>>) {
145        self.health_states = Some(states);
146    }
147
148    /// Get the last observed image digest (after the most recent successful
149    /// pull). Returns `None` when no pull has happened yet, when the runtime
150    /// does not expose digests, or when no matching `ImageInfo` was found.
151    pub async fn last_pulled_digest(&self) -> Option<String> {
152        self.last_pulled_digest.read().await.clone()
153    }
154
155    /// Pull the service image using the spec's pull policy (literal Docker /
156    /// Kubernetes semantics — no silent auto-upgrade of `IfNotPresent` to
157    /// `Newer` for `:latest` tags) and refresh the cached digest from
158    /// `Runtime::list_images` when the runtime exposes it. Returns the digest
159    /// observed after the pull, when known.
160    ///
161    /// For `Never`, the runtime is still called so it can load the image
162    /// config from the local cache (without any remote round-trip); only the
163    /// remote digest refresh is skipped. Without this call the bundle builder
164    /// has no image entrypoint/cmd and falls back to `/bin/sh`.
165    async fn pull_and_refresh_digest(&self) -> Result<Option<String>> {
166        let image_str = self.spec.image.name.to_string();
167        let policy = self.spec.image.pull_policy;
168
169        self.runtime
170            .pull_image_with_policy(
171                &image_str,
172                policy,
173                None,
174                self.spec.image.source_policy.unwrap_or_default(),
175            )
176            .await
177            .map_err(|e| AgentError::PullFailed {
178                image: self.spec.image.name.to_string(),
179                reason: e.to_string(),
180            })?;
181
182        // Best-effort: try to discover the resolved digest via list_images.
183        // Runtimes that don't support introspection (Unsupported) leave the
184        // cached digest unchanged; drift detection then falls back to "always
185        // recreate on PullPolicy::Always, never recreate on PullPolicy::Newer
186        // when no digests are known".
187        let new_digest = match self.runtime.list_images().await {
188            Ok(images) => images
189                .into_iter()
190                .find(|info| info.reference == image_str)
191                .and_then(|info| info.digest),
192            Err(e) => {
193                tracing::debug!(
194                    image = %image_str,
195                    error = %e,
196                    "list_images unavailable; cannot record post-pull digest"
197                );
198                None
199            }
200        };
201
202        if let Some(ref digest) = new_digest {
203            *self.last_pulled_digest.write().await = Some(digest.clone());
204        }
205
206        Ok(new_digest)
207    }
208
209    /// Scale to the desired number of replicas
210    ///
211    /// This method uses short-lived locks to avoid blocking concurrent operations.
212    /// I/O operations (pull, create, start, stop, remove) are performed without
213    /// holding the containers lock to allow other operations to proceed.
214    ///
215    /// # Errors
216    /// Returns an error if image pull, container creation, or container lifecycle operations fail.
217    #[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
218    pub async fn scale_to(&self, replicas: u32) -> Result<()> {
219        // Phase 1: Determine current state (short read lock)
220        let current_replicas = { self.containers.read().await.len() as u32 }; // Lock released here
221
222        // Phase 1b: Pull image up front so a redeploy on `:latest` (which lands
223        // here with replicas == current_replicas in the steady state) actually
224        // refreshes the cached digest. We skip the call only when scaling
225        // strictly down (no new containers needed). For `Never` the runtime
226        // still needs to load the image config from the local cache so the
227        // bundle builder gets entrypoint/cmd/env — without it the container
228        // falls back to `/bin/sh` and exits instantly. `pull_and_refresh_digest`
229        // itself handles the Never case (no remote round-trip, cache-only).
230        if replicas >= current_replicas {
231            let _ = self.pull_and_refresh_digest().await?;
232        }
233
234        // Phase 2: Scale up - create new containers (no lock held during I/O)
235        //
236        // Compute (role, replica_index) tuples for each new replica. When
237        // `spec.replica_groups` is set, expand groups in declaration order so
238        // each created replica maps to its declared `(role, intra_group_index)`.
239        // Otherwise fall back to the implicit single "default" group. The
240        // `local_node_id` is captured once so every new `ContainerId` carries
241        // the owning node identity for cross-node disambiguation.
242        let local_node_id = self.node_id;
243        if replicas > current_replicas {
244            let replica_specs: Vec<(String, u32)> =
245                if let Some(groups) = self.spec.replica_groups.as_ref() {
246                    let mut specs: Vec<(String, u32)> = Vec::new();
247                    for group in groups {
248                        for idx in 0..group.count {
249                            specs.push((group.role.clone(), idx + 1));
250                        }
251                    }
252                    specs
253                        .into_iter()
254                        .skip(current_replicas as usize)
255                        .take((replicas - current_replicas) as usize)
256                        .collect()
257                } else {
258                    (current_replicas..replicas)
259                        .map(|i| ("default".to_string(), i + 1))
260                        .collect()
261                };
262
263            for (role, replica_idx) in replica_specs {
264                let id = ContainerId::with_role_and_node(
265                    self.service_name.clone(),
266                    replica_idx,
267                    role,
268                    local_node_id,
269                );
270
271                // Create container (no lock needed - I/O operation)
272                //
273                // RouteToPeer must propagate unchanged: the scheduler uses it
274                // to re-place the workload on a capable peer, and wrapping it
275                // in `CreateFailed` would hide the signal and mark the service
276                // dead instead of rescheduling it. All other errors are
277                // normalised to `CreateFailed` for upstream handling.
278                self.runtime
279                    .create_container(&id, &self.spec)
280                    .await
281                    .map_err(|e| match e {
282                        AgentError::RouteToPeer { .. } => e,
283                        other => AgentError::CreateFailed {
284                            id: id.to_string(),
285                            reason: other.to_string(),
286                        },
287                    })?;
288
289                // Run init actions with error policy enforcement (no lock needed)
290                let init_orchestrator = InitOrchestrator::with_error_policy(
291                    id.clone(),
292                    self.spec.init.clone(),
293                    self.spec.errors.clone(),
294                );
295                init_orchestrator.run().await?;
296
297                // Start container (no lock needed - I/O operation)
298                self.runtime
299                    .start_container(&id)
300                    .await
301                    .map_err(|e| AgentError::StartFailed {
302                        id: id.to_string(),
303                        reason: e.to_string(),
304                    })?;
305
306                // Get container PID with retries (may not be immediately available)
307                let mut container_pid = None;
308                for attempt in 1..=5u32 {
309                    match self.runtime.get_container_pid(&id).await {
310                        Ok(Some(pid)) => {
311                            container_pid = Some(pid);
312                            break;
313                        }
314                        Ok(None) if attempt < 5 => {
315                            tracing::debug!(container = %id, attempt, "PID not available yet, retrying");
316                            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
317                        }
318                        Ok(None) => {
319                            tracing::warn!(container = %id, "Container PID unavailable after 5 attempts");
320                        }
321                        Err(e) => {
322                            tracing::warn!(container = %id, attempt, error = %e, "Failed to get PID");
323                            if attempt < 5 {
324                                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
325                            }
326                        }
327                    }
328                }
329
330                // Verify the container is still running before attempting
331                // overlay attach. If the init process crashed during start
332                // (bad image, missing libs, failed mount), the PID above is
333                // now dead and every `ip link set ... netns {pid}` will
334                // return a cryptic RTNETLINK error. Surface the real cause
335                // from the container's log tail instead of the cascade.
336                if container_pid.is_some() {
337                    let alive = match self.runtime.container_state(&id).await {
338                        Ok(
339                            ContainerState::Running
340                            | ContainerState::Pending
341                            | ContainerState::Initializing,
342                        ) => true,
343                        Ok(state) => {
344                            tracing::warn!(
345                                container = %id,
346                                ?state,
347                                "container exited before overlay attach could run"
348                            );
349                            false
350                        }
351                        Err(e) => {
352                            // State query failed — don't block the attach on
353                            // it. The overlay manager's own cleanup-on-error
354                            // path now handles the dead-PID case cleanly.
355                            tracing::warn!(
356                                container = %id,
357                                error = %e,
358                                "container state query failed before overlay attach, proceeding"
359                            );
360                            true
361                        }
362                    };
363                    if !alive {
364                        let log_tail = self.runtime.container_logs(&id, 40).await.ok().map_or_else(
365                            || "  <log read failed>".to_string(),
366                            |entries| {
367                                if entries.is_empty() {
368                                    "  <no log output>".to_string()
369                                } else {
370                                    entries
371                                        .into_iter()
372                                        .map(|e| format!("  {}", e.message))
373                                        .collect::<Vec<_>>()
374                                        .join("\n")
375                                }
376                            },
377                        );
378                        return Err(AgentError::StartFailed {
379                            id: id.to_string(),
380                            reason: format!("container exited during startup:\n{log_tail}"),
381                        });
382                    }
383                }
384
385                // Attach to overlay network if manager is available.
386                //
387                // Linux uses the container PID to enter the netns and attach a
388                // veth. Windows has no PID-addressable netns — the HCN namespace
389                // GUID (obtained from `get_container_namespace_id`) is used
390                // instead, and the endpoint's IP has already been populated by
391                // `EndpointAttachment::create_overlay` during container creation.
392                // We simply register that IP with the slice allocator so host
393                // accounting stays in sync.
394                let overlay_ip = if let Some(overlay) = &self.overlay_manager {
395                    let overlay_guard = overlay.read().await;
396                    #[cfg(target_os = "windows")]
397                    let attach_result: Option<std::net::IpAddr> = {
398                        // On Windows the overlay attach (HCN endpoint + per-container
399                        // namespace creation, via overlayd) already happened inside
400                        // `HcsRuntime::create_container`. Here we only need the IP it
401                        // assigned so we can register DNS for service discovery.
402                        let _ = (container_pid, &overlay_guard); // unused on Windows
403                        match self.runtime.get_container_ip(&id).await {
404                            Ok(Some(ip)) => Some(ip),
405                            Ok(None) => {
406                                tracing::debug!(
407                                    container = %id,
408                                    "no overlay IP recorded for container (overlay attach skipped at create time)"
409                                );
410                                None
411                            }
412                            Err(e) => {
413                                tracing::warn!(
414                                    container = %id,
415                                    error = %e,
416                                    "failed to fetch container overlay IP"
417                                );
418                                None
419                            }
420                        }
421                    };
422                    #[cfg(not(target_os = "windows"))]
423                    let attach_result: Option<std::net::IpAddr> = {
424                        match self.runtime.overlay_attach_kind() {
425                            // VM guest (macOS VZ-Linux): no host netns/PID, so
426                            // overlayd allocates the overlay identity and we push
427                            // it into the guest over vsock, where it brings up its
428                            // own kernel WireGuard device.
429                            crate::runtime::OverlayAttachKind::InGuestVsock => {
430                                let cid = id.to_string();
431                                match overlay_guard
432                                    .attach_container_guest(&cid, &self.service_name, true)
433                                    .await
434                                {
435                                    Ok(cfg) => {
436                                        let ip = cfg.overlay_ip;
437                                        match self.runtime.push_overlay_config(&id, &cfg).await {
438                                            Ok(()) => Some(ip),
439                                            Err(e) => {
440                                                tracing::warn!(
441                                                    container = %id,
442                                                    error = %e,
443                                                    "failed to push overlay config into guest; rolling back allocation"
444                                                );
445                                                // Don't leak the overlayd IP/peer.
446                                                if let Err(de) =
447                                                    overlay_guard.detach_container_guest(&cid).await
448                                                {
449                                                    tracing::warn!(
450                                                        container = %id,
451                                                        error = %de,
452                                                        "failed to roll back guest overlay allocation"
453                                                    );
454                                                }
455                                                None
456                                            }
457                                        }
458                                    }
459                                    Err(e) => {
460                                        tracing::warn!(
461                                            container = %id,
462                                            error = %e,
463                                            "failed to allocate guest overlay config from overlayd"
464                                        );
465                                        None
466                                    }
467                                }
468                            }
469                            // Host-process runtimes (Linux youki): plumb a veth
470                            // into the container's netns by PID.
471                            _ => {
472                                if let Some(pid) = container_pid {
473                                    match overlay_guard
474                                        .attach_container(pid, &self.service_name, true)
475                                        .await
476                                    {
477                                        Ok(ip) => Some(ip),
478                                        Err(e) => {
479                                            tracing::warn!(
480                                                container = %id,
481                                                error = %e,
482                                                "failed to attach container to overlay network"
483                                            );
484                                            None
485                                        }
486                                    }
487                                } else {
488                                    // No PID available (e.g. WASM runtime) - skip overlay attachment
489                                    tracing::debug!(
490                                        container = %id,
491                                        "skipping overlay attachment - no PID available"
492                                    );
493                                    None
494                                }
495                            }
496                        }
497                    };
498
499                    if let Some(ip) = attach_result {
500                        tracing::info!(
501                            container = %id,
502                            overlay_ip = %ip,
503                            "attached container to overlay network"
504                        );
505
506                        // Register DNS for service discovery
507                        if let Some(dns) = &self.dns_server {
508                            // Register the BARE compose service name ({service}) so
509                            // sibling containers can resolve each other by the name
510                            // docker-compose uses (e.g. `postgres`), not just the
511                            // FQDN. This is what `FORGEJO__database__HOST=postgres`
512                            // and every other compose service reference depends on.
513                            // Multiple replicas upsert the same name; the in-memory
514                            // authority keeps the most recent A record (round-robin
515                            // is not required for the single-replica compose case).
516                            match dns.add_record(&self.service_name, ip).await {
517                                Ok(()) => tracing::debug!(
518                                    hostname = %self.service_name,
519                                    ip = %ip,
520                                    "registered bare service-name DNS (compose discovery)"
521                                ),
522                                Err(e) => tracing::warn!(
523                                    hostname = %self.service_name,
524                                    error = %e,
525                                    "failed to register bare service-name DNS"
526                                ),
527                            }
528
529                            // Register service-level hostname: {service}.service.local
530                            let service_hostname = format!("{}.service.local", self.service_name);
531
532                            // Register replica-specific hostname: {replica}.{service}.service.local
533                            let replica_hostname =
534                                format!("{}.{}.service.local", id.replica, self.service_name);
535
536                            match dns.add_record(&service_hostname, ip).await {
537                                Ok(()) => tracing::debug!(
538                                    hostname = %service_hostname,
539                                    ip = %ip,
540                                    "registered DNS for service"
541                                ),
542                                Err(e) => tracing::warn!(
543                                    hostname = %service_hostname,
544                                    error = %e,
545                                    "failed to register DNS for service"
546                                ),
547                            }
548
549                            // Also register replica-specific entry
550                            if let Err(e) = dns.add_record(&replica_hostname, ip).await {
551                                tracing::warn!(
552                                    hostname = %replica_hostname,
553                                    error = %e,
554                                    "failed to register replica DNS"
555                                );
556                            } else {
557                                tracing::debug!(
558                                    hostname = %replica_hostname,
559                                    ip = %ip,
560                                    "registered DNS for replica"
561                                );
562                            }
563
564                            // Per-role DNS: register `{role}.{service}.service.local` when
565                            // this container belongs to a non-default replica group. Lets
566                            // intra-cluster clients reach a specific group (e.g.
567                            // `read.db.service.local` for the read replicas of a postgres
568                            // service with primary+read replica groups).
569                            if id.role != "default" {
570                                let role_hostname =
571                                    format!("{}.{}.service.local", id.role, self.service_name);
572                                match dns.add_record(&role_hostname, ip).await {
573                                    Ok(()) => tracing::debug!(
574                                        hostname = %role_hostname,
575                                        ip = %ip,
576                                        role = %id.role,
577                                        "registered DNS for replica group role"
578                                    ),
579                                    Err(e) => tracing::warn!(
580                                        hostname = %role_hostname,
581                                        error = %e,
582                                        "failed to register role DNS"
583                                    ),
584                                }
585                            }
586                        }
587
588                        Some(ip)
589                    } else {
590                        None
591                    }
592                } else {
593                    None
594                };
595
596                // If overlay failed, try the container runtime's own IP as fallback
597                let effective_ip = if overlay_ip.is_none() {
598                    match self.runtime.get_container_ip(&id).await {
599                        Ok(Some(ip)) => {
600                            tracing::info!(
601                                container = %id,
602                                ip = %ip,
603                                "using runtime container IP for proxy (overlay unavailable)"
604                            );
605                            Some(ip)
606                        }
607                        Ok(None) => {
608                            tracing::warn!(
609                                container = %id,
610                                "no container IP available from runtime, proxy routing will be unavailable"
611                            );
612                            None
613                        }
614                        Err(e) => {
615                            tracing::warn!(
616                                container = %id,
617                                error = %e,
618                                "failed to get container IP from runtime"
619                            );
620                            None
621                        }
622                    }
623                } else {
624                    overlay_ip
625                };
626
627                tracing::info!(
628                    container = %id,
629                    service = %self.service_name,
630                    overlay_ip = ?overlay_ip,
631                    effective_ip = ?effective_ip,
632                    "Container IP resolution complete"
633                );
634
635                // Query port override from the runtime.
636                // On macOS sandbox, each container is assigned a unique port since
637                // all processes share the host network (no network namespaces).
638                // The runtime passes the port to the process via the PORT env var.
639                let port_override = match self.runtime.get_container_port_override(&id).await {
640                    Ok(Some(port)) => {
641                        tracing::info!(
642                            container = %id,
643                            port = port,
644                            "runtime assigned dynamic port override for this container"
645                        );
646                        Some(port)
647                    }
648                    Ok(None) => None,
649                    Err(e) => {
650                        tracing::warn!(
651                            container = %id,
652                            error = %e,
653                            "failed to query port override from runtime, using spec port"
654                        );
655                        None
656                    }
657                };
658
659                // Start health monitoring and store handle (no lock needed during start)
660                let health_monitor_handle = {
661                    let mut check = self.spec.health.check.clone();
662
663                    // Resolve Tcp { port: 0 } ("use first endpoint") to the actual
664                    // port the container is listening on. With mac-sandbox, each
665                    // replica gets a unique assigned port via port_override.
666                    if let HealthCheck::Tcp { ref mut port } = check {
667                        if *port == 0 {
668                            *port = port_override.unwrap_or_else(|| {
669                                self.spec
670                                    .endpoints
671                                    .iter()
672                                    .find(|ep| {
673                                        matches!(
674                                            ep.protocol,
675                                            Protocol::Http | Protocol::Https | Protocol::Websocket
676                                        )
677                                    })
678                                    .map_or(8080, zlayer_spec::EndpointSpec::target_port)
679                            });
680                        }
681                    }
682
683                    let start_grace = self
684                        .spec
685                        .health
686                        .start_grace
687                        .unwrap_or(Duration::from_secs(5));
688                    let check_timeout = self.spec.health.timeout.unwrap_or(Duration::from_secs(5));
689                    let interval = self.spec.health.interval.unwrap_or(Duration::from_secs(10));
690                    let retries = self.spec.health.retries;
691
692                    let checker = HealthChecker::new(check, effective_ip);
693                    let mut monitor = HealthMonitor::new(id.clone(), checker, interval, retries)
694                        .with_start_grace(start_grace)
695                        .with_check_timeout(check_timeout);
696
697                    // Build the optional proxy backend handle. This is only present
698                    // when both a proxy manager AND a reachable overlay IP exist; in
699                    // degraded-overlay / no-proxy deployments it stays None and the
700                    // callback below skips all proxy work while STILL bridging health
701                    // state back into ServiceManager.
702                    let proxy_backend: Option<(Arc<ProxyManager>, SocketAddr)> =
703                        if let (Some(proxy), Some(ip)) = (&self.proxy_manager, effective_ip) {
704                            let proxy = Arc::clone(proxy);
705                            // Get the container's target port, using the runtime override if
706                            // present. On macOS sandbox, port_override gives each replica a
707                            // unique port so the proxy can distinguish backends sharing
708                            // 127.0.0.1.
709                            let port = port_override.unwrap_or_else(|| {
710                                self.spec
711                                    .endpoints
712                                    .iter()
713                                    .find(|ep| {
714                                        matches!(
715                                            ep.protocol,
716                                            Protocol::Http | Protocol::Https | Protocol::Websocket
717                                        )
718                                    })
719                                    .map_or(8080, zlayer_spec::EndpointSpec::target_port)
720                            });
721
722                            let backend_addr = SocketAddr::new(ip, port);
723
724                            // Register backend with load balancer so proxy can route to it.
725                            // This must happen before the health callback is created, because
726                            // update_backend_health only updates *existing* backends.
727                            proxy.add_backend(&self.service_name, backend_addr).await;
728
729                            // Publish this container's exposed ports on the node
730                            // loopback (`127.0.0.1:<endpoint.port>`) so a consumer
731                            // sharing the node loopback can reach the service at
732                            // `localhost:<port>`. Gated on the spec's policy
733                            // (`Auto` publishes only for single-member services).
734                            // Uses the SAME runtime-resolved `ip`/`port_override`
735                            // as the backend above: on macOS each replica shares
736                            // 127.0.0.1 with a unique override; on Linux/VM the
737                            // overlay IP carries the declared target port.
738                            if self.spec.publish_to_node_loopback() {
739                                proxy
740                                    .publish_loopback_for_container(
741                                        &self.service_name,
742                                        &self.spec,
743                                        ip,
744                                        port_override,
745                                    )
746                                    .await;
747                            }
748
749                            Some((proxy, backend_addr))
750                        } else {
751                            None
752                        };
753
754                    // The health bridge is ALWAYS attached, independent of proxy/IP
755                    // availability. stabilization::wait_for_stabilization only treats a
756                    // service as ready when health_states[name] == Healthy, so this write
757                    // must happen even when the overlay is degraded and no proxy backend
758                    // exists — otherwise the service stays healthy=false forever and
759                    // stabilization times out.
760                    let health_states_opt = self.health_states.clone();
761                    let svc_name_for_states = self.service_name.clone();
762                    let svc_name_for_proxy = self.service_name.clone();
763                    let svc_name_for_log = self.service_name.clone();
764
765                    let health_callback: HealthCallback =
766                        Arc::new(move |container_id: ContainerId, is_healthy: bool| {
767                            tracing::info!(
768                                container = %container_id,
769                                service = %svc_name_for_log,
770                                healthy = is_healthy,
771                                has_proxy_backend = proxy_backend.is_some(),
772                                "health status changed"
773                            );
774
775                            // Always bridge health state back to ServiceManager's
776                            // health_states map (unconditional — no proxy/IP required).
777                            if let Some(ref health_states) = health_states_opt {
778                                let states = Arc::clone(health_states);
779                                let svc = svc_name_for_states.clone();
780                                tokio::spawn(async move {
781                                    let state = if is_healthy {
782                                        HealthState::Healthy
783                                    } else {
784                                        HealthState::Unhealthy {
785                                            failures: 0,
786                                            reason: "health check failed".into(),
787                                        }
788                                    };
789                                    states.write().await.insert(svc, state);
790                                });
791                            }
792
793                            // Update proxy backend health only when a proxy backend was
794                            // registered (proxy manager + reachable overlay IP present).
795                            if let Some((proxy, backend_addr)) = proxy_backend.clone() {
796                                let svc = svc_name_for_proxy.clone();
797                                tokio::spawn(async move {
798                                    proxy
799                                        .update_backend_health(&svc, backend_addr, is_healthy)
800                                        .await;
801                                });
802                            }
803                        });
804
805                    monitor = monitor.with_callback(health_callback);
806
807                    monitor.start()
808                };
809
810                // Update state (short write lock)
811                {
812                    let mut containers = self.containers.write().await;
813                    containers.insert(
814                        id.clone(),
815                        Container {
816                            id: id.clone(),
817                            image: self.spec.image.name.to_string(),
818                            state: ContainerState::Running,
819                            pid: None,
820                            task: None,
821                            overlay_ip: effective_ip,
822                            health_monitor: Some(health_monitor_handle),
823                            port_override,
824                        },
825                    );
826                } // Lock released here
827            }
828        }
829
830        // Phase 3: Scale down - remove containers (short write lock per removal)
831        //
832        // Containers were created with `with_role_and_node(role, local_node_id)`
833        // on scale-up, so we must reconstruct the same identity on scale-down
834        // — the role is derived from `replica_groups` via `role_for_replica`
835        // and the node id is the local cluster node. Mismatched ids would miss
836        // the live entry in `self.containers` and leak the container.
837        if replicas < current_replicas {
838            for i in replicas..current_replicas {
839                let replica_idx = i + 1;
840                let id = ContainerId::with_role_and_node(
841                    self.service_name.clone(),
842                    replica_idx,
843                    self.role_for_replica(replica_idx),
844                    local_node_id,
845                );
846
847                // Remove from state first and get the container to abort health monitor (short write lock)
848                let removed_container = {
849                    let mut containers = self.containers.write().await;
850                    containers.remove(&id)
851                }; // Lock released here
852
853                // Then perform cleanup (no lock held - I/O operations)
854                if let Some(container) = removed_container {
855                    // Abort the health monitor task if it exists
856                    if let Some(handle) = container.health_monitor {
857                        handle.abort();
858                    }
859
860                    // Unpublish this container's node-loopback ports (mirror of
861                    // the publish in the start path above). Recomputes the same
862                    // backend from the container's stored runtime-resolved IP and
863                    // port override; the last replica's removal frees the
864                    // loopback listener. Gated identically to publish.
865                    if self.spec.publish_to_node_loopback() {
866                        if let (Some(proxy), Some(ip)) = (&self.proxy_manager, container.overlay_ip)
867                        {
868                            proxy
869                                .unpublish_loopback_for_container(
870                                    &self.spec,
871                                    ip,
872                                    container.port_override,
873                                )
874                                .await;
875                        }
876                    }
877
878                    // Remove DNS records for this container
879                    if let Some(dns) = &self.dns_server {
880                        // Remove replica-specific DNS entry
881                        let replica_hostname =
882                            format!("{}.{}.service.local", id.replica, self.service_name);
883                        if let Err(e) = dns.remove_record(&replica_hostname).await {
884                            tracing::warn!(
885                                hostname = %replica_hostname,
886                                error = %e,
887                                "failed to remove replica DNS record"
888                            );
889                        } else {
890                            tracing::debug!(
891                                hostname = %replica_hostname,
892                                "removed replica DNS record"
893                            );
894                        }
895
896                        // Remove per-role DNS entry if this was a non-default group.
897                        // Note: this is best-effort and removes the record even if
898                        // other replicas in the same role still need it — the DNS
899                        // server's add/remove API is single-record so we can't keep
900                        // it alive for siblings. P2.3-bis (round-robin per-role)
901                        // can fix this later via a per-role refcount; for now the
902                        // service-level hostname keeps cluster-internal clients
903                        // working even when the role-specific record briefly
904                        // disappears.
905                        if id.role != "default" {
906                            let role_hostname =
907                                format!("{}.{}.service.local", id.role, self.service_name);
908                            if let Err(e) = dns.remove_record(&role_hostname).await {
909                                tracing::warn!(
910                                    hostname = %role_hostname,
911                                    error = %e,
912                                    "failed to remove role DNS record"
913                                );
914                            } else {
915                                tracing::debug!(
916                                    hostname = %role_hostname,
917                                    "removed role DNS record"
918                                );
919                            }
920                        }
921
922                        // Note: We don't remove the service-level hostname here because
923                        // other replicas may still be using it. The service-level record
924                        // should be cleaned up when the entire service is removed.
925                    }
926
927                    // Detach from overlay network if manager available.
928                    //
929                    // Done BEFORE stop_container because:
930                    //   - The container init process must still be in
931                    //     /proc to look up its PID via `get_container_pid`.
932                    //   - `OverlayManager::detach_container` deletes host-side
933                    //     veth interfaces by name (`veth-<pid>-*`) and
934                    //     releases the allocated overlay IPs back to the
935                    //     per-node slice. Without this the IPs leak across
936                    //     container churn and the slice exhausts.
937                    //
938                    // Best-effort: failures are logged but never abort the
939                    // scale-down. The periodic orphan sweep
940                    // (`start_periodic_orphan_sweep`) catches anything we
941                    // missed.
942                    if let Some(overlay) = &self.overlay_manager {
943                        // VM guests have no host veth/PID — release the overlayd
944                        // allocation (IP + registered mesh peer) by container id
945                        // instead of by PID.
946                        if self.runtime.overlay_attach_kind()
947                            == crate::runtime::OverlayAttachKind::InGuestVsock
948                        {
949                            let overlay_guard = overlay.read().await;
950                            if let Err(e) =
951                                overlay_guard.detach_container_guest(&id.to_string()).await
952                            {
953                                tracing::warn!(
954                                    container = %id,
955                                    error = %e,
956                                    "overlay detach_container_guest failed; relying on orphan sweep"
957                                );
958                            }
959                        } else {
960                            match self.runtime.get_container_pid(&id).await {
961                                Ok(Some(pid)) => {
962                                    let overlay_guard = overlay.read().await;
963                                    if let Err(e) = overlay_guard.detach_container(pid).await {
964                                        tracing::warn!(
965                                            container = %id,
966                                            pid,
967                                            error = %e,
968                                            "overlay detach_container failed; relying on orphan sweep"
969                                        );
970                                    }
971                                }
972                                Ok(None) => {
973                                    tracing::debug!(
974                                        container = %id,
975                                        "no PID available for overlay detach (already exited or non-Linux runtime)"
976                                    );
977                                }
978                                Err(e) => {
979                                    tracing::warn!(
980                                        container = %id,
981                                        error = %e,
982                                        "failed to query container PID for overlay detach"
983                                    );
984                                }
985                            }
986                        }
987                    }
988
989                    // Stop container
990                    self.runtime
991                        .stop_container(&id, Duration::from_secs(30))
992                        .await?;
993
994                    // Sync volumes to S3 before removal (no-op if not configured)
995                    if let Err(e) = self.runtime.sync_container_volumes(&id).await {
996                        tracing::warn!(
997                            container = %id,
998                            error = %e,
999                            "failed to sync volumes before removal"
1000                        );
1001                    }
1002
1003                    // Remove container
1004                    self.runtime.remove_container(&id).await?;
1005                }
1006            }
1007        }
1008
1009        Ok(())
1010    }
1011
1012    /// Get current number of replicas
1013    pub async fn replica_count(&self) -> usize {
1014        self.containers.read().await.len()
1015    }
1016
1017    /// Get all container IDs
1018    pub async fn container_ids(&self) -> Vec<ContainerId> {
1019        self.containers.read().await.keys().cloned().collect()
1020    }
1021
1022    /// Get per-container info (id, image, state, pid, overlay IP) for every
1023    /// live container in this instance.
1024    ///
1025    /// Surfaces the REAL image reference each container was created from and its
1026    /// REAL lifecycle state (lowercased via [`ContainerState::as_str`]) so the
1027    /// API/`ps` no longer reports a hardcoded `"running"` with no image.
1028    pub async fn container_infos(&self) -> Vec<ContainerInfo> {
1029        self.containers
1030            .read()
1031            .await
1032            .values()
1033            .map(|c| ContainerInfo {
1034                id: c.id.clone(),
1035                image: c.image.clone(),
1036                state: c.state.as_str().to_string(),
1037                pid: c.pid,
1038                overlay_ip: c.overlay_ip.map(|ip| ip.to_string()),
1039            })
1040            .collect()
1041    }
1042
1043    /// Get read access to the containers map
1044    ///
1045    /// This allows callers to access container overlay IPs and other metadata
1046    /// without copying the entire map.
1047    pub fn containers(
1048        &self,
1049    ) -> &tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>> {
1050        &self.containers
1051    }
1052
1053    /// Check if this service instance has an overlay manager configured
1054    pub fn has_overlay_manager(&self) -> bool {
1055        self.overlay_manager.is_some()
1056    }
1057
1058    /// Check if this service instance has a proxy manager configured
1059    pub fn has_proxy_manager(&self) -> bool {
1060        self.proxy_manager.is_some()
1061    }
1062
1063    /// Get the proxy manager for this instance, if configured.
1064    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
1065        self.proxy_manager.as_ref()
1066    }
1067
1068    /// Check if this service instance has a DNS server configured
1069    pub fn has_dns_server(&self) -> bool {
1070        self.dns_server.is_some()
1071    }
1072}
1073
1074/// Per-container summary surfaced to callers (API / `ps`).
1075///
1076/// Carries the REAL image reference and lifecycle state of a single live
1077/// container, replacing the previous id-only view that forced the API to
1078/// fabricate a hardcoded `"running"` state with no image.
1079#[derive(Debug, Clone)]
1080pub struct ContainerInfo {
1081    /// Container identity.
1082    pub id: ContainerId,
1083    /// Image reference the container was created from (canonical form).
1084    pub image: String,
1085    /// Lowercased lifecycle state (e.g. `"running"`, `"exited"`).
1086    pub state: String,
1087    /// Process ID, when the container is running.
1088    pub pid: Option<u32>,
1089    /// Overlay IP rendered as a string, when assigned.
1090    pub overlay_ip: Option<String>,
1091}
1092
1093/// A live deployment container enriched for Docker-compat `ps` rows and for
1094/// name resolution. Produced by [`ServiceManager::list_container_views`].
1095#[derive(Debug, Clone)]
1096pub struct DeploymentContainerView {
1097    /// Deployment (compose project) name, when known.
1098    pub deployment: Option<String>,
1099    /// Service name within the deployment.
1100    pub service: String,
1101    /// Concrete container identity.
1102    pub container_id: ContainerId,
1103    /// Compose `container_name:` (the user-facing Docker name), when set.
1104    pub container_name: Option<String>,
1105    /// Image reference the container was created from.
1106    pub image: String,
1107    /// Lowercased lifecycle state (e.g. `"running"`).
1108    pub state: String,
1109    /// Process id when running.
1110    pub pid: Option<u32>,
1111    /// The service's published port mappings.
1112    pub ports: Vec<zlayer_spec::PortMapping>,
1113}
1114
1115/// Service manager for multiple services
1116pub struct ServiceManager {
1117    runtime: Arc<dyn Runtime + Send + Sync>,
1118    services: tokio::sync::RwLock<std::collections::HashMap<String, ServiceInstance>>,
1119    scale_semaphore: Arc<Semaphore>,
1120    /// Overlay network manager for container networking
1121    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
1122    /// Stream registry for L4 proxy route registration (TCP/UDP)
1123    stream_registry: Option<Arc<StreamRegistry>>,
1124    /// Proxy manager for health-aware load balancing (hyper-based proxy)
1125    proxy_manager: Option<Arc<ProxyManager>>,
1126    /// DNS server for service discovery
1127    dns_server: Option<Arc<DnsServer>>,
1128    /// Deployment name (used for generating hostnames)
1129    deployment_name: Option<String>,
1130    /// Health states for dependency condition checking
1131    health_states: Arc<RwLock<HashMap<String, HealthState>>>,
1132    /// Job executor for run-to-completion workloads
1133    job_executor: Option<Arc<JobExecutor>>,
1134    /// Cron scheduler for time-based job triggers
1135    cron_scheduler: Option<Arc<CronScheduler>>,
1136    /// Container supervisor for crash/panic policy enforcement
1137    container_supervisor: Option<Arc<ContainerSupervisor>>,
1138    /// Cluster membership + dispatch handle. When `None`, scale operations
1139    /// run purely local (single-node mode). When `Some`, `scale_service`
1140    /// routes through the cluster (leader dispatches to peers; followers
1141    /// forward to the leader).
1142    cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
1143}
1144
1145// ---------------------------------------------------------------------------
1146// ServiceManagerBuilder
1147// ---------------------------------------------------------------------------
1148
1149/// Builder for constructing a [`ServiceManager`] with optional subsystems.
1150///
1151/// Prefer using `ServiceManager::builder(runtime)` to start building.
1152///
1153/// # Example
1154///
1155/// ```ignore
1156/// let manager = ServiceManager::builder(runtime)
1157///     .overlay_manager(om)
1158///     .proxy_manager(proxy)
1159///     .deployment_name("prod")
1160///     .build();
1161/// ```
1162pub struct ServiceManagerBuilder {
1163    runtime: Arc<dyn Runtime + Send + Sync>,
1164    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
1165    proxy_manager: Option<Arc<ProxyManager>>,
1166    stream_registry: Option<Arc<StreamRegistry>>,
1167    dns_server: Option<Arc<DnsServer>>,
1168    deployment_name: Option<String>,
1169    job_executor: Option<Arc<JobExecutor>>,
1170    cron_scheduler: Option<Arc<CronScheduler>>,
1171    container_supervisor: Option<Arc<ContainerSupervisor>>,
1172    cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
1173}
1174
1175impl ServiceManagerBuilder {
1176    /// Create a new builder with the required runtime.
1177    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
1178        Self {
1179            runtime,
1180            overlay_manager: None,
1181            proxy_manager: None,
1182            stream_registry: None,
1183            dns_server: None,
1184            deployment_name: None,
1185            job_executor: None,
1186            cron_scheduler: None,
1187            container_supervisor: None,
1188            cluster: None,
1189        }
1190    }
1191
1192    /// Set the overlay network manager for container networking.
1193    #[must_use]
1194    pub fn overlay_manager(mut self, om: Arc<RwLock<OverlayManager>>) -> Self {
1195        self.overlay_manager = Some(om);
1196        self
1197    }
1198
1199    /// Set the proxy manager for health-aware load balancing.
1200    #[must_use]
1201    pub fn proxy_manager(mut self, pm: Arc<ProxyManager>) -> Self {
1202        self.proxy_manager = Some(pm);
1203        self
1204    }
1205
1206    /// Set the stream registry for TCP/UDP L4 proxy route registration.
1207    #[must_use]
1208    pub fn stream_registry(mut self, sr: Arc<StreamRegistry>) -> Self {
1209        self.stream_registry = Some(sr);
1210        self
1211    }
1212
1213    /// Set the DNS server for service discovery.
1214    #[must_use]
1215    pub fn dns_server(mut self, dns: Arc<DnsServer>) -> Self {
1216        self.dns_server = Some(dns);
1217        self
1218    }
1219
1220    /// Set the deployment name (used for hostname generation).
1221    #[must_use]
1222    pub fn deployment_name(mut self, name: impl Into<String>) -> Self {
1223        self.deployment_name = Some(name.into());
1224        self
1225    }
1226
1227    /// Set the job executor for run-to-completion workloads.
1228    #[must_use]
1229    pub fn job_executor(mut self, je: Arc<JobExecutor>) -> Self {
1230        self.job_executor = Some(je);
1231        self
1232    }
1233
1234    /// Set the cron scheduler for time-based job triggers.
1235    #[must_use]
1236    pub fn cron_scheduler(mut self, cs: Arc<CronScheduler>) -> Self {
1237        self.cron_scheduler = Some(cs);
1238        self
1239    }
1240
1241    /// Set the container supervisor for crash/panic policy enforcement.
1242    #[must_use]
1243    pub fn container_supervisor(mut self, cs: Arc<ContainerSupervisor>) -> Self {
1244        self.container_supervisor = Some(cs);
1245        self
1246    }
1247
1248    /// Set the cluster membership + dispatch handle. When set,
1249    /// [`ServiceManager::scale_service`] will route through the cluster
1250    /// (leader dispatches to peers; followers forward to the leader).
1251    /// When unset (the default), scale operations remain local-only.
1252    #[must_use]
1253    pub fn cluster(mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) -> Self {
1254        self.cluster = Some(cluster);
1255        self
1256    }
1257
1258    /// Consume the builder and produce a fully-wired [`ServiceManager`].
1259    ///
1260    /// Logs warnings for missing recommended subsystems (proxy,
1261    /// `stream_registry`, `container_supervisor`, `deployment_name`).
1262    pub fn build(self) -> ServiceManager {
1263        if self.proxy_manager.is_none() {
1264            tracing::warn!("ServiceManager built without proxy_manager");
1265        }
1266        if self.stream_registry.is_none() {
1267            tracing::warn!("ServiceManager built without stream_registry");
1268        }
1269        if self.container_supervisor.is_none() {
1270            tracing::warn!("ServiceManager built without container_supervisor");
1271        }
1272        if self.deployment_name.is_none() {
1273            tracing::warn!("ServiceManager built without deployment_name");
1274        }
1275
1276        ServiceManager {
1277            runtime: self.runtime,
1278            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1279            scale_semaphore: Arc::new(Semaphore::new(10)),
1280            overlay_manager: self.overlay_manager,
1281            stream_registry: self.stream_registry,
1282            proxy_manager: self.proxy_manager,
1283            dns_server: self.dns_server,
1284            deployment_name: self.deployment_name,
1285            health_states: Arc::new(RwLock::new(HashMap::new())),
1286            job_executor: self.job_executor,
1287            cron_scheduler: self.cron_scheduler,
1288            container_supervisor: self.container_supervisor,
1289            cluster: self.cluster,
1290        }
1291    }
1292}
1293
1294impl ServiceManager {
1295    /// Create a [`ServiceManagerBuilder`] for constructing a `ServiceManager`.
1296    ///
1297    /// This is the preferred way to construct a `ServiceManager` since v0.2.0.
1298    ///
1299    /// # Example
1300    ///
1301    /// ```ignore
1302    /// let manager = ServiceManager::builder(runtime)
1303    ///     .overlay_manager(om)
1304    ///     .proxy_manager(proxy)
1305    ///     .build();
1306    /// ```
1307    pub fn builder(runtime: Arc<dyn Runtime + Send + Sync>) -> ServiceManagerBuilder {
1308        ServiceManagerBuilder::new(runtime)
1309    }
1310
1311    /// Create a new service manager
1312    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1313    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
1314        Self {
1315            runtime,
1316            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1317            scale_semaphore: Arc::new(Semaphore::new(10)), // Max 10 concurrent scaling operations
1318            overlay_manager: None,
1319            stream_registry: None,
1320            proxy_manager: None,
1321            dns_server: None,
1322            deployment_name: None,
1323            health_states: Arc::new(RwLock::new(HashMap::new())),
1324            job_executor: None,
1325            cron_scheduler: None,
1326            container_supervisor: None,
1327            cluster: None,
1328        }
1329    }
1330
1331    /// Create a service manager with overlay network support
1332    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1333    pub fn with_overlay(
1334        runtime: Arc<dyn Runtime + Send + Sync>,
1335        overlay_manager: Arc<RwLock<OverlayManager>>,
1336    ) -> Self {
1337        Self {
1338            runtime,
1339            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1340            scale_semaphore: Arc::new(Semaphore::new(10)),
1341            overlay_manager: Some(overlay_manager),
1342            stream_registry: None,
1343            proxy_manager: None,
1344            dns_server: None,
1345            deployment_name: None,
1346            health_states: Arc::new(RwLock::new(HashMap::new())),
1347            job_executor: None,
1348            cron_scheduler: None,
1349            container_supervisor: None,
1350            cluster: None,
1351        }
1352    }
1353
1354    /// Create a fully-configured service manager with overlay and proxy support
1355    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1356    pub fn with_full_config(
1357        runtime: Arc<dyn Runtime + Send + Sync>,
1358        overlay_manager: Arc<RwLock<OverlayManager>>,
1359        deployment_name: String,
1360    ) -> Self {
1361        Self {
1362            runtime,
1363            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1364            scale_semaphore: Arc::new(Semaphore::new(10)),
1365            overlay_manager: Some(overlay_manager),
1366            stream_registry: None,
1367            proxy_manager: None,
1368            dns_server: None,
1369            deployment_name: Some(deployment_name),
1370            health_states: Arc::new(RwLock::new(HashMap::new())),
1371            job_executor: None,
1372            cron_scheduler: None,
1373            container_supervisor: None,
1374            cluster: None,
1375        }
1376    }
1377
1378    /// Get the health states map for external monitoring
1379    pub fn health_states(&self) -> Arc<RwLock<HashMap<String, HealthState>>> {
1380        Arc::clone(&self.health_states)
1381    }
1382
1383    /// Update health state for a service
1384    pub async fn update_health_state(&self, service_name: &str, state: HealthState) {
1385        let mut states = self.health_states.write().await;
1386        states.insert(service_name.to_string(), state);
1387    }
1388
1389    /// Set the deployment name (used for generating hostnames)
1390    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1391    pub fn set_deployment_name(&mut self, name: String) {
1392        self.deployment_name = Some(name);
1393    }
1394
1395    /// Set the stream registry for L4 proxy integration (TCP/UDP)
1396    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1397    pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
1398        self.stream_registry = Some(registry);
1399    }
1400
1401    /// Builder pattern: add stream registry for L4 proxy integration
1402    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1403    #[must_use]
1404    pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
1405        self.stream_registry = Some(registry);
1406        self
1407    }
1408
1409    /// Get the stream registry (if configured)
1410    pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
1411        self.stream_registry.as_ref()
1412    }
1413
1414    /// Set the overlay manager for container networking
1415    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1416    pub fn set_overlay_manager(&mut self, manager: Arc<RwLock<OverlayManager>>) {
1417        self.overlay_manager = Some(manager);
1418    }
1419
1420    /// Set the proxy manager for health-aware load balancing
1421    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1422    pub fn set_proxy_manager(&mut self, proxy: Arc<ProxyManager>) {
1423        self.proxy_manager = Some(proxy);
1424    }
1425
1426    /// Builder pattern: add proxy manager for health-aware load balancing
1427    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1428    #[must_use]
1429    pub fn with_proxy_manager(mut self, proxy: Arc<ProxyManager>) -> Self {
1430        self.proxy_manager = Some(proxy);
1431        self
1432    }
1433
1434    /// Get the proxy manager (if configured)
1435    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
1436        self.proxy_manager.as_ref()
1437    }
1438
1439    /// Set the DNS server for service discovery
1440    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1441    pub fn set_dns_server(&mut self, dns: Arc<DnsServer>) {
1442        self.dns_server = Some(dns);
1443    }
1444
1445    /// Builder pattern: add DNS server for service discovery
1446    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1447    #[must_use]
1448    pub fn with_dns_server(mut self, dns: Arc<DnsServer>) -> Self {
1449        self.dns_server = Some(dns);
1450        self
1451    }
1452
1453    /// Get the DNS server (if configured)
1454    pub fn dns_server(&self) -> Option<&Arc<DnsServer>> {
1455        self.dns_server.as_ref()
1456    }
1457
1458    /// Set the job executor for run-to-completion workloads
1459    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1460    pub fn set_job_executor(&mut self, executor: Arc<JobExecutor>) {
1461        self.job_executor = Some(executor);
1462    }
1463
1464    /// Set the cron scheduler for time-based job triggers
1465    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1466    pub fn set_cron_scheduler(&mut self, scheduler: Arc<CronScheduler>) {
1467        self.cron_scheduler = Some(scheduler);
1468    }
1469
1470    /// Builder pattern: add job executor
1471    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1472    #[must_use]
1473    pub fn with_job_executor(mut self, executor: Arc<JobExecutor>) -> Self {
1474        self.job_executor = Some(executor);
1475        self
1476    }
1477
1478    /// Builder pattern: add cron scheduler
1479    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1480    #[must_use]
1481    pub fn with_cron_scheduler(mut self, scheduler: Arc<CronScheduler>) -> Self {
1482        self.cron_scheduler = Some(scheduler);
1483        self
1484    }
1485
1486    /// Set the cluster handle for cluster-aware scaling.
1487    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1488    pub fn set_cluster(&mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) {
1489        self.cluster = Some(cluster);
1490    }
1491
1492    /// Builder pattern: add a cluster handle for cluster-aware scaling.
1493    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1494    #[must_use]
1495    pub fn with_cluster(mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) -> Self {
1496        self.cluster = Some(cluster);
1497        self
1498    }
1499
1500    /// Get the cluster handle (if configured).
1501    pub fn cluster(&self) -> Option<&Arc<dyn zlayer_scheduler::cluster::Cluster>> {
1502        self.cluster.as_ref()
1503    }
1504
1505    /// Get the job executor (if configured)
1506    pub fn job_executor(&self) -> Option<&Arc<JobExecutor>> {
1507        self.job_executor.as_ref()
1508    }
1509
1510    /// Get the cron scheduler (if configured)
1511    pub fn cron_scheduler(&self) -> Option<&Arc<CronScheduler>> {
1512        self.cron_scheduler.as_ref()
1513    }
1514
1515    /// Set the container supervisor for crash/panic policy enforcement
1516    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1517    pub fn set_container_supervisor(&mut self, supervisor: Arc<ContainerSupervisor>) {
1518        self.container_supervisor = Some(supervisor);
1519    }
1520
1521    /// Builder pattern: add container supervisor
1522    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1523    #[must_use]
1524    pub fn with_container_supervisor(mut self, supervisor: Arc<ContainerSupervisor>) -> Self {
1525        self.container_supervisor = Some(supervisor);
1526        self
1527    }
1528
1529    /// Get the container supervisor (if configured)
1530    pub fn container_supervisor(&self) -> Option<&Arc<ContainerSupervisor>> {
1531        self.container_supervisor.as_ref()
1532    }
1533
1534    /// Start the container supervisor background task
1535    ///
1536    /// This spawns a background task that monitors containers for crashes
1537    /// and enforces the `on_panic` error policy.
1538    ///
1539    /// # Errors
1540    /// Returns an error if no container supervisor is configured.
1541    ///
1542    /// # Returns
1543    /// A `JoinHandle` for the supervisor task.
1544    pub fn start_container_supervisor(&self) -> Result<tokio::task::JoinHandle<()>> {
1545        let supervisor = self.container_supervisor.as_ref().ok_or_else(|| {
1546            AgentError::Configuration("Container supervisor not configured".to_string())
1547        })?;
1548
1549        let supervisor = Arc::clone(supervisor);
1550        Ok(tokio::spawn(async move {
1551            supervisor.run_loop().await;
1552        }))
1553    }
1554
1555    /// Shutdown the container supervisor
1556    pub fn shutdown_container_supervisor(&self) {
1557        if let Some(supervisor) = &self.container_supervisor {
1558            supervisor.shutdown();
1559        }
1560    }
1561
1562    /// Get the supervised state of a container
1563    pub async fn get_container_supervised_state(
1564        &self,
1565        container_id: &ContainerId,
1566    ) -> Option<SupervisedState> {
1567        if let Some(supervisor) = &self.container_supervisor {
1568            supervisor.get_state(container_id).await
1569        } else {
1570            None
1571        }
1572    }
1573
1574    /// Get supervisor events receiver
1575    ///
1576    /// Note: This can only be called once; the receiver is moved to the caller.
1577    pub async fn take_supervisor_events(
1578        &self,
1579    ) -> Option<tokio::sync::mpsc::Receiver<SupervisorEvent>> {
1580        if let Some(supervisor) = &self.container_supervisor {
1581            supervisor.take_event_receiver().await
1582        } else {
1583            None
1584        }
1585    }
1586
1587    // ==================== Dependency Orchestration ====================
1588
1589    /// Deploy multiple services respecting their dependency order
1590    ///
1591    /// This method:
1592    /// 1. Builds a dependency graph from the services
1593    /// 2. Validates no cycles exist
1594    /// 3. Computes topological order (services with no deps first)
1595    /// 4. For each service in order, waits for dependencies then starts the service
1596    ///
1597    /// # Arguments
1598    /// * `services` - Map of service name to service specification
1599    ///
1600    /// # Errors
1601    /// - Returns `AgentError::InvalidSpec` if there are cyclic dependencies
1602    /// - Returns `AgentError::DependencyTimeout` if a dependency times out with `on_timeout`: fail
1603    pub async fn deploy_with_dependencies(
1604        &self,
1605        services: HashMap<String, ServiceSpec>,
1606    ) -> Result<()> {
1607        if services.is_empty() {
1608            return Ok(());
1609        }
1610
1611        // Build dependency graph
1612        let graph = DependencyGraph::build(&services)?;
1613
1614        tracing::info!(
1615            service_count = services.len(),
1616            "Starting deployment with dependency ordering"
1617        );
1618
1619        // Get startup order
1620        let order = graph.startup_order();
1621        tracing::debug!(order = ?order, "Computed startup order");
1622
1623        // Start services in dependency order
1624        for service_name in order {
1625            let service_spec = services
1626                .get(service_name)
1627                .ok_or_else(|| AgentError::Internal(format!("Service {service_name} not found")))?;
1628
1629            // Wait for dependencies first
1630            if !service_spec.depends.is_empty() {
1631                tracing::info!(
1632                    service = %service_name,
1633                    dependency_count = service_spec.depends.len(),
1634                    "Waiting for dependencies"
1635                );
1636                self.wait_for_dependencies(service_name, &service_spec.depends)
1637                    .await?;
1638            }
1639
1640            // Register and start service
1641            tracing::info!(service = %service_name, "Starting service");
1642            Box::pin(self.upsert_service(service_name.clone(), service_spec.clone())).await?;
1643
1644            // Get the desired replica count from scale config
1645            let replicas = match &service_spec.scale {
1646                zlayer_spec::ScaleSpec::Fixed { replicas } => *replicas,
1647                zlayer_spec::ScaleSpec::Adaptive { min, .. } => *min, // Start with min replicas
1648                zlayer_spec::ScaleSpec::Manual => 1, // Default to 1 for manual scaling
1649            };
1650            self.scale_service(service_name, replicas).await?;
1651
1652            // Mark service as started in health states (Unknown until health check runs)
1653            self.update_health_state(service_name, HealthState::Unknown)
1654                .await;
1655
1656            tracing::info!(
1657                service = %service_name,
1658                replicas = replicas,
1659                "Service started"
1660            );
1661        }
1662
1663        tracing::info!(service_count = services.len(), "Deployment complete");
1664
1665        Ok(())
1666    }
1667
1668    /// Wait for all dependencies of a service to be satisfied
1669    ///
1670    /// # Arguments
1671    /// * `service` - Name of the service waiting for dependencies
1672    /// * `deps` - Slice of dependency specifications
1673    ///
1674    /// # Errors
1675    /// Returns `AgentError::DependencyTimeout` if any dependency with `on_timeout`: fail times out
1676    async fn wait_for_dependencies(&self, service: &str, deps: &[DependsSpec]) -> Result<()> {
1677        let condition_checker = DependencyConditionChecker::new(
1678            Arc::clone(&self.runtime),
1679            Arc::clone(&self.health_states),
1680            None,
1681        );
1682
1683        let waiter = DependencyWaiter::new(condition_checker);
1684        let results = waiter.wait_for_all(deps).await?;
1685
1686        // Check results for failures
1687        for result in results {
1688            match result {
1689                WaitResult::TimedOutFail {
1690                    service: dep_service,
1691                    condition,
1692                    timeout,
1693                } => {
1694                    return Err(AgentError::DependencyTimeout {
1695                        service: service.to_string(),
1696                        dependency: dep_service,
1697                        condition: format!("{condition:?}"),
1698                        timeout,
1699                    });
1700                }
1701                WaitResult::TimedOutWarn {
1702                    service: dep_service,
1703                    condition,
1704                } => {
1705                    tracing::warn!(
1706                        service = %service,
1707                        dependency = %dep_service,
1708                        condition = ?condition,
1709                        "Dependency timed out but continuing"
1710                    );
1711                }
1712                WaitResult::TimedOutContinue | WaitResult::Satisfied => {
1713                    // Continue silently
1714                }
1715            }
1716        }
1717
1718        Ok(())
1719    }
1720
1721    /// Check if all dependencies for a service are currently satisfied
1722    ///
1723    /// This is a one-shot check (no waiting). Useful for pre-flight validation.
1724    ///
1725    /// # Errors
1726    /// Returns an error if a dependency check fails unexpectedly.
1727    pub async fn check_dependencies(&self, deps: &[DependsSpec]) -> Result<bool> {
1728        let condition_checker = DependencyConditionChecker::new(
1729            Arc::clone(&self.runtime),
1730            Arc::clone(&self.health_states),
1731            None,
1732        );
1733
1734        for dep in deps {
1735            if !condition_checker.check(dep).await? {
1736                return Ok(false);
1737            }
1738        }
1739
1740        Ok(true)
1741    }
1742
    /// Add or update a workload (service, job, or cron)
    ///
    /// This method handles different resource types appropriately:
    /// - **Service**: Traditional long-running containers with scaling and health checks
    /// - **Job**: Run-to-completion workloads triggered on-demand (stores spec for later)
    /// - **Cron**: Scheduled run-to-completion workloads (registers with cron scheduler)
    ///
    /// For a **Service** that already exists, the stored spec (and DNS handle)
    /// is replaced, and the local replicas are recreated (scaled to 0 and back
    /// to the previous count via `scale_service_local`) when the image reference
    /// changed, when the pull policy is `Always`, or when the policy is `Newer`
    /// and the newly pulled digest differs from the previously recorded one.
    ///
    /// For a new **Service**, a `ServiceInstance` is created, tagged with the
    /// local cluster node id (`0` without a cluster), wired to DNS and the shared
    /// health-state map, and its HTTP routes (proxy manager) and TCP/UDP routes
    /// (stream registry, initially with no backends) are registered. This
    /// function does not scale a new service; that is left to the caller.
    ///
    /// # Errors
    /// Returns an error if service creation, scaling, or cron registration fails.
    /// Concretely, errors come from:
    /// - `pull_and_refresh_digest` (existing service under `Always`/`Newer`),
    /// - `scale_service_local` during a local rolling recreate,
    /// - the cron scheduler's `register`, or
    /// - `AgentError::Configuration` when a `Cron` workload is upserted without
    ///   a configured cron scheduler.
    #[allow(clippy::too_many_lines)]
    pub async fn upsert_service(&self, name: String, spec: ServiceSpec) -> Result<()> {
        match spec.rtype {
            ResourceType::Service => {
                // Long-running service: create/update instance
                let mut services = self.services.write().await;

                if let Some(instance) = services.get_mut(&name) {
                    // Update existing service. We need to:
                    //   1. Update the in-memory spec (so future scale-ups use the new image).
                    //   2. Recreate the local replicas when the image actually changed —
                    //      either a different image *reference* (e.g. tag bump
                    //      1.28 -> 1.29), which is a new image regardless of pull
                    //      policy, or, under Always/Newer, observed *digest* drift on
                    //      the same reference.
                    // The recreate is LOCAL (`scale_service_local`): `upsert_service`
                    // runs on whichever node owns the replicas (the leader for its
                    // own share, each worker via the `/internal/scale` handler). Using
                    // the cluster-routed `scale_service` here would bounce a worker's
                    // recreate back to the leader and re-enter dispatch. Cluster-wide
                    // distribution is the caller's job (orchestrate_deployment + the
                    // scale dispatch that carries this spec to every node).
                    //
                    // NOTE(review): this update path returns before the proxy /
                    // stream-registry registration done for new services below, so
                    // endpoint changes in `spec` are not re-registered here. Confirm
                    // that is handled elsewhere.
                    let image_changed = instance.spec.image.name != spec.image.name;
                    instance.spec = spec.clone();
                    if let Some(dns) = &self.dns_server {
                        instance.set_dns_server(Arc::clone(dns));
                    }

                    let effective = spec.image.pull_policy;
                    // Snapshot everything needed from `instance` while the write
                    // lock is still held; it is released just below.
                    let old_digest = instance.last_pulled_digest().await;
                    // Saturates to u32::MAX if the replica count does not fit in u32.
                    let current_replicas =
                        u32::try_from(instance.replica_count().await).unwrap_or(u32::MAX);
                    drop(services); // Release write lock before pull / scale (which take their own locks).

                    // A changed image reference always recreates. Same-reference
                    // refreshes are governed by pull policy + digest drift.
                    let mut should_recreate = image_changed;
                    let mut new_digest = old_digest.clone();

                    match effective {
                        PullPolicy::Never | PullPolicy::IfNotPresent => {
                            // No proactive pull. If the reference changed we still
                            // recreate below; the scale-up path pulls the (absent) new
                            // image per IfNotPresent. A same-reference redeploy under
                            // these policies is a genuine no-op.
                            tracing::debug!(
                                service = %name,
                                policy = ?effective,
                                image_changed,
                                "re-deploy under no-refresh pull policy"
                            );
                        }
                        PullPolicy::Always | PullPolicy::Newer => {
                            // Pull (this updates the cached digest as a side-effect).
                            // We need a read guard to keep the instance alive while
                            // calling its &self method.
                            // NOTE(review): the read guard is held across the pull's
                            // `.await`, so writers on `self.services` (other upserts,
                            // removals) wait for the pull to finish.
                            let services_ro = self.services.read().await;
                            new_digest = if let Some(inst) = services_ro.get(&name) {
                                inst.pull_and_refresh_digest().await?
                            } else {
                                // The service vanished between our write-lock release
                                // and read-lock acquisition (race with remove_service).
                                // Treat this as a no-op; the caller will see the removal.
                                tracing::warn!(
                                    service = %name,
                                    "service removed during upsert; skipping drift recreate"
                                );
                                return Ok(());
                            };
                            drop(services_ro);

                            // Always forces a recreate. Newer recreates on digest
                            // drift. When digests are unknown (runtime doesn't expose
                            // them), we can't observe drift safely under Newer, so the
                            // reference check above is the only trigger.
                            should_recreate = should_recreate
                                || match effective {
                                    PullPolicy::Always => true,
                                    PullPolicy::Newer => match (&old_digest, &new_digest) {
                                        (Some(old), Some(new)) => old != new,
                                        _ => false,
                                    },
                                    _ => false,
                                };
                        }
                    }

                    // Nothing to recreate when the service currently has no replicas;
                    // the updated spec is picked up by the next scale-up.
                    if should_recreate && current_replicas > 0 {
                        tracing::info!(
                            service = %name,
                            policy = ?effective,
                            image_changed,
                            old_digest = ?old_digest,
                            new_digest = ?new_digest,
                            replicas = current_replicas,
                            "image changed; performing local rolling recreate"
                        );
                        // Scale down fully, then back up to the previous count.
                        self.scale_service_local(&name, 0).await?;
                        self.scale_service_local(&name, current_replicas).await?;
                        tracing::info!(
                            service = %name,
                            new_digest = ?new_digest,
                            "service recreated with refreshed image"
                        );
                    } else {
                        tracing::debug!(
                            service = %name,
                            policy = ?effective,
                            old_digest = ?old_digest,
                            new_digest = ?new_digest,
                            "service up to date; no recreate required"
                        );
                    }
                    return Ok(());
                }
                // Create new service with proxy manager for health-aware load balancing
                // (the write lock taken above is still held until the insert below).
                let overlay = self.overlay_manager.as_ref().map(Arc::clone);
                let mut instance = if let Some(proxy) = &self.proxy_manager {
                    ServiceInstance::with_proxy(
                        name.clone(),
                        spec,
                        self.runtime.clone(),
                        overlay,
                        Arc::clone(proxy),
                    )
                } else {
                    ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
                };
                // Thread the local cluster node id so new `ContainerId`s carry
                // owning-node identity. Defaults to `0` in single-node mode.
                instance.set_node_id(self.cluster.as_ref().map_or(0, |c| c.node_id()));
                // Set DNS server if configured
                if let Some(dns) = &self.dns_server {
                    instance.set_dns_server(Arc::clone(dns));
                }
                // Wire shared health states so callbacks bridge back to ServiceManager
                instance.set_health_states(Arc::clone(&self.health_states));
                // Register HTTP routes via proxy manager
                if let Some(proxy) = &self.proxy_manager {
                    proxy.add_service(&name, &instance.spec).await;
                }
                // Register TCP/UDP endpoints in stream registry
                if let Some(stream_registry) = &self.stream_registry {
                    for endpoint in &instance.spec.endpoints {
                        let svc = StreamService::new(
                            name.clone(),
                            Vec::new(), // No backends yet; added on scale-up
                        );
                        match endpoint.protocol {
                            Protocol::Tcp => {
                                stream_registry.register_tcp(endpoint.port, svc);
                                tracing::debug!(
                                    service = %name,
                                    port = endpoint.port,
                                    "Registered TCP stream route"
                                );
                            }
                            Protocol::Udp => {
                                stream_registry.register_udp(endpoint.port, svc);
                                tracing::debug!(
                                    service = %name,
                                    port = endpoint.port,
                                    "Registered UDP stream route"
                                );
                            }
                            _ => {} // HTTP routes handled by proxy manager
                        }
                    }
                }
                services.insert(name, instance);
            }
            ResourceType::Job => {
                // Job: Just store the spec for later triggering
                // Jobs don't start containers immediately; they're triggered on-demand
                if let Some(executor) = &self.job_executor {
                    executor.register_job(&name, spec).await;
                    tracing::info!(job = %name, "Registered job spec");
                } else {
                    tracing::warn!(
                        job = %name,
                        "Job executor not configured, storing as service for reference"
                    );
                    // Fallback: store as service instance for reference
                    // NOTE(review): unlike the Service branch, this fallback does
                    // not call `set_health_states` or register proxy/stream routes,
                    // and `insert` replaces any existing entry with the same name.
                    let mut services = self.services.write().await;
                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
                    let mut instance = if let Some(proxy) = &self.proxy_manager {
                        ServiceInstance::with_proxy(
                            name.clone(),
                            spec,
                            self.runtime.clone(),
                            overlay,
                            Arc::clone(proxy),
                        )
                    } else {
                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
                    };
                    // Thread the local cluster node id (same as the Service
                    // branch above) so the fallback-as-service Job entry also
                    // carries owning-node identity.
                    instance.set_node_id(self.cluster.as_ref().map_or(0, |c| c.node_id()));
                    // Set DNS server if configured
                    if let Some(dns) = &self.dns_server {
                        instance.set_dns_server(Arc::clone(dns));
                    }
                    services.insert(name, instance);
                }
            }
            ResourceType::Cron => {
                // Cron: Register with the cron scheduler
                if let Some(scheduler) = &self.cron_scheduler {
                    scheduler.register(&name, &spec).await?;
                    tracing::info!(cron = %name, "Registered cron job with scheduler");
                } else {
                    // Unlike jobs, crons have no fallback: without a scheduler
                    // nothing would ever trigger them, so this is an error.
                    return Err(AgentError::Configuration(format!(
                        "Cron scheduler not configured for cron job '{name}'"
                    )));
                }
            }
        }

        Ok(())
    }
1974
1975    /// Update backend addresses via `ProxyManager` after scaling, applying
1976    /// per-endpoint `target_role` filtering.
1977    ///
1978    /// For each L7 endpoint of the service, this collects the subset of
1979    /// containers whose `ContainerId.role` matches `endpoint.target_role`
1980    /// (or all containers when `target_role` is `None`) and updates the
1981    /// proxy's backend pool for that specific endpoint via
1982    /// [`ProxyManager::update_endpoint_backends`].
1983    async fn update_proxy_backends(&self, instance: &ServiceInstance) {
1984        let Some(proxy) = &self.proxy_manager else {
1985            return;
1986        };
1987        for endpoint in &instance.spec.endpoints {
1988            // Only L7 endpoints flow through the proxy (HTTP/HTTPS/WS).
1989            if !matches!(
1990                endpoint.protocol,
1991                Protocol::Http | Protocol::Https | Protocol::Websocket
1992            ) {
1993                continue;
1994            }
1995            let addrs = self.collect_endpoint_backends(instance, endpoint).await;
1996            proxy
1997                .update_endpoint_backends(&instance.service_name, &endpoint.name, addrs)
1998                .await;
1999        }
2000    }
2001
2002    /// Update backend addresses in the `StreamRegistry` for TCP/UDP endpoints after scaling
2003    ///
2004    /// For containers with a port override (macOS sandbox), the addresses already
2005    /// carry the runtime-assigned port. In that case, the container listens on the
2006    /// override port for all traffic, so we use the address port directly. For
2007    /// containers without a port override (Linux, VMs), we reconstruct addresses
2008    /// using the endpoint's declared port, since each container has its own IP
2009    /// and can bind any port independently.
2010    async fn update_stream_backends(&self, instance: &ServiceInstance) {
2011        let Some(stream_registry) = &self.stream_registry else {
2012            return;
2013        };
2014
2015        for endpoint in &instance.spec.endpoints {
2016            match endpoint.protocol {
2017                Protocol::Tcp => {
2018                    let tcp_backends = self.collect_endpoint_backends(instance, endpoint).await;
2019                    let backend_count = tcp_backends.len();
2020                    stream_registry.update_tcp_backends(endpoint.port, tcp_backends);
2021                    tracing::debug!(
2022                        endpoint = %endpoint.name,
2023                        port = endpoint.port,
2024                        backend_count = backend_count,
2025                        target_role = ?endpoint.target_role,
2026                        "Updated TCP stream backends"
2027                    );
2028                }
2029                Protocol::Udp => {
2030                    let udp_backends = self.collect_endpoint_backends(instance, endpoint).await;
2031                    let backend_count = udp_backends.len();
2032                    stream_registry.update_udp_backends(endpoint.port, udp_backends);
2033                    tracing::debug!(
2034                        endpoint = %endpoint.name,
2035                        port = endpoint.port,
2036                        backend_count = backend_count,
2037                        target_role = ?endpoint.target_role,
2038                        "Updated UDP stream backends"
2039                    );
2040                }
2041                _ => {} // HTTP endpoints handled by update_proxy_backends
2042            }
2043        }
2044    }
2045
2046    /// Scale a service. Cluster-aware: if this node has a `Cluster` handle
2047    /// and we're not the leader, forward to the leader; if leader, compute
2048    /// affinity-aware placement and dispatch each node its share via
2049    /// `dispatch_scale_distributed`; else (single-node) just scale locally.
2050    ///
2051    /// # Errors
2052    /// Returns an error if scaling fails on any participating node.
2053    #[allow(clippy::cast_possible_truncation)]
2054    pub async fn scale_service(&self, name: &str, replicas: u32) -> Result<()> {
2055        use zlayer_scheduler::cluster::InternalScaleRequest;
2056
2057        tracing::info!(
2058            target: "zlayer::scale_distribute",
2059            service = name,
2060            replicas,
2061            has_cluster = self.cluster.is_some(),
2062            "scale_service ENTER"
2063        );
2064
2065        // Attach the current spec so every receiving node can register/update
2066        // the service before scaling. This is what propagates an image change
2067        // to worker containers and lets a fresh worker run a replica it has
2068        // never seen. `None` if the service isn't registered locally (the
2069        // receiver then falls back to its own cached spec).
2070        let spec = self
2071            .services
2072            .read()
2073            .await
2074            .get(name)
2075            .map(|inst| inst.spec.clone());
2076        let build_req = |replicas: u32| {
2077            let req = InternalScaleRequest::new(name, replicas);
2078            match spec.clone() {
2079                Some(s) => req.with_spec(s),
2080                None => req,
2081            }
2082        };
2083
2084        if let Some(cluster) = &self.cluster {
2085            let is_leader = cluster.is_leader().await;
2086            tracing::info!(
2087                target: "zlayer::scale_distribute",
2088                service = name,
2089                replicas,
2090                is_leader,
2091                spec_affinity = ?spec.as_ref().and_then(|s| s.affinity.clone()),
2092                "scale_service: cluster path"
2093            );
2094            if !is_leader {
2095                // Follower: forward to the leader and let it dispatch.
2096                return cluster
2097                    .forward_scale(build_req(replicas))
2098                    .await
2099                    .map_err(|e| AgentError::CreateFailed {
2100                        id: name.to_string(),
2101                        reason: format!("cluster forward: {e}"),
2102                    });
2103            }
2104
2105            // Leader path. Compute affinity-aware placement across the Ready
2106            // node set and dispatch each node its share. `dispatch_scale_distributed`
2107            // reuses the same placement machinery as one-off container placement
2108            // (`cluster_nodes_to_node_states` + `place_service_replicas`), honoring
2109            // `ServiceSpec.affinity` (`spread`/`pack`/`pin`). The leader's own
2110            // share short-circuits to a local call (no localhost HTTP round-trip),
2111            // and the attached spec lets fresh workers register the service before
2112            // scaling. Single-node clusters fall through the default impl, which
2113            // dispatches everything to this node (unchanged behavior).
2114            return cluster
2115                .dispatch_scale_distributed(build_req(replicas))
2116                .await
2117                .map_err(|e| AgentError::CreateFailed {
2118                    id: name.to_string(),
2119                    reason: format!("cluster dispatch: {e}"),
2120                });
2121        }
2122
2123        // No cluster handle — single-node mode.
2124        self.scale_service_local(name, replicas).await
2125    }
2126
2127    /// Local (single-node) scale: directly creates/destroys containers on
2128    /// this node only. Called by:
2129    ///   - `scale_service` in single-node mode (when `self.cluster` is None).
2130    ///   - The `/api/v1/internal/scale` handler (which the leader's
2131    ///     `Cluster::dispatch_scale` HTTP-POSTs to, bottoming out the
2132    ///     recursive loop on each receiving node).
2133    ///   - The cluster impls' `local_dispatch` closure (for the leader's own
2134    ///     share — short-circuited to avoid a localhost round-trip).
2135    ///
2136    /// # Errors
2137    /// Returns an error if the service is not found or scaling fails.
2138    #[allow(clippy::cast_possible_truncation)]
2139    pub async fn scale_service_local(&self, name: &str, replicas: u32) -> Result<()> {
2140        tracing::info!(
2141            target: "zlayer::scale_distribute",
2142            service = name,
2143            replicas,
2144            "scale_service_local ENTER"
2145        );
2146        let _permit = self.scale_semaphore.acquire().await;
2147
2148        let services = self.services.read().await;
2149        let Some(instance) = services.get(name) else {
2150            // Draining a service this node never hosted is a no-op (e.g. the
2151            // leader fans out `count=0` to a node to drain it during a
2152            // scale-down, but that node never ran the service).
2153            if replicas == 0 {
2154                return Ok(());
2155            }
2156            return Err(AgentError::NotFound {
2157                container: name.to_string(),
2158                reason: "service not found".to_string(),
2159            });
2160        };
2161
2162        // Get current replica count before scaling
2163        let current_replicas = instance.replica_count().await as u32;
2164
2165        // Perform the scaling operation
2166        instance.scale_to(replicas).await?;
2167
2168        // After scaling, update proxy and stream backends for each endpoint.
2169        // Per-endpoint collection (rather than a single service-wide list)
2170        // is what makes `EndpointSpec.target_role` filtering possible:
2171        // each endpoint receives only the containers whose
2172        // `ContainerId.role` matches its declared role.
2173        if self.proxy_manager.is_some() {
2174            self.update_proxy_backends(instance).await;
2175        }
2176        if self.stream_registry.is_some() {
2177            self.update_stream_backends(instance).await;
2178        }
2179
2180        // Register new containers with supervisor for crash monitoring.
2181        //
2182        // Container ids here must match what `ServiceInstance::scale_to`
2183        // constructed — same role (derived from `replica_groups`) and same
2184        // local node id. Otherwise supervise/unsupervise miss the live entry
2185        // and crash-restart bookkeeping leaks across scale events.
2186        let local_node_id = self.cluster.as_ref().map_or(0, |c| c.node_id());
2187        if let Some(supervisor) = &self.container_supervisor {
2188            // For scale-up, register new containers
2189            if replicas > current_replicas {
2190                for i in current_replicas..replicas {
2191                    let replica_idx = i + 1;
2192                    let container_id = ContainerId::with_role_and_node(
2193                        name.to_string(),
2194                        replica_idx,
2195                        instance.role_for_replica(replica_idx),
2196                        local_node_id,
2197                    );
2198                    supervisor.supervise(&container_id, &instance.spec).await;
2199                }
2200            }
2201            // For scale-down, unregister removed containers
2202            if replicas < current_replicas {
2203                for i in replicas..current_replicas {
2204                    let replica_idx = i + 1;
2205                    let container_id = ContainerId::with_role_and_node(
2206                        name.to_string(),
2207                        replica_idx,
2208                        instance.role_for_replica(replica_idx),
2209                        local_node_id,
2210                    );
2211                    supervisor.unsupervise(&container_id).await;
2212                }
2213            }
2214        }
2215
2216        Ok(())
2217    }
2218
2219    /// Collect backend addresses for a single endpoint of a service.
2220    ///
2221    /// This queries the service instance's containers for their overlay
2222    /// network IP addresses and constructs backend addresses using the
2223    /// endpoint's container target port.
2224    ///
2225    /// Containers are filtered by `endpoint.target_role`:
2226    /// - `None` (default): all containers of the service are eligible
2227    ///   (legacy behavior).
2228    /// - `Some(role)`: only containers whose `ContainerId.role` equals
2229    ///   `role` are included. Implements
2230    ///   [`zlayer_spec::EndpointSpec::target_role`].
2231    ///
2232    /// If a container has a `port_override` (e.g., macOS sandbox where all
2233    /// containers share the host network), that port is used instead of
2234    /// the spec-declared endpoint port. This allows multiple replicas on
2235    /// the same IP (`127.0.0.1`) to be distinguished by port.
2236    async fn collect_endpoint_backends(
2237        &self,
2238        instance: &ServiceInstance,
2239        endpoint: &zlayer_spec::EndpointSpec,
2240    ) -> Vec<SocketAddr> {
2241        let mut addrs = Vec::new();
2242        let endpoint_port = endpoint.target_port();
2243        let containers = instance.containers().read().await;
2244
2245        for (container_id, container) in containers.iter() {
2246            // target_role filter: skip containers whose role doesn't match.
2247            if let Some(required_role) = endpoint.target_role.as_ref() {
2248                if container_id.role != *required_role {
2249                    continue;
2250                }
2251            }
2252            let Some(ip) = container.overlay_ip else {
2253                continue;
2254            };
2255            // Use the runtime-assigned port override if present (macOS
2256            // sandbox), otherwise fall back to the endpoint's declared
2257            // target port.
2258            let port = container.port_override.unwrap_or(endpoint_port);
2259            addrs.push(SocketAddr::new(ip, port));
2260        }
2261
2262        // If we expected backends but found none, log a hint so operators
2263        // can debug. Distinguish "no containers" from "role filter
2264        // excluded everything" from "no overlay IPs".
2265        if addrs.is_empty() && !containers.is_empty() {
2266            tracing::warn!(
2267                service = %instance.service_name,
2268                endpoint = %endpoint.name,
2269                target_role = ?endpoint.target_role,
2270                container_count = containers.len(),
2271                "no backends collected for endpoint - either no matching role, no overlay IPs, or filtering excluded all"
2272            );
2273        }
2274
2275        addrs
2276    }
2277
2278    /// Get service replica count
2279    ///
2280    /// # Errors
2281    /// Returns an error if the service is not found.
2282    pub async fn service_replica_count(&self, name: &str) -> Result<usize> {
2283        let services = self.services.read().await;
2284        let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
2285            container: name.to_string(),
2286            reason: "service not found".to_string(),
2287        })?;
2288
2289        Ok(instance.replica_count().await)
2290    }
2291
    /// Remove a workload (service, job, or cron) from this node's manager.
    ///
    /// Cleanup for every resource type is attempted, whatever kind `name`
    /// actually is, in this order:
    /// - **Cron**: unregisters `name` from the cron scheduler (if configured).
    /// - **Job**: unregisters `name` from the job executor (if configured).
    /// - **Service** (only if `name` is in the service map):
    ///   1. Unregisters TCP/UDP stream routes.
    ///   2. Unpublishes node-loopback ports.
    ///   3. Unsupervises the service's containers.
    ///   4. Removes its DNS records.
    ///   5. Drops the entry from the service map.
    ///
    /// NOTE(review): this method does not itself stop or remove the service's
    /// containers, and it does not unregister HTTP proxy routes. Presumably
    /// callers scale to zero first and handle L7 routes elsewhere — verify.
    ///
    /// # Errors
    /// Currently never returns an error. Every cleanup step is best-effort,
    /// and the method always returns `Ok(())`.
    pub async fn remove_service(&self, name: &str) -> Result<()> {
        // Try to unregister from cron scheduler first
        if let Some(scheduler) = &self.cron_scheduler {
            scheduler.unregister(name).await;
        }

        // Try to unregister from job executor
        if let Some(executor) = &self.job_executor {
            executor.unregister_job(name).await;
        }

        // Unregister stream routes (TCP/UDP) from the stream registry.
        // Results are deliberately ignored (best-effort); the debug log is
        // emitted whether or not a route was actually registered.
        if let Some(stream_registry) = &self.stream_registry {
            // Need to get the service spec to know which ports to unregister
            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                for endpoint in &instance.spec.endpoints {
                    match endpoint.protocol {
                        Protocol::Tcp => {
                            let _ = stream_registry.unregister_tcp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered TCP stream route"
                            );
                        }
                        Protocol::Udp => {
                            let _ = stream_registry.unregister_udp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered UDP stream route"
                            );
                        }
                        _ => {} // L7 (HTTP) endpoints have no stream route to unregister
                    }
                }
            }
            drop(services); // Release read lock
        }

        // Unpublish node-loopback ports for every live replica of this
        // service so the loopback listeners are freed (mirror of the
        // per-replica unpublish in `ServiceInstance::scale_to`). Gated on the
        // spec's policy; recomputes each backend from the container's stored
        // runtime-resolved IP and port override.
        {
            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                if instance.spec.publish_to_node_loopback() {
                    if let Some(proxy) = instance.proxy_manager() {
                        let containers = instance.containers().read().await;
                        for container in containers.values() {
                            if let Some(ip) = container.overlay_ip {
                                proxy
                                    .unpublish_loopback_for_container(
                                        &instance.spec,
                                        ip,
                                        container.port_override,
                                    )
                                    .await;
                            }
                        }
                    }
                }
            }
            drop(services); // Release read lock
        }

        // Unregister containers from the supervisor
        if let Some(supervisor) = &self.container_supervisor {
            let containers = self.get_service_containers(name).await;
            for container_id in containers {
                supervisor.unsupervise(&container_id).await;
            }
            tracing::debug!(service = %name, "Unregistered containers from supervisor");
        }

        // Clean up DNS records for the service (bare name + FQDNs).
        // Must run before the map removal below: `cleanup_service_dns` reads
        // the replica ids from the still-registered service instance.
        self.cleanup_service_dns(name).await;

        // Remove from services map (may or may not exist depending on rtype)
        let mut services = self.services.write().await;
        if services.remove(name).is_some() {
            tracing::debug!(service = %name, "Removed service from manager");
        }

        Ok(())
    }
2390
2391    /// Remove every DNS record this service registered on attach: the bare
2392    /// compose service name (`{service}`), the service-level FQDN
2393    /// (`{service}.service.local`), and each replica's FQDN
2394    /// (`{replica}.{service}.service.local`). Best-effort; failures are logged.
2395    async fn cleanup_service_dns(&self, name: &str) {
2396        let Some(dns) = &self.dns_server else {
2397            return;
2398        };
2399
2400        // Bare compose service-name record (compose discovery).
2401        if let Err(e) = dns.remove_record(name).await {
2402            tracing::warn!(
2403                hostname = %name,
2404                error = %e,
2405                "failed to remove bare service-name DNS record"
2406            );
2407        }
2408
2409        // Service-level FQDN.
2410        let service_hostname = format!("{name}.service.local");
2411        if let Err(e) = dns.remove_record(&service_hostname).await {
2412            tracing::warn!(
2413                hostname = %service_hostname,
2414                error = %e,
2415                "failed to remove service DNS record"
2416            );
2417        } else {
2418            tracing::debug!(hostname = %service_hostname, "removed service DNS record");
2419        }
2420
2421        // Any remaining replica-specific FQDNs.
2422        let services = self.services.read().await;
2423        if let Some(instance) = services.get(name) {
2424            let containers = instance.containers().read().await;
2425            for (id, _) in containers.iter() {
2426                let replica_hostname = format!("{}.{}.service.local", id.replica, name);
2427                if let Err(e) = dns.remove_record(&replica_hostname).await {
2428                    tracing::warn!(
2429                        hostname = %replica_hostname,
2430                        error = %e,
2431                        "failed to remove replica DNS record during service removal"
2432                    );
2433                }
2434            }
2435        }
2436    }
2437
2438    /// Introspect service infrastructure wiring.
2439    /// Returns (`has_overlay`, `has_proxy`, `has_dns`), or None if service not found.
2440    pub async fn service_infrastructure(&self, name: &str) -> Option<(bool, bool, bool)> {
2441        let services = self.services.read().await;
2442        services.get(name).map(|i| {
2443            (
2444                i.has_overlay_manager(),
2445                i.has_proxy_manager(),
2446                i.has_dns_server(),
2447            )
2448        })
2449    }
2450
2451    /// List all services
2452    pub async fn list_services(&self) -> Vec<String> {
2453        self.services.read().await.keys().cloned().collect()
2454    }
2455
2456    /// Get logs for a service, aggregated from all container replicas.
2457    ///
2458    /// # Arguments
2459    /// * `service_name` - Name of the service to fetch logs for
2460    /// * `tail` - Number of lines to return per container (0 = all)
2461    /// * `instance` - Optional specific instance (container ID suffix like "1", "2")
2462    ///
2463    /// # Errors
2464    /// Returns an error if the service or instance is not found.
2465    ///
2466    /// # Returns
2467    /// Structured log entries from all (or specific) container replicas. Each
2468    /// entry has its `service` and `deployment` fields populated when available.
2469    pub async fn get_service_logs(
2470        &self,
2471        service_name: &str,
2472        tail: usize,
2473        instance: Option<&str>,
2474    ) -> Result<Vec<LogEntry>> {
2475        let container_ids = self.get_service_containers(service_name).await;
2476
2477        if container_ids.is_empty() {
2478            return Err(AgentError::NotFound {
2479                container: service_name.to_string(),
2480                reason: "no containers found for service".to_string(),
2481            });
2482        }
2483
2484        // If a specific instance is requested, filter to just that one
2485        let target_ids: Vec<&ContainerId> = if let Some(inst) = instance {
2486            if let Ok(replica_num) = inst.parse::<u32>() {
2487                container_ids
2488                    .iter()
2489                    .filter(|id| id.replica == replica_num)
2490                    .collect()
2491            } else {
2492                // Try matching by full container ID string suffix
2493                container_ids
2494                    .iter()
2495                    .filter(|id| id.to_string().contains(inst))
2496                    .collect()
2497            }
2498        } else {
2499            container_ids.iter().collect()
2500        };
2501
2502        if target_ids.is_empty() {
2503            return Err(AgentError::NotFound {
2504                container: format!("{}/{}", service_name, instance.unwrap_or("?")),
2505                reason: "instance not found".to_string(),
2506            });
2507        }
2508
2509        let mut all_entries: Vec<LogEntry> = Vec::new();
2510
2511        for id in &target_ids {
2512            match self.runtime.container_logs(id, tail).await {
2513                Ok(mut entries) => {
2514                    // Populate service and deployment metadata on each entry
2515                    for entry in &mut entries {
2516                        if entry.service.is_none() {
2517                            entry.service = Some(service_name.to_string());
2518                        }
2519                        if entry.deployment.is_none() {
2520                            entry.deployment.clone_from(&self.deployment_name);
2521                        }
2522                    }
2523                    all_entries.extend(entries);
2524                }
2525                Err(e) => {
2526                    tracing::warn!(
2527                        service = service_name,
2528                        container = %id,
2529                        error = %e,
2530                        "Failed to read container logs"
2531                    );
2532                }
2533            }
2534        }
2535
2536        Ok(all_entries)
2537    }
2538
2539    /// Get all container IDs for a specific service
2540    ///
2541    /// Returns an empty vector if the service doesn't exist.
2542    ///
2543    /// # Arguments
2544    /// * `service_name` - Name of the service to query
2545    ///
2546    /// # Returns
2547    /// Vector of `ContainerIds` for all replicas of the service
2548    pub async fn get_service_containers(&self, service_name: &str) -> Vec<ContainerId> {
2549        let services = self.services.read().await;
2550        if let Some(instance) = services.get(service_name) {
2551            instance.container_ids().await
2552        } else {
2553            Vec::new()
2554        }
2555    }
2556
2557    /// Get per-container info (id, image, real state, pid, overlay IP) for a
2558    /// specific service.
2559    ///
2560    /// Unlike [`get_service_containers`](Self::get_service_containers) (which
2561    /// returns ids only), this surfaces the REAL image reference and lifecycle
2562    /// state of each live container so the API/`ps` can report them accurately.
2563    ///
2564    /// Returns an empty vector if the service doesn't exist.
2565    pub async fn get_service_container_infos(&self, service_name: &str) -> Vec<ContainerInfo> {
2566        let services = self.services.read().await;
2567        if let Some(instance) = services.get(service_name) {
2568            instance.container_infos().await
2569        } else {
2570            Vec::new()
2571        }
2572    }
2573
2574    /// This node's **local** view of `service` (running replica count, health,
2575    /// containers), used for cluster-wide aggregation. Served by the internal
2576    /// `/api/v1/internal/services/{svc}/state` endpoint and used as the local
2577    /// part of [`Self::cluster_service_states`].
2578    pub async fn local_service_state(
2579        &self,
2580        service: &str,
2581    ) -> zlayer_types::cluster::NodeServiceState {
2582        use zlayer_types::cluster::{ClusterContainerSummary, NodeServiceState};
2583        let node_id = self.cluster.as_ref().map_or(0, |c| c.node_id());
2584        let infos = self.get_service_container_infos(service).await;
2585        #[allow(clippy::cast_possible_truncation)]
2586        let running = infos
2587            .iter()
2588            .filter(|i| i.state.eq_ignore_ascii_case("running"))
2589            .count() as u32;
2590        // A node running 0 replicas is trivially healthy (it can't drag the
2591        // cluster-wide aggregate). Otherwise require a Healthy health state.
2592        let healthy = if running == 0 {
2593            true
2594        } else {
2595            let states = self.health_states();
2596            let guard = states.read().await;
2597            matches!(guard.get(service), Some(HealthState::Healthy))
2598        };
2599        let containers = infos
2600            .into_iter()
2601            .map(|i| ClusterContainerSummary {
2602                node_id,
2603                id: i.id.to_string(),
2604                service: i.id.service.clone(),
2605                replica: i.id.replica,
2606                image: i.image,
2607                state: i.state,
2608                pid: i.pid,
2609                overlay_ip: i.overlay_ip,
2610            })
2611            .collect();
2612        NodeServiceState {
2613            node_id,
2614            running,
2615            healthy,
2616            containers,
2617        }
2618    }
2619
2620    /// Cluster-wide per-node states for `service`: this node's local view plus
2621    /// every other node's (fetched via the cluster handle's
2622    /// `fetch_remote_service_states`). When not clustered, returns just the
2623    /// local view. This is the source of truth for distributed-service replica
2624    /// counts, health, and the `ps` container listing on the leader.
2625    pub async fn cluster_service_states(
2626        &self,
2627        service: &str,
2628    ) -> Vec<zlayer_types::cluster::NodeServiceState> {
2629        let mut states = vec![self.local_service_state(service).await];
2630        if let Some(cluster) = &self.cluster {
2631            states.extend(cluster.fetch_remote_service_states(service).await);
2632        }
2633        states
2634    }
2635
2636    /// Execute a command inside a running container for a service
2637    ///
2638    /// Picks a specific replica if provided, otherwise uses the first available container.
2639    ///
2640    /// # Arguments
2641    /// * `service_name` - Name of the service
2642    /// * `replica` - Optional replica number to target
2643    /// * `cmd` - Command and arguments to execute
2644    ///
2645    /// # Errors
2646    /// Returns an error if the service or replica is not found, or if exec fails.
2647    ///
2648    /// # Panics
2649    /// Panics if no replica is specified and the container list is unexpectedly empty
2650    /// after the emptiness check (should not happen in practice).
2651    ///
2652    /// # Returns
2653    /// Tuple of (`exit_code`, stdout, stderr)
2654    pub async fn exec_in_container(
2655        &self,
2656        service_name: &str,
2657        replica: Option<u32>,
2658        cmd: &[String],
2659    ) -> Result<(i32, String, String)> {
2660        let container_ids = self.get_service_containers(service_name).await;
2661
2662        if container_ids.is_empty() {
2663            return Err(AgentError::NotFound {
2664                container: service_name.to_string(),
2665                reason: "no containers found for service".to_string(),
2666            });
2667        }
2668
2669        // Pick the target container
2670        let target = if let Some(rep) = replica {
2671            container_ids
2672                .into_iter()
2673                .find(|cid| cid.replica == rep)
2674                .ok_or_else(|| AgentError::NotFound {
2675                    container: format!("{service_name}-rep-{rep}"),
2676                    reason: format!("replica {rep} not found for service"),
2677                })?
2678        } else {
2679            // Use the first container (lowest replica number)
2680            container_ids.into_iter().next().unwrap()
2681        };
2682
2683        self.runtime.exec(&target, cmd).await
2684    }
2685
2686    /// List every live container across all services, enriched with the data a
2687    /// Docker `ps` row needs and the data the Docker-name resolver needs.
2688    ///
2689    /// For each running container this surfaces the deployment name, the service
2690    /// name, the concrete [`ContainerId`], the compose `container_name:` label
2691    /// (when set, the user-facing Docker name), the real image, the lifecycle
2692    /// state, and the service's published port mappings. Used by the unified
2693    /// container-name resolver and by `docker ps` so compose deployments show up
2694    /// and resolve by their `container_name`.
2695    pub async fn list_container_views(&self) -> Vec<DeploymentContainerView> {
2696        let deployment = self.deployment_name.clone();
2697        let services = self.services.read().await;
2698        let mut out = Vec::new();
2699        for (service_name, instance) in services.iter() {
2700            let container_name = instance
2701                .spec
2702                .labels
2703                .get("com.docker.compose.container_name")
2704                .cloned();
2705            let ports = instance.spec.port_mappings.clone();
2706            for info in instance.container_infos().await {
2707                out.push(DeploymentContainerView {
2708                    deployment: deployment.clone(),
2709                    service: service_name.clone(),
2710                    container_id: info.id,
2711                    container_name: container_name.clone(),
2712                    image: info.image,
2713                    state: info.state,
2714                    pid: info.pid,
2715                    ports: ports.clone(),
2716                });
2717            }
2718        }
2719        out
2720    }
2721
2722    /// Resolve a Docker-style container name/id to a live deployment
2723    /// [`ContainerId`].
2724    ///
2725    /// Matching precedence (first hit wins):
2726    /// 1. The compose `container_name:` label (e.g. `forgejo-e2e`).
2727    /// 2. The conventional compose names `{deployment}-{service}-{replica}` and
2728    ///    `{deployment}_{service}_{replica}` (replica is 1-based, mirroring
2729    ///    Docker Compose; `ContainerId.replica` is 0-based so we add 1).
2730    /// 3. The bare service name (`{service}`), targeting its first replica.
2731    /// 4. The [`ContainerId`] `Display` form.
2732    ///
2733    /// Returns `None` when nothing matches a *running* container.
2734    pub async fn resolve_container_name(&self, name: &str) -> Option<ContainerId> {
2735        let views = self.list_container_views().await;
2736        // 1. explicit container_name label.
2737        if let Some(v) = views
2738            .iter()
2739            .find(|v| v.container_name.as_deref() == Some(name))
2740        {
2741            return Some(v.container_id.clone());
2742        }
2743        // 2 & 3. conventional names + bare service name.
2744        for v in &views {
2745            let dep = v.deployment.as_deref().unwrap_or("");
2746            let svc = &v.service;
2747            let rep1 = v.container_id.replica + 1;
2748            let candidates = [
2749                format!("{dep}-{svc}-{rep1}"),
2750                format!("{dep}_{svc}_{rep1}"),
2751                svc.clone(),
2752            ];
2753            if candidates.iter().any(|c| c == name) {
2754                return Some(v.container_id.clone());
2755            }
2756        }
2757        // 4. ContainerId Display form.
2758        for v in &views {
2759            if v.container_id.to_string() == name {
2760                return Some(v.container_id.clone());
2761            }
2762        }
2763        None
2764    }
2765
2766    /// Execute a command in a specific deployment container (by its concrete
2767    /// [`ContainerId`]) honouring Docker `exec` options (`--user`, `-w`, `-e`).
2768    ///
2769    /// Routes through [`Runtime::exec_with_opts`] so runtimes that support
2770    /// dropping to a uid/gid + chdir + env injection (macOS VZ-Linux) apply
2771    /// them; others fall back to a plain buffered exec.
2772    ///
2773    /// # Errors
2774    /// Propagates the runtime's exec error.
2775    pub async fn exec_in_container_id_with_opts(
2776        &self,
2777        id: &ContainerId,
2778        opts: &crate::runtime::ExecOptions,
2779    ) -> Result<(i32, String, String)> {
2780        self.runtime.exec_with_opts(id, opts).await
2781    }
2782
2783    // ==================== Job Management ====================
2784
2785    /// Trigger a job execution
2786    ///
2787    /// # Arguments
2788    /// * `name` - Name of the registered job
2789    /// * `trigger` - How the job was triggered (endpoint, cli, etc.)
2790    ///
2791    /// # Returns
2792    /// The execution ID for tracking the job
2793    ///
2794    /// # Errors
2795    /// - Returns error if job executor is not configured
2796    /// - Returns error if the job is not registered
2797    pub async fn trigger_job(&self, name: &str, trigger: JobTrigger) -> Result<JobExecutionId> {
2798        let executor = self
2799            .job_executor
2800            .as_ref()
2801            .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
2802
2803        let spec = executor
2804            .get_job_spec(name)
2805            .await
2806            .ok_or_else(|| AgentError::NotFound {
2807                container: name.to_string(),
2808                reason: "job not registered".to_string(),
2809            })?;
2810
2811        executor.trigger(name, &spec, trigger).await
2812    }
2813
2814    /// Get the status of a job execution
2815    ///
2816    /// # Arguments
2817    /// * `id` - The execution ID returned from `trigger_job`
2818    ///
2819    /// # Returns
2820    /// The job execution details, or None if not found
2821    pub async fn get_job_execution(&self, id: &JobExecutionId) -> Option<JobExecution> {
2822        if let Some(executor) = &self.job_executor {
2823            executor.get_execution(id).await
2824        } else {
2825            None
2826        }
2827    }
2828
2829    /// List all executions for a specific job
2830    ///
2831    /// # Arguments
2832    /// * `name` - Name of the job
2833    ///
2834    /// # Returns
2835    /// Vector of job executions for the specified job
2836    pub async fn list_job_executions(&self, name: &str) -> Vec<JobExecution> {
2837        if let Some(executor) = &self.job_executor {
2838            executor.list_executions(name).await
2839        } else {
2840            Vec::new()
2841        }
2842    }
2843
2844    /// Cancel a running job execution
2845    ///
2846    /// # Arguments
2847    /// * `id` - The execution ID to cancel
2848    ///
2849    /// # Errors
2850    /// Returns error if job executor is not configured or if cancellation fails
2851    pub async fn cancel_job(&self, id: &JobExecutionId) -> Result<()> {
2852        let executor = self
2853            .job_executor
2854            .as_ref()
2855            .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
2856
2857        executor.cancel(id).await
2858    }
2859
2860    // ==================== Cron Management ====================
2861
2862    /// Manually trigger a cron job (outside of its schedule)
2863    ///
2864    /// # Arguments
2865    /// * `name` - Name of the cron job
2866    ///
2867    /// # Returns
2868    /// The execution ID for tracking the triggered job
2869    ///
2870    /// # Errors
2871    /// Returns error if cron scheduler is not configured or job not found
2872    pub async fn trigger_cron(&self, name: &str) -> Result<JobExecutionId> {
2873        let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2874            AgentError::Configuration("Cron scheduler not configured".to_string())
2875        })?;
2876
2877        scheduler.trigger_now(name).await
2878    }
2879
2880    /// Enable or disable a cron job
2881    ///
2882    /// # Arguments
2883    /// * `name` - Name of the cron job
2884    /// * `enabled` - Whether to enable or disable the job
2885    pub async fn set_cron_enabled(&self, name: &str, enabled: bool) {
2886        if let Some(scheduler) = &self.cron_scheduler {
2887            scheduler.set_enabled(name, enabled).await;
2888        }
2889    }
2890
2891    /// List all registered cron jobs
2892    pub async fn list_cron_jobs(&self) -> Vec<crate::cron_scheduler::CronJobInfo> {
2893        if let Some(scheduler) = &self.cron_scheduler {
2894            scheduler.list_jobs().await
2895        } else {
2896            Vec::new()
2897        }
2898    }
2899
2900    /// Start the cron scheduler background task
2901    ///
2902    /// This spawns a background task that checks for due cron jobs every second.
2903    /// Returns a `JoinHandle` that can be used to wait for the scheduler to stop.
2904    ///
2905    /// # Errors
2906    /// Returns error if cron scheduler is not configured
2907    pub fn start_cron_scheduler(&self) -> Result<tokio::task::JoinHandle<()>> {
2908        let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2909            AgentError::Configuration("Cron scheduler not configured".to_string())
2910        })?;
2911
2912        let scheduler: Arc<CronScheduler> = Arc::clone(scheduler);
2913        Ok(tokio::spawn(async move {
2914            scheduler.run_loop().await;
2915        }))
2916    }
2917
2918    /// Shutdown the cron scheduler
2919    pub fn shutdown_cron(&self) {
2920        if let Some(scheduler) = &self.cron_scheduler {
2921            scheduler.shutdown();
2922        }
2923    }
2924}
2925
2926#[cfg(test)]
2927#[allow(deprecated)]
2928mod tests {
2929    use super::*;
2930    use crate::runtime::MockRuntime;
2931
2932    #[tokio::test]
2933    async fn test_service_manager() {
2934        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2935        let manager = ServiceManager::new(runtime);
2936
2937        // Add service
2938        let spec = mock_spec();
2939        Box::pin(manager.upsert_service("test".to_string(), spec))
2940            .await
2941            .unwrap();
2942
2943        // Scale up
2944        manager.scale_service("test", 3).await.unwrap();
2945
2946        // Check count
2947        let count = manager.service_replica_count("test").await.unwrap();
2948        assert_eq!(count, 3);
2949
2950        // List services
2951        let services = manager.list_services().await;
2952        assert_eq!(services, vec!["test".to_string()]);
2953    }
2954
2955    #[tokio::test]
2956    async fn test_service_manager_basic_lifecycle() {
2957        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2958        let manager = ServiceManager::new(runtime);
2959
2960        // Add service with HTTP endpoint
2961        let spec = mock_spec();
2962        Box::pin(manager.upsert_service("api".to_string(), spec))
2963            .await
2964            .unwrap();
2965
2966        // Scale up
2967        manager.scale_service("api", 2).await.unwrap();
2968
2969        // Check count
2970        let count = manager.service_replica_count("api").await.unwrap();
2971        assert_eq!(count, 2);
2972
2973        // Remove service
2974        manager.remove_service("api").await.unwrap();
2975
2976        // Verify service is gone
2977        let services = manager.list_services().await;
2978        assert!(!services.contains(&"api".to_string()));
2979    }
2980
2981    #[tokio::test]
2982    async fn test_service_manager_with_full_config() {
2983        use tokio::sync::RwLock;
2984
2985        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2986
2987        // Create a mock overlay manager (skip actual network setup)
2988        let overlay_manager = Arc::new(RwLock::new(
2989            OverlayManager::new("test-deployment".to_string(), "test".to_string())
2990                .await
2991                .unwrap(),
2992        ));
2993
2994        let manager =
2995            ServiceManager::with_full_config(runtime, overlay_manager, "prod".to_string());
2996
2997        // Add service
2998        let spec = mock_spec();
2999        Box::pin(manager.upsert_service("web".to_string(), spec))
3000            .await
3001            .unwrap();
3002
3003        // Verify service is registered
3004        let services = manager.list_services().await;
3005        assert!(services.contains(&"web".to_string()));
3006    }
3007
3008    #[test]
3009    fn test_container_state_as_str() {
3010        use crate::runtime::ContainerState;
3011        assert_eq!(ContainerState::Pending.as_str(), "pending");
3012        assert_eq!(ContainerState::Initializing.as_str(), "initializing");
3013        assert_eq!(ContainerState::Running.as_str(), "running");
3014        assert_eq!(ContainerState::Stopping.as_str(), "stopping");
3015        assert_eq!(ContainerState::Exited { code: 0 }.as_str(), "exited");
3016        assert_eq!(
3017            ContainerState::Failed {
3018                reason: "boom".to_string()
3019            }
3020            .as_str(),
3021            "failed"
3022        );
3023        // Display delegates to as_str.
3024        assert_eq!(ContainerState::Running.to_string(), "running");
3025    }
3026
3027    /// A container created from image X must report image X and its real
3028    /// lifecycle state through the new `container_infos` accessor, replacing
3029    /// the previously hardcoded `"running"` / empty-image behavior.
3030    #[tokio::test]
3031    async fn test_container_infos_surfaces_image_and_state() {
3032        use crate::runtime::{Container, ContainerState};
3033
3034        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3035        let manager = ServiceManager::new(runtime);
3036
3037        let spec = mock_spec(); // image name = "test:latest"
3038        let image = spec.image.name.to_string();
3039        Box::pin(manager.upsert_service("web".to_string(), spec))
3040            .await
3041            .unwrap();
3042
3043        // Inject containers directly with distinct states.
3044        {
3045            let services = manager.services.read().await;
3046            let instance = services.get("web").unwrap();
3047            let mut containers = instance.containers().write().await;
3048
3049            let running_id = ContainerId::new("web", 0);
3050            containers.insert(
3051                running_id.clone(),
3052                Container {
3053                    id: running_id,
3054                    image: image.clone(),
3055                    state: ContainerState::Running,
3056                    pid: Some(4242),
3057                    task: None,
3058                    overlay_ip: None,
3059                    health_monitor: None,
3060                    port_override: None,
3061                },
3062            );
3063
3064            let exited_id = ContainerId::new("web", 1);
3065            containers.insert(
3066                exited_id.clone(),
3067                Container {
3068                    id: exited_id,
3069                    image: image.clone(),
3070                    state: ContainerState::Exited { code: 1 },
3071                    pid: None,
3072                    task: None,
3073                    overlay_ip: None,
3074                    health_monitor: None,
3075                    port_override: None,
3076                },
3077            );
3078        }
3079
3080        let mut infos = manager.get_service_container_infos("web").await;
3081        infos.sort_by_key(|i| i.id.replica);
3082        assert_eq!(infos.len(), 2);
3083
3084        // Every container reports the real image it was created from.
3085        assert!(infos.iter().all(|i| i.image == image));
3086        assert!(infos.iter().all(|i| i.image == "test:latest"));
3087
3088        // Real per-container state is surfaced (not a hardcoded "running").
3089        assert_eq!(infos[0].state, "running");
3090        assert_eq!(infos[0].pid, Some(4242));
3091        assert_eq!(infos[1].state, "exited");
3092        assert_eq!(infos[1].pid, None);
3093
3094        // Unknown service yields an empty list.
3095        assert!(manager
3096            .get_service_container_infos("missing")
3097            .await
3098            .is_empty());
3099    }
3100
3101    /// Bug 2 (`cluster_upgrade`): a changed image *reference* (tag bump) under
3102    /// `if_not_present` must still recreate the local replicas. Previously the
3103    /// recreate only fired on digest drift under `Always`/`Newer`, so a tag
3104    /// change was silently ignored and containers stayed on the old image.
3105    #[tokio::test]
3106    async fn upsert_recreates_local_replicas_on_image_reference_change() {
3107        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3108        let manager = ServiceManager::new(runtime);
3109
3110        // Deploy v1 with the e2e's pull policy (if_not_present) and scale up.
3111        let mut spec = mock_spec();
3112        spec.image.name = "docker.io/library/nginx:1.28-alpine".parse().unwrap();
3113        spec.image.pull_policy = zlayer_spec::PullPolicy::IfNotPresent;
3114        Box::pin(manager.upsert_service("web".to_string(), spec.clone()))
3115            .await
3116            .unwrap();
3117        manager.scale_service_local("web", 2).await.unwrap();
3118
3119        let v1: Vec<String> = manager
3120            .get_service_container_infos("web")
3121            .await
3122            .into_iter()
3123            .map(|i| i.image)
3124            .collect();
3125        assert_eq!(v1.len(), 2);
3126        assert!(
3127            v1.iter().all(|img| img.contains("1.28")),
3128            "expected v1 images, got {v1:?}"
3129        );
3130
3131        // Upgrade to v2 under the SAME if_not_present policy.
3132        let mut spec_v2 = spec;
3133        spec_v2.image.name = "docker.io/library/nginx:1.29-alpine".parse().unwrap();
3134        Box::pin(manager.upsert_service("web".to_string(), spec_v2))
3135            .await
3136            .unwrap();
3137
3138        let v2: Vec<String> = manager
3139            .get_service_container_infos("web")
3140            .await
3141            .into_iter()
3142            .map(|i| i.image)
3143            .collect();
3144        assert_eq!(v2.len(), 2, "replica count preserved across upgrade");
3145        assert!(
3146            v2.iter().all(|img| img.contains("1.29")),
3147            "containers must be recreated on the new image, got {v2:?}"
3148        );
3149    }
3150
3151    fn mock_spec() -> ServiceSpec {
3152        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3153            r"
3154version: v1
3155deployment: test
3156services:
3157  test:
3158    rtype: service
3159    image:
3160      name: test:latest
3161    endpoints:
3162      - name: http
3163        protocol: http
3164        port: 8080
3165    scale:
3166      mode: fixed
3167      replicas: 1
3168",
3169        )
3170        .unwrap()
3171        .services
3172        .remove("test")
3173        .unwrap()
3174    }
3175
3176    /// Helper to create a `ServiceSpec` with dependencies
3177    fn mock_spec_with_deps(deps: Vec<DependsSpec>) -> ServiceSpec {
3178        let mut spec = mock_spec();
3179        spec.depends = deps;
3180        spec
3181    }
3182
3183    /// Helper to create a `DependsSpec`
3184    fn dep(
3185        service: &str,
3186        condition: zlayer_spec::DependencyCondition,
3187        timeout_ms: u64,
3188        on_timeout: zlayer_spec::TimeoutAction,
3189    ) -> DependsSpec {
3190        DependsSpec {
3191            service: service.to_string(),
3192            condition,
3193            timeout: Some(Duration::from_millis(timeout_ms)),
3194            on_timeout,
3195        }
3196    }
3197
3198    #[tokio::test]
3199    async fn test_deploy_with_dependencies_no_deps() {
3200        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3201        let manager = ServiceManager::new(runtime);
3202
3203        // Services with no dependencies
3204        let mut services = HashMap::new();
3205        services.insert("a".to_string(), mock_spec());
3206        services.insert("b".to_string(), mock_spec());
3207
3208        // Should deploy both without issue
3209        Box::pin(manager.deploy_with_dependencies(services))
3210            .await
3211            .unwrap();
3212
3213        // Both services should be registered
3214        let service_list = manager.list_services().await;
3215        assert_eq!(service_list.len(), 2);
3216    }
3217
3218    #[tokio::test]
3219    async fn test_deploy_with_dependencies_linear() {
3220        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3221        let manager = ServiceManager::new(runtime);
3222
3223        // A -> B -> C (A depends on B, B depends on C)
3224        // All use "started" condition which is satisfied when container is running
3225        let mut services = HashMap::new();
3226        services.insert("c".to_string(), mock_spec());
3227        services.insert(
3228            "b".to_string(),
3229            mock_spec_with_deps(vec![dep(
3230                "c",
3231                zlayer_spec::DependencyCondition::Started,
3232                5000,
3233                zlayer_spec::TimeoutAction::Fail,
3234            )]),
3235        );
3236        services.insert(
3237            "a".to_string(),
3238            mock_spec_with_deps(vec![dep(
3239                "b",
3240                zlayer_spec::DependencyCondition::Started,
3241                5000,
3242                zlayer_spec::TimeoutAction::Fail,
3243            )]),
3244        );
3245
3246        // Should deploy in order: c, b, a
3247        Box::pin(manager.deploy_with_dependencies(services))
3248            .await
3249            .unwrap();
3250
3251        // All services should be registered
3252        let service_list = manager.list_services().await;
3253        assert_eq!(service_list.len(), 3);
3254    }
3255
3256    #[tokio::test]
3257    async fn test_deploy_with_dependencies_cycle_detection() {
3258        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3259        let manager = ServiceManager::new(runtime);
3260
3261        // A -> B -> A (cycle)
3262        let mut services = HashMap::new();
3263        services.insert(
3264            "a".to_string(),
3265            mock_spec_with_deps(vec![dep(
3266                "b",
3267                zlayer_spec::DependencyCondition::Started,
3268                5000,
3269                zlayer_spec::TimeoutAction::Fail,
3270            )]),
3271        );
3272        services.insert(
3273            "b".to_string(),
3274            mock_spec_with_deps(vec![dep(
3275                "a",
3276                zlayer_spec::DependencyCondition::Started,
3277                5000,
3278                zlayer_spec::TimeoutAction::Fail,
3279            )]),
3280        );
3281
3282        // Should fail with cycle detection
3283        let result = Box::pin(manager.deploy_with_dependencies(services)).await;
3284        assert!(result.is_err());
3285        let err = result.unwrap_err().to_string();
3286        assert!(err.contains("Cyclic dependency"));
3287    }
3288
3289    #[tokio::test]
3290    async fn test_deploy_with_dependencies_timeout_continue() {
3291        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3292        let manager = ServiceManager::new(runtime);
3293
3294        // A depends on B (healthy), but B never becomes healthy
3295        // Using continue action, so it should proceed anyway
3296        let mut services = HashMap::new();
3297        services.insert("b".to_string(), mock_spec());
3298        services.insert(
3299            "a".to_string(),
3300            mock_spec_with_deps(vec![dep(
3301                "b",
3302                zlayer_spec::DependencyCondition::Healthy, // B won't pass healthy check
3303                100,                                       // Short timeout
3304                zlayer_spec::TimeoutAction::Continue,      // But continue anyway
3305            )]),
3306        );
3307
3308        // Should deploy both despite timeout
3309        Box::pin(manager.deploy_with_dependencies(services))
3310            .await
3311            .unwrap();
3312
3313        let service_list = manager.list_services().await;
3314        assert_eq!(service_list.len(), 2);
3315    }
3316
3317    #[tokio::test]
3318    async fn test_deploy_with_dependencies_timeout_warn() {
3319        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3320        let manager = ServiceManager::new(runtime);
3321
3322        // A depends on B (healthy), but B never becomes healthy
3323        // Using warn action, so it should proceed with a warning
3324        let mut services = HashMap::new();
3325        services.insert("b".to_string(), mock_spec());
3326        services.insert(
3327            "a".to_string(),
3328            mock_spec_with_deps(vec![dep(
3329                "b",
3330                zlayer_spec::DependencyCondition::Healthy,
3331                100,
3332                zlayer_spec::TimeoutAction::Warn,
3333            )]),
3334        );
3335
3336        // Should deploy both despite timeout (with warning)
3337        Box::pin(manager.deploy_with_dependencies(services))
3338            .await
3339            .unwrap();
3340
3341        let service_list = manager.list_services().await;
3342        assert_eq!(service_list.len(), 2);
3343    }
3344
3345    #[tokio::test]
3346    async fn test_deploy_with_dependencies_timeout_fail() {
3347        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3348        let manager = ServiceManager::new(runtime);
3349
3350        // A depends on B (healthy), but B never becomes healthy
3351        // Using fail action, so deployment should fail
3352        let mut services = HashMap::new();
3353        services.insert("b".to_string(), mock_spec());
3354        services.insert(
3355            "a".to_string(),
3356            mock_spec_with_deps(vec![dep(
3357                "b",
3358                zlayer_spec::DependencyCondition::Healthy,
3359                100,
3360                zlayer_spec::TimeoutAction::Fail,
3361            )]),
3362        );
3363
3364        // Should fail after B is started but doesn't become healthy
3365        let result = Box::pin(manager.deploy_with_dependencies(services)).await;
3366        assert!(result.is_err());
3367
3368        // B should be started (it has no deps), but A should fail
3369        let err = result.unwrap_err().to_string();
3370        assert!(err.contains("Dependency timeout"));
3371    }
3372
3373    #[tokio::test]
3374    async fn test_check_dependencies_all_satisfied() {
3375        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3376        let manager = ServiceManager::new(runtime);
3377
3378        // Mark a service as healthy
3379        manager
3380            .update_health_state("db", HealthState::Healthy)
3381            .await;
3382
3383        let deps = vec![DependsSpec {
3384            service: "db".to_string(),
3385            condition: zlayer_spec::DependencyCondition::Healthy,
3386            timeout: Some(Duration::from_secs(60)),
3387            on_timeout: zlayer_spec::TimeoutAction::Fail,
3388        }];
3389
3390        let satisfied = manager.check_dependencies(&deps).await.unwrap();
3391        assert!(satisfied);
3392    }
3393
3394    #[tokio::test]
3395    async fn test_check_dependencies_not_satisfied() {
3396        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3397        let manager = ServiceManager::new(runtime);
3398
3399        // Service not healthy (no state set = Unknown)
3400        let deps = vec![DependsSpec {
3401            service: "db".to_string(),
3402            condition: zlayer_spec::DependencyCondition::Healthy,
3403            timeout: Some(Duration::from_secs(60)),
3404            on_timeout: zlayer_spec::TimeoutAction::Fail,
3405        }];
3406
3407        let satisfied = manager.check_dependencies(&deps).await.unwrap();
3408        assert!(!satisfied);
3409    }
3410
3411    #[tokio::test]
3412    async fn test_health_state_tracking() {
3413        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3414        let manager = ServiceManager::new(runtime);
3415
3416        // Update health states
3417        manager
3418            .update_health_state("db", HealthState::Healthy)
3419            .await;
3420        manager
3421            .update_health_state("cache", HealthState::Unknown)
3422            .await;
3423
3424        // Verify states
3425        let states = manager.health_states();
3426        let states_read = states.read().await;
3427
3428        assert!(matches!(states_read.get("db"), Some(HealthState::Healthy)));
3429        assert!(matches!(
3430            states_read.get("cache"),
3431            Some(HealthState::Unknown)
3432        ));
3433    }
3434
3435    /// Regression test for the stabilization timeout that blocked the raft-e2e
3436    /// `cluster_scaling` / `cluster_upgrade` suites.
3437    ///
3438    /// Previously the callback that bridges a container's health result into the
3439    /// `ServiceManager` `health_states` map was only attached when BOTH a proxy
3440    /// manager AND a reachable overlay IP existed. In degraded-overlay / no-proxy
3441    /// deployments that `if let` was false, so `health_states` was never written,
3442    /// the service stayed `healthy=false` forever, and stabilization timed out
3443    /// even though the container was running and its health check passing.
3444    ///
3445    /// This test drives the real `scale_to` create path with:
3446    ///   * NO `proxy_manager` (so `proxy_backend` resolves to None), and
3447    ///   * a `Command { command: "true" }` health check (always passes host-side),
3448    /// then asserts the shared `health_states` map receives `Healthy` for the
3449    /// service — proving the bridge fires unconditionally.
3450    ///
3451    /// Gated to `#[cfg(unix)]` because `HealthCheck::Command` is executed via
3452    /// `sh -c <command>` in `crate::health::HealthChecker::check_command`. On
3453    /// Windows hosts without `sh` on PATH (the default Windows CI image), no
3454    /// Command-based health check can ever pass, so the test would fail for
3455    /// reasons unrelated to the bridge it is regression-testing. The bridge
3456    /// behavior under test is platform-agnostic; only the test fixture's
3457    /// "always-passes command" needs a Unix shell.
3458    #[cfg(unix)]
3459    #[tokio::test]
3460    async fn test_health_states_bridge_fires_without_proxy() {
3461        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3462
3463        // Service spec with a host-side command health check that always passes.
3464        // Zero start-grace + a short interval keep the test fast.
3465        let mut spec = mock_spec();
3466        spec.health = zlayer_spec::HealthSpec {
3467            start_grace: Some(Duration::from_millis(0)),
3468            interval: Some(Duration::from_millis(50)),
3469            timeout: Some(Duration::from_secs(5)),
3470            retries: 1,
3471            check: HealthCheck::Command {
3472                command: "true".to_string(),
3473            },
3474        };
3475
3476        // Build a ServiceInstance with NO proxy_manager and NO overlay_manager,
3477        // then wire in the shared health_states map exactly as ServiceManager does.
3478        let mut instance =
3479            ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
3480        let health_states: Arc<RwLock<HashMap<String, HealthState>>> =
3481            Arc::new(RwLock::new(HashMap::new()));
3482        instance.set_health_states(Arc::clone(&health_states));
3483
3484        // Drive the real create path (no proxy, MockRuntime IP present but proxy
3485        // absent => proxy_backend is None, hitting the previously-broken branch).
3486        instance.scale_to(1).await.unwrap();
3487
3488        // Poll for the bridged Healthy state (the monitor checks asynchronously
3489        // after its start grace). Bounded so a regression fails fast.
3490        let mut bridged = false;
3491        for _ in 0..100 {
3492            if matches!(
3493                health_states.read().await.get("web"),
3494                Some(HealthState::Healthy)
3495            ) {
3496                bridged = true;
3497                break;
3498            }
3499            tokio::time::sleep(Duration::from_millis(50)).await;
3500        }
3501
3502        assert!(
3503            bridged,
3504            "health_states must receive Healthy for the service even without a \
3505             proxy or overlay IP; the bridge regressed and stabilization would time out"
3506        );
3507    }
3508
3509    // ==================== Job/Cron Integration Tests ====================
3510
3511    fn mock_job_spec() -> ServiceSpec {
3512        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3513            r"
3514version: v1
3515deployment: test
3516services:
3517  backup:
3518    rtype: job
3519    image:
3520      name: backup:latest
3521",
3522        )
3523        .unwrap()
3524        .services
3525        .remove("backup")
3526        .unwrap()
3527    }
3528
3529    fn mock_cron_spec() -> ServiceSpec {
3530        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3531            r#"
3532version: v1
3533deployment: test
3534services:
3535  cleanup:
3536    rtype: cron
3537    schedule: "0 0 * * * * *"
3538    image:
3539      name: cleanup:latest
3540"#,
3541        )
3542        .unwrap()
3543        .services
3544        .remove("cleanup")
3545        .unwrap()
3546    }
3547
3548    #[tokio::test]
3549    async fn test_service_manager_with_job_executor() {
3550        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3551        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3552
3553        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
3554
3555        // Register job
3556        let job_spec = mock_job_spec();
3557        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
3558            .await
3559            .unwrap();
3560
3561        // Trigger job
3562        let exec_id = manager
3563            .trigger_job("backup", JobTrigger::Cli)
3564            .await
3565            .unwrap();
3566
3567        // Give job time to start
3568        tokio::time::sleep(Duration::from_millis(50)).await;
3569
3570        // Check execution exists
3571        let execution = manager.get_job_execution(&exec_id).await;
3572        assert!(execution.is_some());
3573        assert_eq!(execution.unwrap().job_name, "backup");
3574    }
3575
3576    #[tokio::test]
3577    async fn test_service_manager_with_cron_scheduler() {
3578        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3579        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3580        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
3581
3582        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
3583
3584        // Register cron job
3585        let cron_spec = mock_cron_spec();
3586        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3587            .await
3588            .unwrap();
3589
3590        // List cron jobs
3591        let cron_jobs = manager.list_cron_jobs().await;
3592        assert_eq!(cron_jobs.len(), 1);
3593        assert_eq!(cron_jobs[0].name, "cleanup");
3594        assert!(cron_jobs[0].enabled);
3595    }
3596
3597    #[tokio::test]
3598    async fn test_service_manager_trigger_cron() {
3599        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3600        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3601        let cron_scheduler = Arc::new(CronScheduler::new(job_executor.clone()));
3602
3603        let manager = ServiceManager::new(runtime)
3604            .with_job_executor(job_executor)
3605            .with_cron_scheduler(cron_scheduler);
3606
3607        // Register cron job
3608        let cron_spec = mock_cron_spec();
3609        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3610            .await
3611            .unwrap();
3612
3613        // Manually trigger the cron job
3614        let exec_id = manager.trigger_cron("cleanup").await.unwrap();
3615        assert!(!exec_id.0.is_empty());
3616    }
3617
3618    #[tokio::test]
3619    async fn test_service_manager_enable_disable_cron() {
3620        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3621        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3622        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
3623
3624        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
3625
3626        // Register cron job
3627        let cron_spec = mock_cron_spec();
3628        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3629            .await
3630            .unwrap();
3631
3632        // Initially enabled
3633        let cron_jobs = manager.list_cron_jobs().await;
3634        assert!(cron_jobs[0].enabled);
3635
3636        // Disable
3637        manager.set_cron_enabled("cleanup", false).await;
3638        let cron_jobs = manager.list_cron_jobs().await;
3639        assert!(!cron_jobs[0].enabled);
3640
3641        // Re-enable
3642        manager.set_cron_enabled("cleanup", true).await;
3643        let cron_jobs = manager.list_cron_jobs().await;
3644        assert!(cron_jobs[0].enabled);
3645    }
3646
3647    #[tokio::test]
3648    async fn test_service_manager_remove_cleans_up_job() {
3649        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3650        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3651
3652        let manager = ServiceManager::new(runtime).with_job_executor(job_executor.clone());
3653
3654        // Register job
3655        let job_spec = mock_job_spec();
3656        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
3657            .await
3658            .unwrap();
3659
3660        // Verify job is registered
3661        let spec = job_executor.get_job_spec("backup").await;
3662        assert!(spec.is_some());
3663
3664        // Remove job
3665        manager.remove_service("backup").await.unwrap();
3666
3667        // Verify job is unregistered
3668        let spec = job_executor.get_job_spec("backup").await;
3669        assert!(spec.is_none());
3670    }
3671
3672    #[tokio::test]
3673    async fn test_service_manager_remove_cleans_up_cron() {
3674        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3675        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3676        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
3677
3678        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler.clone());
3679
3680        // Register cron job
3681        let cron_spec = mock_cron_spec();
3682        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3683            .await
3684            .unwrap();
3685
3686        // Verify cron job is registered
3687        assert_eq!(cron_scheduler.job_count().await, 1);
3688
3689        // Remove cron job
3690        manager.remove_service("cleanup").await.unwrap();
3691
3692        // Verify cron job is unregistered
3693        assert_eq!(cron_scheduler.job_count().await, 0);
3694    }
3695
3696    #[tokio::test]
3697    async fn test_service_manager_job_without_executor() {
3698        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3699        let manager = ServiceManager::new(runtime);
3700
3701        // Try to trigger job without executor configured
3702        let result = manager.trigger_job("nonexistent", JobTrigger::Cli).await;
3703        assert!(result.is_err());
3704        assert!(result.unwrap_err().to_string().contains("not configured"));
3705    }
3706
3707    #[tokio::test]
3708    async fn test_service_manager_cron_without_scheduler() {
3709        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3710        let manager = ServiceManager::new(runtime);
3711
3712        // Try to register cron job without scheduler configured
3713        let cron_spec = mock_cron_spec();
3714        let result = Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec)).await;
3715        assert!(result.is_err());
3716        assert!(result.unwrap_err().to_string().contains("not configured"));
3717    }
3718
3719    #[tokio::test]
3720    async fn test_service_manager_list_job_executions() {
3721        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3722        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3723
3724        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
3725
3726        // Register job
3727        let job_spec = mock_job_spec();
3728        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
3729            .await
3730            .unwrap();
3731
3732        // Trigger job twice
3733        manager
3734            .trigger_job("backup", JobTrigger::Cli)
3735            .await
3736            .unwrap();
3737        manager
3738            .trigger_job("backup", JobTrigger::Scheduler)
3739            .await
3740            .unwrap();
3741
3742        // Give jobs time to start
3743        tokio::time::sleep(Duration::from_millis(50)).await;
3744
3745        // List executions
3746        let executions = manager.list_job_executions("backup").await;
3747        assert_eq!(executions.len(), 2);
3748    }
3749
3750    // ==================== Container Supervisor Integration Tests ====================
3751
3752    #[tokio::test]
3753    async fn test_service_manager_with_supervisor() {
3754        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3755        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
3756
3757        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
3758
3759        // Add service
3760        let spec = mock_spec();
3761        Box::pin(manager.upsert_service("api".to_string(), spec))
3762            .await
3763            .unwrap();
3764
3765        // Scale up - containers should be registered with supervisor
3766        manager.scale_service("api", 2).await.unwrap();
3767
3768        // Verify containers are supervised
3769        assert_eq!(supervisor.supervised_count().await, 2);
3770
3771        // Scale down - containers should be unregistered
3772        manager.scale_service("api", 1).await.unwrap();
3773        assert_eq!(supervisor.supervised_count().await, 1);
3774
3775        // Remove service - remaining containers should be unregistered
3776        manager.remove_service("api").await.unwrap();
3777        assert_eq!(supervisor.supervised_count().await, 0);
3778    }
3779
3780    #[tokio::test]
3781    async fn test_service_manager_supervisor_state() {
3782        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3783        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
3784
3785        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor);
3786
3787        // Add and scale service
3788        let spec = mock_spec();
3789        Box::pin(manager.upsert_service("web".to_string(), spec))
3790            .await
3791            .unwrap();
3792        manager.scale_service("web", 1).await.unwrap();
3793
3794        // Check supervised state
3795        let container_id = ContainerId::new("web".to_string(), 1);
3796        let state = manager.get_container_supervised_state(&container_id).await;
3797        assert_eq!(state, Some(SupervisedState::Running));
3798    }
3799
3800    #[tokio::test]
3801    async fn test_service_manager_start_supervisor() {
3802        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3803        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
3804
3805        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
3806
3807        // Start the supervisor
3808        let handle = manager.start_container_supervisor().unwrap();
3809
3810        // Give it time to start
3811        tokio::time::sleep(Duration::from_millis(50)).await;
3812        assert!(supervisor.is_running());
3813
3814        // Shutdown
3815        manager.shutdown_container_supervisor();
3816
3817        // Wait for it to stop
3818        tokio::time::timeout(Duration::from_secs(1), handle)
3819            .await
3820            .unwrap()
3821            .unwrap();
3822
3823        assert!(!supervisor.is_running());
3824    }
3825
3826    #[tokio::test]
3827    async fn test_service_manager_supervisor_not_configured() {
3828        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3829        let manager = ServiceManager::new(runtime);
3830
3831        // Try to start supervisor without configuring it
3832        let result = manager.start_container_supervisor();
3833        assert!(result.is_err());
3834        assert!(result.unwrap_err().to_string().contains("not configured"));
3835    }
3836
3837    // ==================== Stream Registry Integration Tests ====================
3838
3839    fn mock_tcp_spec() -> ServiceSpec {
3840        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3841            r"
3842version: v1
3843deployment: test
3844services:
3845  database:
3846    rtype: service
3847    image:
3848      name: postgres:latest
3849    endpoints:
3850      - name: postgresql
3851        protocol: tcp
3852        port: 5432
3853    scale:
3854      mode: fixed
3855      replicas: 1
3856",
3857        )
3858        .unwrap()
3859        .services
3860        .remove("database")
3861        .unwrap()
3862    }
3863
3864    fn mock_udp_spec() -> ServiceSpec {
3865        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3866            r"
3867version: v1
3868deployment: test
3869services:
3870  dns:
3871    rtype: service
3872    image:
3873      name: dns:latest
3874    endpoints:
3875      - name: dns
3876        protocol: udp
3877        port: 53
3878    scale:
3879      mode: fixed
3880      replicas: 1
3881",
3882        )
3883        .unwrap()
3884        .services
3885        .remove("dns")
3886        .unwrap()
3887    }
3888
3889    fn mock_mixed_spec() -> ServiceSpec {
3890        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3891            r"
3892version: v1
3893deployment: test
3894services:
3895  mixed:
3896    rtype: service
3897    image:
3898      name: mixed:latest
3899    endpoints:
3900      - name: http
3901        protocol: http
3902        port: 8080
3903      - name: grpc
3904        protocol: tcp
3905        port: 9000
3906      - name: metrics
3907        protocol: udp
3908        port: 8125
3909    scale:
3910      mode: fixed
3911      replicas: 1
3912",
3913        )
3914        .unwrap()
3915        .services
3916        .remove("mixed")
3917        .unwrap()
3918    }
3919
3920    #[tokio::test]
3921    async fn test_service_manager_with_stream_registry_tcp() {
3922        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3923        let stream_registry = Arc::new(StreamRegistry::new());
3924
3925        let mut manager = ServiceManager::new(runtime);
3926        manager.set_stream_registry(stream_registry.clone());
3927        manager.set_deployment_name("test".to_string());
3928
3929        // Add TCP-only service
3930        let spec = mock_tcp_spec();
3931        Box::pin(manager.upsert_service("database".to_string(), spec))
3932            .await
3933            .unwrap();
3934
3935        // Verify TCP route was registered
3936        assert_eq!(stream_registry.tcp_count(), 1);
3937        assert!(stream_registry.tcp_ports().contains(&5432));
3938
3939        // Remove service and verify cleanup
3940        manager.remove_service("database").await.unwrap();
3941        assert_eq!(stream_registry.tcp_count(), 0);
3942    }
3943
3944    #[tokio::test]
3945    async fn test_service_manager_with_stream_registry_udp() {
3946        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3947        let stream_registry = Arc::new(StreamRegistry::new());
3948
3949        let mut manager = ServiceManager::new(runtime);
3950        manager.set_stream_registry(stream_registry.clone());
3951        manager.set_deployment_name("test".to_string());
3952
3953        // Add UDP-only service
3954        let spec = mock_udp_spec();
3955        Box::pin(manager.upsert_service("dns".to_string(), spec))
3956            .await
3957            .unwrap();
3958
3959        // Verify UDP route was registered
3960        assert_eq!(stream_registry.udp_count(), 1);
3961        assert!(stream_registry.udp_ports().contains(&53));
3962
3963        // Remove service and verify cleanup
3964        manager.remove_service("dns").await.unwrap();
3965        assert_eq!(stream_registry.udp_count(), 0);
3966    }
3967
3968    #[tokio::test]
3969    async fn test_service_manager_with_stream_registry_mixed() {
3970        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3971        let stream_registry = Arc::new(StreamRegistry::new());
3972
3973        let mut manager = ServiceManager::new(runtime);
3974        manager.set_stream_registry(stream_registry.clone());
3975        manager.set_deployment_name("test".to_string());
3976
3977        // Add mixed service (HTTP + TCP + UDP)
3978        let spec = mock_mixed_spec();
3979        Box::pin(manager.upsert_service("mixed".to_string(), spec))
3980            .await
3981            .unwrap();
3982
3983        // Verify stream routes were registered
3984        assert_eq!(stream_registry.tcp_count(), 1); // TCP: 9000
3985        assert_eq!(stream_registry.udp_count(), 1); // UDP: 8125
3986
3987        assert!(stream_registry.tcp_ports().contains(&9000));
3988        assert!(stream_registry.udp_ports().contains(&8125));
3989
3990        // Remove service and verify stream cleanup
3991        manager.remove_service("mixed").await.unwrap();
3992        assert_eq!(stream_registry.tcp_count(), 0);
3993        assert_eq!(stream_registry.udp_count(), 0);
3994    }
3995
3996    #[tokio::test]
3997    async fn test_service_manager_stream_registry_builder() {
3998        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3999        let stream_registry = Arc::new(StreamRegistry::new());
4000
4001        // Test builder pattern
4002        let manager = ServiceManager::new(runtime).with_stream_registry(stream_registry.clone());
4003
4004        // Verify stream registry is accessible
4005        assert!(manager.stream_registry().is_some());
4006    }
4007
4008    #[tokio::test]
4009    async fn test_tcp_service_without_stream_registry() {
4010        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4011
4012        // Manager without stream registry
4013        let mut manager = ServiceManager::new(runtime);
4014        manager.set_deployment_name("test".to_string());
4015
4016        // Add TCP service - should log warning but not fail
4017        let spec = mock_tcp_spec();
4018        Box::pin(manager.upsert_service("database".to_string(), spec))
4019            .await
4020            .unwrap();
4021
4022        // No stream registry to check, but service should be tracked
4023        let services = manager.list_services().await;
4024        assert!(services.contains(&"database".to_string()));
4025    }
4026
4027    /// Verify `collect_endpoint_backends` filters containers by
4028    /// `EndpointSpec.target_role`.
4029    ///
4030    /// Given two replica groups (`primary` × 1, `read` × 2) and two
4031    /// endpoints — one with `target_role: primary` and one with
4032    /// `target_role: read` — each endpoint should receive only the
4033    /// matching containers' overlay addresses. The legacy no-filter
4034    /// endpoint (`target_role: None`) should receive all of them.
4035    #[tokio::test]
4036    #[allow(clippy::too_many_lines)]
4037    async fn test_collect_endpoint_backends_respects_target_role() {
4038        use crate::runtime::Container;
4039        use std::collections::HashMap as StdHashMap;
4040        use std::net::{IpAddr, Ipv4Addr};
4041        use zlayer_spec::{
4042            EndpointSpec, ExposeType, GroupAffinity, Protocol, ReplicaGroup, ScaleSpec,
4043        };
4044
4045        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4046        let manager = ServiceManager::new(runtime.clone());
4047
4048        // Build a spec with replica_groups and three endpoints:
4049        // - "write" targets role "primary"
4050        // - "read" targets role "read"
4051        // - "any" has no target_role (legacy)
4052        let mut spec = mock_spec();
4053        spec.replica_groups = Some(vec![
4054            ReplicaGroup {
4055                role: "primary".to_string(),
4056                count: 1,
4057                image: None,
4058                env: StdHashMap::new(),
4059                command: None,
4060                resources: None,
4061                affinity: GroupAffinity::default(),
4062            },
4063            ReplicaGroup {
4064                role: "read".to_string(),
4065                count: 2,
4066                image: None,
4067                env: StdHashMap::new(),
4068                command: None,
4069                resources: None,
4070                affinity: GroupAffinity::default(),
4071            },
4072        ]);
4073        spec.scale = ScaleSpec::Fixed { replicas: 3 };
4074        spec.endpoints = vec![
4075            EndpointSpec {
4076                name: "write".to_string(),
4077                protocol: Protocol::Tcp,
4078                port: 5432,
4079                target_port: Some(5432),
4080                path: None,
4081                host: None,
4082                expose: ExposeType::Internal,
4083                stream: None,
4084                tunnel: None,
4085                target_role: Some("primary".to_string()),
4086            },
4087            EndpointSpec {
4088                name: "read".to_string(),
4089                protocol: Protocol::Tcp,
4090                port: 5433,
4091                target_port: Some(5432),
4092                path: None,
4093                host: None,
4094                expose: ExposeType::Internal,
4095                stream: None,
4096                tunnel: None,
4097                target_role: Some("read".to_string()),
4098            },
4099            EndpointSpec {
4100                name: "any".to_string(),
4101                protocol: Protocol::Tcp,
4102                port: 5434,
4103                target_port: Some(5432),
4104                path: None,
4105                host: None,
4106                expose: ExposeType::Internal,
4107                stream: None,
4108                tunnel: None,
4109                target_role: None,
4110            },
4111        ];
4112
4113        let instance = ServiceInstance::new(
4114            "postgres".to_string(),
4115            spec.clone(),
4116            runtime,
4117            None, // overlay_manager — not exercised by this test
4118        );
4119
4120        // Inject three containers directly: one primary, two read replicas.
4121        let cid_primary = ContainerId::with_role_and_node("postgres", 1, "primary", 0);
4122        let cid_first_read = ContainerId::with_role_and_node("postgres", 2, "read", 0);
4123        let cid_second_read = ContainerId::with_role_and_node("postgres", 3, "read", 0);
4124
4125        let ip_primary = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 1));
4126        let ip_first_read = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 2));
4127        let ip_second_read = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 3));
4128
4129        {
4130            let mut containers = instance.containers().write().await;
4131            containers.insert(
4132                cid_primary.clone(),
4133                Container {
4134                    id: cid_primary,
4135                    image: spec.image.name.to_string(),
4136                    state: crate::runtime::ContainerState::Running,
4137                    pid: None,
4138                    task: None,
4139                    overlay_ip: Some(ip_primary),
4140                    health_monitor: None,
4141                    port_override: None,
4142                },
4143            );
4144            containers.insert(
4145                cid_first_read.clone(),
4146                Container {
4147                    id: cid_first_read,
4148                    image: spec.image.name.to_string(),
4149                    state: crate::runtime::ContainerState::Running,
4150                    pid: None,
4151                    task: None,
4152                    overlay_ip: Some(ip_first_read),
4153                    health_monitor: None,
4154                    port_override: None,
4155                },
4156            );
4157            containers.insert(
4158                cid_second_read.clone(),
4159                Container {
4160                    id: cid_second_read,
4161                    image: spec.image.name.to_string(),
4162                    state: crate::runtime::ContainerState::Running,
4163                    pid: None,
4164                    task: None,
4165                    overlay_ip: Some(ip_second_read),
4166                    health_monitor: None,
4167                    port_override: None,
4168                },
4169            );
4170        }
4171
4172        let write_ep = &spec.endpoints[0];
4173        let read_ep = &spec.endpoints[1];
4174        let any_ep = &spec.endpoints[2];
4175
4176        let write_backends = manager.collect_endpoint_backends(&instance, write_ep).await;
4177        let read_backends = manager.collect_endpoint_backends(&instance, read_ep).await;
4178        let any_backends = manager.collect_endpoint_backends(&instance, any_ep).await;
4179
4180        // write endpoint -> only the primary container
4181        assert_eq!(write_backends.len(), 1, "write should match only primary");
4182        assert!(
4183            write_backends.iter().any(|a| a.ip() == ip_primary),
4184            "write backends missing primary IP: {write_backends:?}"
4185        );
4186
4187        // read endpoint -> both read containers, no primary
4188        assert_eq!(
4189            read_backends.len(),
4190            2,
4191            "read should match both read replicas"
4192        );
4193        assert!(read_backends.iter().any(|a| a.ip() == ip_first_read));
4194        assert!(read_backends.iter().any(|a| a.ip() == ip_second_read));
4195        assert!(
4196            !read_backends.iter().any(|a| a.ip() == ip_primary),
4197            "read backends must not contain primary: {read_backends:?}"
4198        );
4199
4200        // legacy endpoint (target_role = None) -> every container
4201        assert_eq!(
4202            any_backends.len(),
4203            3,
4204            "any-role endpoint should see all containers"
4205        );
4206    }
4207}