// zlayer_agent/service.rs
//! Service-level container lifecycle management

use crate::container_supervisor::{ContainerSupervisor, SupervisedState, SupervisorEvent};
use crate::cron_scheduler::CronScheduler;
use crate::dependency::{
    DependencyConditionChecker, DependencyGraph, DependencyWaiter, WaitResult,
};
use crate::error::{AgentError, Result};
use crate::health::{HealthCallback, HealthChecker, HealthMonitor, HealthState};
use crate::init::InitOrchestrator;
use crate::job::{JobExecution, JobExecutionId, JobExecutor, JobTrigger};
use crate::overlay_manager::OverlayManager;
use crate::proxy_manager::ProxyManager;
use crate::runtime::{Container, ContainerId, ContainerState, Runtime};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{RwLock, Semaphore};
use zlayer_observability::logs::LogEntry;
use zlayer_overlay::DnsServer;
use zlayer_proxy::{StreamRegistry, StreamService};
use zlayer_spec::{
    effective_pull_policy, DependsSpec, HealthCheck, Protocol, PullPolicy, ResourceType,
    ServiceSpec,
};
27
/// Service instance manages a single service's containers.
///
/// Owns the per-replica [`Container`] map plus the optional collaborators
/// (overlay networking, proxy, DNS, shared health state) that `scale_to`
/// wires up as replicas are created and destroyed.
pub struct ServiceInstance {
    /// Service name; used to build `ContainerId`s and DNS hostnames
    /// (`{service}.service.local`).
    pub service_name: String,
    /// Desired-state spec: image, endpoints, health checks, init actions,
    /// error policy.
    pub spec: ServiceSpec,
    /// Container runtime backend used for pull/create/start/inspect calls.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Live containers keyed by id; behind an async lock so `&self` methods
    /// can mutate it with short-lived critical sections.
    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
    /// Overlay network manager for container networking (optional, not needed for Docker runtime)
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Proxy manager for updating backend health (optional)
    proxy_manager: Option<Arc<ProxyManager>>,
    /// DNS server for service discovery (optional)
    dns_server: Option<Arc<DnsServer>>,
    /// Shared health states map so callbacks can update ServiceManager-level health
    health_states: Option<Arc<RwLock<HashMap<String, HealthState>>>>,
    /// Most recently observed image digest after a successful pull. Used by
    /// `upsert_service` to detect drift on `:latest`/`Newer` redeploys without
    /// requiring callers to track digest state externally. Wrapped in a
    /// `RwLock` so `&self` methods (`scale_to`) can update it.
    last_pulled_digest: tokio::sync::RwLock<Option<String>>,
}

impl ServiceInstance {
50    /// Create a new service instance
51    pub fn new(
52        service_name: String,
53        spec: ServiceSpec,
54        runtime: Arc<dyn Runtime + Send + Sync>,
55        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
56    ) -> Self {
57        Self {
58            service_name,
59            spec,
60            runtime,
61            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
62            overlay_manager,
63            proxy_manager: None,
64            dns_server: None,
65            health_states: None,
66            last_pulled_digest: tokio::sync::RwLock::new(None),
67        }
68    }
69
70    /// Create a new service instance with proxy manager for health-aware load balancing
71    pub fn with_proxy(
72        service_name: String,
73        spec: ServiceSpec,
74        runtime: Arc<dyn Runtime + Send + Sync>,
75        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
76        proxy_manager: Arc<ProxyManager>,
77    ) -> Self {
78        Self {
79            service_name,
80            spec,
81            runtime,
82            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
83            overlay_manager,
84            proxy_manager: Some(proxy_manager),
85            dns_server: None,
86            health_states: None,
87            last_pulled_digest: tokio::sync::RwLock::new(None),
88        }
89    }
90
91    /// Builder method to add DNS server for service discovery
92    #[must_use]
93    pub fn with_dns(mut self, dns_server: Arc<DnsServer>) -> Self {
94        self.dns_server = Some(dns_server);
95        self
96    }
97
98    /// Set the DNS server for service discovery
99    pub fn set_dns_server(&mut self, dns_server: Arc<DnsServer>) {
100        self.dns_server = Some(dns_server);
101    }
102
103    /// Set the proxy manager for health-aware load balancing
104    pub fn set_proxy_manager(&mut self, proxy_manager: Arc<ProxyManager>) {
105        self.proxy_manager = Some(proxy_manager);
106    }
107
108    /// Set the shared health states map so health callbacks can bridge state back to `ServiceManager`
109    pub fn set_health_states(&mut self, states: Arc<RwLock<HashMap<String, HealthState>>>) {
110        self.health_states = Some(states);
111    }
112
113    /// Get the last observed image digest (after the most recent successful
114    /// pull). Returns `None` when no pull has happened yet, when the runtime
115    /// does not expose digests, or when no matching `ImageInfo` was found.
116    pub async fn last_pulled_digest(&self) -> Option<String> {
117        self.last_pulled_digest.read().await.clone()
118    }
119
120    /// Pull the service image using `effective_pull_policy` (so a default
121    /// `IfNotPresent` on a `:latest` tag auto-upgrades to `Newer`) and refresh
122    /// the cached digest from `Runtime::list_images` when the runtime exposes
123    /// it. Returns the digest observed after the pull, when known.
124    ///
125    /// `Never` skips the pull entirely; the cached digest is returned
126    /// unchanged.
127    async fn pull_and_refresh_digest(&self) -> Result<Option<String>> {
128        let image_str = self.spec.image.name.to_string();
129        let effective = effective_pull_policy(&self.spec.image.name, self.spec.image.pull_policy);
130
131        if matches!(effective, PullPolicy::Never) {
132            return Ok(self.last_pulled_digest.read().await.clone());
133        }
134
135        self.runtime
136            .pull_image_with_policy(&image_str, effective, None)
137            .await
138            .map_err(|e| AgentError::PullFailed {
139                image: self.spec.image.name.to_string(),
140                reason: e.to_string(),
141            })?;
142
143        // Best-effort: try to discover the resolved digest via list_images.
144        // Runtimes that don't support introspection (Unsupported) leave the
145        // cached digest unchanged; drift detection then falls back to "always
146        // recreate on PullPolicy::Always, never recreate on PullPolicy::Newer
147        // when no digests are known".
148        let new_digest = match self.runtime.list_images().await {
149            Ok(images) => images
150                .into_iter()
151                .find(|info| info.reference == image_str)
152                .and_then(|info| info.digest),
153            Err(e) => {
154                tracing::debug!(
155                    image = %image_str,
156                    error = %e,
157                    "list_images unavailable; cannot record post-pull digest"
158                );
159                None
160            }
161        };
162
163        if let Some(ref digest) = new_digest {
164            *self.last_pulled_digest.write().await = Some(digest.clone());
165        }
166
167        Ok(new_digest)
168    }
169
170    /// Scale to the desired number of replicas
171    ///
172    /// This method uses short-lived locks to avoid blocking concurrent operations.
173    /// I/O operations (pull, create, start, stop, remove) are performed without
174    /// holding the containers lock to allow other operations to proceed.
175    ///
176    /// # Errors
177    /// Returns an error if image pull, container creation, or container lifecycle operations fail.
178    #[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
179    pub async fn scale_to(&self, replicas: u32) -> Result<()> {
180        // Phase 1: Determine current state (short read lock)
181        let current_replicas = { self.containers.read().await.len() as u32 }; // Lock released here
182
183        // Phase 1b: Pull image up front so a redeploy on `:latest` (which lands
184        // here with replicas == current_replicas in the steady state) actually
185        // refreshes the cached digest. We skip the pull when scaling strictly
186        // down (no new containers needed) and when policy is `Never`. Cached
187        // layers make this cheap when nothing changed.
188        let effective = effective_pull_policy(&self.spec.image.name, self.spec.image.pull_policy);
189        if replicas >= current_replicas && !matches!(effective, PullPolicy::Never) {
190            let _ = self.pull_and_refresh_digest().await?;
191        }
192
193        // Phase 2: Scale up - create new containers (no lock held during I/O)
194        if replicas > current_replicas {
195            for i in current_replicas..replicas {
196                let id = ContainerId {
197                    service: self.service_name.clone(),
198                    replica: i + 1,
199                };
200
201                // Create container (no lock needed - I/O operation)
202                //
203                // RouteToPeer must propagate unchanged: the scheduler uses it
204                // to re-place the workload on a capable peer, and wrapping it
205                // in `CreateFailed` would hide the signal and mark the service
206                // dead instead of rescheduling it. All other errors are
207                // normalised to `CreateFailed` for upstream handling.
208                self.runtime
209                    .create_container(&id, &self.spec)
210                    .await
211                    .map_err(|e| match e {
212                        AgentError::RouteToPeer { .. } => e,
213                        other => AgentError::CreateFailed {
214                            id: id.to_string(),
215                            reason: other.to_string(),
216                        },
217                    })?;
218
219                // Run init actions with error policy enforcement (no lock needed)
220                let init_orchestrator = InitOrchestrator::with_error_policy(
221                    id.clone(),
222                    self.spec.init.clone(),
223                    self.spec.errors.clone(),
224                );
225                init_orchestrator.run().await?;
226
227                // Start container (no lock needed - I/O operation)
228                self.runtime
229                    .start_container(&id)
230                    .await
231                    .map_err(|e| AgentError::StartFailed {
232                        id: id.to_string(),
233                        reason: e.to_string(),
234                    })?;
235
236                // Get container PID with retries (may not be immediately available)
237                let mut container_pid = None;
238                for attempt in 1..=5u32 {
239                    match self.runtime.get_container_pid(&id).await {
240                        Ok(Some(pid)) => {
241                            container_pid = Some(pid);
242                            break;
243                        }
244                        Ok(None) if attempt < 5 => {
245                            tracing::debug!(container = %id, attempt, "PID not available yet, retrying");
246                            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
247                        }
248                        Ok(None) => {
249                            tracing::warn!(container = %id, "Container PID unavailable after 5 attempts");
250                        }
251                        Err(e) => {
252                            tracing::warn!(container = %id, attempt, error = %e, "Failed to get PID");
253                            if attempt < 5 {
254                                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
255                            }
256                        }
257                    }
258                }
259
260                // Verify the container is still running before attempting
261                // overlay attach. If the init process crashed during start
262                // (bad image, missing libs, failed mount), the PID above is
263                // now dead and every `ip link set ... netns {pid}` will
264                // return a cryptic RTNETLINK error. Surface the real cause
265                // from the container's log tail instead of the cascade.
266                if container_pid.is_some() {
267                    let alive = match self.runtime.container_state(&id).await {
268                        Ok(
269                            ContainerState::Running
270                            | ContainerState::Pending
271                            | ContainerState::Initializing,
272                        ) => true,
273                        Ok(state) => {
274                            tracing::warn!(
275                                container = %id,
276                                ?state,
277                                "container exited before overlay attach could run"
278                            );
279                            false
280                        }
281                        Err(e) => {
282                            // State query failed — don't block the attach on
283                            // it. The overlay manager's own cleanup-on-error
284                            // path now handles the dead-PID case cleanly.
285                            tracing::warn!(
286                                container = %id,
287                                error = %e,
288                                "container state query failed before overlay attach, proceeding"
289                            );
290                            true
291                        }
292                    };
293                    if !alive {
294                        let log_tail = self.runtime.container_logs(&id, 40).await.ok().map_or_else(
295                            || "  <log read failed>".to_string(),
296                            |entries| {
297                                if entries.is_empty() {
298                                    "  <no log output>".to_string()
299                                } else {
300                                    entries
301                                        .into_iter()
302                                        .map(|e| format!("  {}", e.message))
303                                        .collect::<Vec<_>>()
304                                        .join("\n")
305                                }
306                            },
307                        );
308                        return Err(AgentError::StartFailed {
309                            id: id.to_string(),
310                            reason: format!("container exited during startup:\n{log_tail}"),
311                        });
312                    }
313                }
314
315                // Attach to overlay network if manager is available.
316                //
317                // Linux uses the container PID to enter the netns and attach a
318                // veth. Windows has no PID-addressable netns — the HCN namespace
319                // GUID (obtained from `get_container_namespace_id`) is used
320                // instead, and the endpoint's IP has already been populated by
321                // `EndpointAttachment::create_overlay` during container creation.
322                // We simply register that IP with the slice allocator so host
323                // accounting stays in sync.
324                let overlay_ip = if let Some(overlay) = &self.overlay_manager {
325                    let overlay_guard = overlay.read().await;
326                    #[cfg(target_os = "windows")]
327                    let attach_result: Option<std::net::IpAddr> = {
328                        let _ = container_pid; // unused on Windows
329                        match self.runtime.get_container_namespace_id(&id).await {
330                            Ok(Some(ns_id)) => {
331                                let ip_override =
332                                    self.runtime.get_container_ip(&id).await.ok().flatten();
333                                let dns_server = overlay_guard.dns_server_addr().map(|sa| sa.ip());
334                                let dns_domain =
335                                    overlay_guard.dns_domain().map(ToString::to_string);
336                                match overlay_guard
337                                    .attach_container_hcn(
338                                        ns_id,
339                                        &self.service_name,
340                                        ip_override,
341                                        true,
342                                        dns_server,
343                                        dns_domain,
344                                    )
345                                    .await
346                                {
347                                    Ok(ip) => Some(ip),
348                                    Err(e) => {
349                                        tracing::warn!(
350                                            container = %id,
351                                            error = %e,
352                                            "HCN overlay attach failed"
353                                        );
354                                        None
355                                    }
356                                }
357                            }
358                            Ok(None) => {
359                                tracing::debug!(
360                                    container = %id,
361                                    "skipping HCN overlay attach - no namespace id available"
362                                );
363                                None
364                            }
365                            Err(e) => {
366                                tracing::warn!(
367                                    container = %id,
368                                    error = %e,
369                                    "failed to fetch HCN namespace id"
370                                );
371                                None
372                            }
373                        }
374                    };
375                    #[cfg(not(target_os = "windows"))]
376                    let attach_result: Option<std::net::IpAddr> = {
377                        if let Some(pid) = container_pid {
378                            match overlay_guard
379                                .attach_container(pid, &self.service_name, true)
380                                .await
381                            {
382                                Ok(ip) => Some(ip),
383                                Err(e) => {
384                                    tracing::warn!(
385                                        container = %id,
386                                        error = %e,
387                                        "failed to attach container to overlay network"
388                                    );
389                                    None
390                                }
391                            }
392                        } else {
393                            // No PID available (e.g. WASM runtime) - skip overlay attachment
394                            tracing::debug!(
395                                container = %id,
396                                "skipping overlay attachment - no PID available"
397                            );
398                            None
399                        }
400                    };
401
402                    if let Some(ip) = attach_result {
403                        tracing::info!(
404                            container = %id,
405                            overlay_ip = %ip,
406                            "attached container to overlay network"
407                        );
408
409                        // Register DNS for service discovery
410                        if let Some(dns) = &self.dns_server {
411                            // Register service-level hostname: {service}.service.local
412                            let service_hostname = format!("{}.service.local", self.service_name);
413
414                            // Register replica-specific hostname: {replica}.{service}.service.local
415                            let replica_hostname =
416                                format!("{}.{}.service.local", id.replica, self.service_name);
417
418                            match dns.add_record(&service_hostname, ip).await {
419                                Ok(()) => tracing::debug!(
420                                    hostname = %service_hostname,
421                                    ip = %ip,
422                                    "registered DNS for service"
423                                ),
424                                Err(e) => tracing::warn!(
425                                    hostname = %service_hostname,
426                                    error = %e,
427                                    "failed to register DNS for service"
428                                ),
429                            }
430
431                            // Also register replica-specific entry
432                            if let Err(e) = dns.add_record(&replica_hostname, ip).await {
433                                tracing::warn!(
434                                    hostname = %replica_hostname,
435                                    error = %e,
436                                    "failed to register replica DNS"
437                                );
438                            } else {
439                                tracing::debug!(
440                                    hostname = %replica_hostname,
441                                    ip = %ip,
442                                    "registered DNS for replica"
443                                );
444                            }
445                        }
446
447                        Some(ip)
448                    } else {
449                        None
450                    }
451                } else {
452                    None
453                };
454
455                // If overlay failed, try the container runtime's own IP as fallback
456                let effective_ip = if overlay_ip.is_none() {
457                    match self.runtime.get_container_ip(&id).await {
458                        Ok(Some(ip)) => {
459                            tracing::info!(
460                                container = %id,
461                                ip = %ip,
462                                "using runtime container IP for proxy (overlay unavailable)"
463                            );
464                            Some(ip)
465                        }
466                        Ok(None) => {
467                            tracing::warn!(
468                                container = %id,
469                                "no container IP available from runtime, proxy routing will be unavailable"
470                            );
471                            None
472                        }
473                        Err(e) => {
474                            tracing::warn!(
475                                container = %id,
476                                error = %e,
477                                "failed to get container IP from runtime"
478                            );
479                            None
480                        }
481                    }
482                } else {
483                    overlay_ip
484                };
485
486                tracing::info!(
487                    container = %id,
488                    service = %self.service_name,
489                    overlay_ip = ?overlay_ip,
490                    effective_ip = ?effective_ip,
491                    "Container IP resolution complete"
492                );
493
494                // Query port override from the runtime.
495                // On macOS sandbox, each container is assigned a unique port since
496                // all processes share the host network (no network namespaces).
497                // The runtime passes the port to the process via the PORT env var.
498                let port_override = match self.runtime.get_container_port_override(&id).await {
499                    Ok(Some(port)) => {
500                        tracing::info!(
501                            container = %id,
502                            port = port,
503                            "runtime assigned dynamic port override for this container"
504                        );
505                        Some(port)
506                    }
507                    Ok(None) => None,
508                    Err(e) => {
509                        tracing::warn!(
510                            container = %id,
511                            error = %e,
512                            "failed to query port override from runtime, using spec port"
513                        );
514                        None
515                    }
516                };
517
518                // Start health monitoring and store handle (no lock needed during start)
519                let health_monitor_handle = {
520                    let mut check = self.spec.health.check.clone();
521
522                    // Resolve Tcp { port: 0 } ("use first endpoint") to the actual
523                    // port the container is listening on. With mac-sandbox, each
524                    // replica gets a unique assigned port via port_override.
525                    if let HealthCheck::Tcp { ref mut port } = check {
526                        if *port == 0 {
527                            *port = port_override.unwrap_or_else(|| {
528                                self.spec
529                                    .endpoints
530                                    .iter()
531                                    .find(|ep| {
532                                        matches!(
533                                            ep.protocol,
534                                            Protocol::Http | Protocol::Https | Protocol::Websocket
535                                        )
536                                    })
537                                    .map_or(8080, zlayer_spec::EndpointSpec::target_port)
538                            });
539                        }
540                    }
541
542                    let start_grace = self
543                        .spec
544                        .health
545                        .start_grace
546                        .unwrap_or(Duration::from_secs(5));
547                    let check_timeout = self.spec.health.timeout.unwrap_or(Duration::from_secs(5));
548                    let interval = self.spec.health.interval.unwrap_or(Duration::from_secs(10));
549                    let retries = self.spec.health.retries;
550
551                    let checker = HealthChecker::new(check, effective_ip);
552                    let mut monitor = HealthMonitor::new(id.clone(), checker, interval, retries)
553                        .with_start_grace(start_grace)
554                        .with_check_timeout(check_timeout);
555
556                    // Create health callback to update proxy backend health if proxy is configured
557                    // and we have an overlay IP for this container
558                    if let (Some(proxy), Some(ip)) = (&self.proxy_manager, effective_ip) {
559                        let proxy = Arc::clone(proxy);
560                        let service_name = self.service_name.clone();
561                        // Get the container's target port, using the runtime override if present.
562                        // On macOS sandbox, port_override gives each replica a unique port
563                        // so the proxy can distinguish backends sharing 127.0.0.1.
564                        let port = port_override.unwrap_or_else(|| {
565                            self.spec
566                                .endpoints
567                                .iter()
568                                .find(|ep| {
569                                    matches!(
570                                        ep.protocol,
571                                        Protocol::Http | Protocol::Https | Protocol::Websocket
572                                    )
573                                })
574                                .map_or(8080, zlayer_spec::EndpointSpec::target_port)
575                        });
576
577                        let backend_addr = SocketAddr::new(ip, port);
578
579                        // Register backend with load balancer so proxy can route to it.
580                        // This must happen before the health callback is created, because
581                        // update_backend_health only updates *existing* backends.
582                        proxy.add_backend(&self.service_name, backend_addr).await;
583
584                        let health_states_opt = self.health_states.clone();
585                        let svc_name_for_states = self.service_name.clone();
586
587                        let health_callback: HealthCallback =
588                            Arc::new(move |container_id: ContainerId, is_healthy: bool| {
589                                let proxy = Arc::clone(&proxy);
590                                let service_name = service_name.clone();
591                                tracing::info!(
592                                    container = %container_id,
593                                    service = %service_name,
594                                    backend = %backend_addr,
595                                    healthy = is_healthy,
596                                    "health status changed, updating proxy backend"
597                                );
598                                // Spawn a task to update the proxy (callback is sync, proxy update is async)
599                                tokio::spawn(async move {
600                                    proxy
601                                        .update_backend_health(
602                                            &service_name,
603                                            backend_addr,
604                                            is_healthy,
605                                        )
606                                        .await;
607                                });
608                                // Bridge health state back to ServiceManager's health_states map
609                                if let Some(ref health_states) = health_states_opt {
610                                    let states = Arc::clone(health_states);
611                                    let svc = svc_name_for_states.clone();
612                                    tokio::spawn(async move {
613                                        let state = if is_healthy {
614                                            HealthState::Healthy
615                                        } else {
616                                            HealthState::Unhealthy {
617                                                failures: 0,
618                                                reason: "health check failed".into(),
619                                            }
620                                        };
621                                        states.write().await.insert(svc, state);
622                                    });
623                                }
624                            });
625
626                        monitor = monitor.with_callback(health_callback);
627                    }
628
629                    monitor.start()
630                };
631
632                // Update state (short write lock)
633                {
634                    let mut containers = self.containers.write().await;
635                    containers.insert(
636                        id.clone(),
637                        Container {
638                            id: id.clone(),
639                            state: ContainerState::Running,
640                            pid: None,
641                            task: None,
642                            overlay_ip: effective_ip,
643                            health_monitor: Some(health_monitor_handle),
644                            port_override,
645                        },
646                    );
647                } // Lock released here
648            }
649        }
650
651        // Phase 3: Scale down - remove containers (short write lock per removal)
652        if replicas < current_replicas {
653            for i in replicas..current_replicas {
654                let id = ContainerId {
655                    service: self.service_name.clone(),
656                    replica: i + 1,
657                };
658
659                // Remove from state first and get the container to abort health monitor (short write lock)
660                let removed_container = {
661                    let mut containers = self.containers.write().await;
662                    containers.remove(&id)
663                }; // Lock released here
664
665                // Then perform cleanup (no lock held - I/O operations)
666                if let Some(container) = removed_container {
667                    // Abort the health monitor task if it exists
668                    if let Some(handle) = container.health_monitor {
669                        handle.abort();
670                    }
671
672                    // Remove DNS records for this container
673                    if let Some(dns) = &self.dns_server {
674                        // Remove replica-specific DNS entry
675                        let replica_hostname =
676                            format!("{}.{}.service.local", id.replica, self.service_name);
677                        if let Err(e) = dns.remove_record(&replica_hostname).await {
678                            tracing::warn!(
679                                hostname = %replica_hostname,
680                                error = %e,
681                                "failed to remove replica DNS record"
682                            );
683                        } else {
684                            tracing::debug!(
685                                hostname = %replica_hostname,
686                                "removed replica DNS record"
687                            );
688                        }
689
690                        // Note: We don't remove the service-level hostname here because
691                        // other replicas may still be using it. The service-level record
692                        // should be cleaned up when the entire service is removed.
693                    }
694
695                    // Stop container
696                    self.runtime
697                        .stop_container(&id, Duration::from_secs(30))
698                        .await?;
699
700                    // Sync volumes to S3 before removal (no-op if not configured)
701                    if let Err(e) = self.runtime.sync_container_volumes(&id).await {
702                        tracing::warn!(
703                            container = %id,
704                            error = %e,
705                            "failed to sync volumes before removal"
706                        );
707                    }
708
709                    // Remove container
710                    self.runtime.remove_container(&id).await?;
711                }
712            }
713        }
714
715        Ok(())
716    }
717
718    /// Get current number of replicas
719    pub async fn replica_count(&self) -> usize {
720        self.containers.read().await.len()
721    }
722
723    /// Get all container IDs
724    pub async fn container_ids(&self) -> Vec<ContainerId> {
725        self.containers.read().await.keys().cloned().collect()
726    }
727
    /// Get read access to the containers map
    ///
    /// This allows callers to access container overlay IPs and other metadata
    /// without copying the entire map.
    ///
    /// Callers should hold the read guard briefly: scaling operations take
    /// the write side of this same lock and will block behind long readers.
    pub fn containers(
        &self,
    ) -> &tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>> {
        &self.containers
    }
737
738    /// Check if this service instance has an overlay manager configured
739    pub fn has_overlay_manager(&self) -> bool {
740        self.overlay_manager.is_some()
741    }
742
743    /// Check if this service instance has a proxy manager configured
744    pub fn has_proxy_manager(&self) -> bool {
745        self.proxy_manager.is_some()
746    }
747
748    /// Check if this service instance has a DNS server configured
749    pub fn has_dns_server(&self) -> bool {
750        self.dns_server.is_some()
751    }
752}
753
/// Service manager for multiple services
///
/// Owns the per-service [`ServiceInstance`]s plus the optional shared
/// subsystems (overlay networking, L4/L7 proxying, DNS, jobs, cron,
/// container supervision) that instances are wired with.
pub struct ServiceManager {
    /// Container runtime used to create, stop, and remove containers.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Service instances keyed by service name.
    services: tokio::sync::RwLock<std::collections::HashMap<String, ServiceInstance>>,
    /// Bounds how many scaling operations may run concurrently.
    scale_semaphore: Arc<Semaphore>,
    /// Overlay network manager for container networking
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Stream registry for L4 proxy route registration (TCP/UDP)
    stream_registry: Option<Arc<StreamRegistry>>,
    /// Proxy manager for health-aware load balancing (hyper-based proxy)
    proxy_manager: Option<Arc<ProxyManager>>,
    /// DNS server for service discovery
    dns_server: Option<Arc<DnsServer>>,
    /// Deployment name (used for generating hostnames)
    deployment_name: Option<String>,
    /// Health states for dependency condition checking
    health_states: Arc<RwLock<HashMap<String, HealthState>>>,
    /// Job executor for run-to-completion workloads
    job_executor: Option<Arc<JobExecutor>>,
    /// Cron scheduler for time-based job triggers
    cron_scheduler: Option<Arc<CronScheduler>>,
    /// Container supervisor for crash/panic policy enforcement
    container_supervisor: Option<Arc<ContainerSupervisor>>,
}
778
779// ---------------------------------------------------------------------------
780// ServiceManagerBuilder
781// ---------------------------------------------------------------------------
782
783/// Builder for constructing a [`ServiceManager`] with optional subsystems.
784///
785/// Prefer using `ServiceManager::builder(runtime)` to start building.
786///
787/// # Example
788///
789/// ```ignore
790/// let manager = ServiceManager::builder(runtime)
791///     .overlay_manager(om)
792///     .proxy_manager(proxy)
793///     .deployment_name("prod")
794///     .build();
795/// ```
pub struct ServiceManagerBuilder {
    /// Required container runtime; everything else below is optional.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Optional overlay network manager for container networking.
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Optional proxy manager for health-aware load balancing.
    proxy_manager: Option<Arc<ProxyManager>>,
    /// Optional registry for L4 (TCP/UDP) proxy routes.
    stream_registry: Option<Arc<StreamRegistry>>,
    /// Optional DNS server for service discovery.
    dns_server: Option<Arc<DnsServer>>,
    /// Optional deployment name used when generating hostnames.
    deployment_name: Option<String>,
    /// Optional executor for run-to-completion job workloads.
    job_executor: Option<Arc<JobExecutor>>,
    /// Optional scheduler for time-based (cron) job triggers.
    cron_scheduler: Option<Arc<CronScheduler>>,
    /// Optional supervisor enforcing crash/panic policies.
    container_supervisor: Option<Arc<ContainerSupervisor>>,
}
807
808impl ServiceManagerBuilder {
809    /// Create a new builder with the required runtime.
810    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
811        Self {
812            runtime,
813            overlay_manager: None,
814            proxy_manager: None,
815            stream_registry: None,
816            dns_server: None,
817            deployment_name: None,
818            job_executor: None,
819            cron_scheduler: None,
820            container_supervisor: None,
821        }
822    }
823
824    /// Set the overlay network manager for container networking.
825    #[must_use]
826    pub fn overlay_manager(mut self, om: Arc<RwLock<OverlayManager>>) -> Self {
827        self.overlay_manager = Some(om);
828        self
829    }
830
831    /// Set the proxy manager for health-aware load balancing.
832    #[must_use]
833    pub fn proxy_manager(mut self, pm: Arc<ProxyManager>) -> Self {
834        self.proxy_manager = Some(pm);
835        self
836    }
837
838    /// Set the stream registry for TCP/UDP L4 proxy route registration.
839    #[must_use]
840    pub fn stream_registry(mut self, sr: Arc<StreamRegistry>) -> Self {
841        self.stream_registry = Some(sr);
842        self
843    }
844
845    /// Set the DNS server for service discovery.
846    #[must_use]
847    pub fn dns_server(mut self, dns: Arc<DnsServer>) -> Self {
848        self.dns_server = Some(dns);
849        self
850    }
851
852    /// Set the deployment name (used for hostname generation).
853    #[must_use]
854    pub fn deployment_name(mut self, name: impl Into<String>) -> Self {
855        self.deployment_name = Some(name.into());
856        self
857    }
858
859    /// Set the job executor for run-to-completion workloads.
860    #[must_use]
861    pub fn job_executor(mut self, je: Arc<JobExecutor>) -> Self {
862        self.job_executor = Some(je);
863        self
864    }
865
866    /// Set the cron scheduler for time-based job triggers.
867    #[must_use]
868    pub fn cron_scheduler(mut self, cs: Arc<CronScheduler>) -> Self {
869        self.cron_scheduler = Some(cs);
870        self
871    }
872
873    /// Set the container supervisor for crash/panic policy enforcement.
874    #[must_use]
875    pub fn container_supervisor(mut self, cs: Arc<ContainerSupervisor>) -> Self {
876        self.container_supervisor = Some(cs);
877        self
878    }
879
880    /// Consume the builder and produce a fully-wired [`ServiceManager`].
881    ///
882    /// Logs warnings for missing recommended subsystems (proxy,
883    /// `stream_registry`, `container_supervisor`, `deployment_name`).
884    pub fn build(self) -> ServiceManager {
885        if self.proxy_manager.is_none() {
886            tracing::warn!("ServiceManager built without proxy_manager");
887        }
888        if self.stream_registry.is_none() {
889            tracing::warn!("ServiceManager built without stream_registry");
890        }
891        if self.container_supervisor.is_none() {
892            tracing::warn!("ServiceManager built without container_supervisor");
893        }
894        if self.deployment_name.is_none() {
895            tracing::warn!("ServiceManager built without deployment_name");
896        }
897
898        ServiceManager {
899            runtime: self.runtime,
900            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
901            scale_semaphore: Arc::new(Semaphore::new(10)),
902            overlay_manager: self.overlay_manager,
903            stream_registry: self.stream_registry,
904            proxy_manager: self.proxy_manager,
905            dns_server: self.dns_server,
906            deployment_name: self.deployment_name,
907            health_states: Arc::new(RwLock::new(HashMap::new())),
908            job_executor: self.job_executor,
909            cron_scheduler: self.cron_scheduler,
910            container_supervisor: self.container_supervisor,
911        }
912    }
913}
914
915impl ServiceManager {
    /// Create a [`ServiceManagerBuilder`] for constructing a `ServiceManager`.
    ///
    /// This is the preferred way to construct a `ServiceManager` since v0.2.0.
    /// It is a thin convenience wrapper around [`ServiceManagerBuilder::new`].
    ///
    /// # Example
    ///
    /// ```ignore
    /// let manager = ServiceManager::builder(runtime)
    ///     .overlay_manager(om)
    ///     .proxy_manager(proxy)
    ///     .build();
    /// ```
    pub fn builder(runtime: Arc<dyn Runtime + Send + Sync>) -> ServiceManagerBuilder {
        ServiceManagerBuilder::new(runtime)
    }
931
932    /// Create a new service manager
933    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
934    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
935        Self {
936            runtime,
937            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
938            scale_semaphore: Arc::new(Semaphore::new(10)), // Max 10 concurrent scaling operations
939            overlay_manager: None,
940            stream_registry: None,
941            proxy_manager: None,
942            dns_server: None,
943            deployment_name: None,
944            health_states: Arc::new(RwLock::new(HashMap::new())),
945            job_executor: None,
946            cron_scheduler: None,
947            container_supervisor: None,
948        }
949    }
950
951    /// Create a service manager with overlay network support
952    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
953    pub fn with_overlay(
954        runtime: Arc<dyn Runtime + Send + Sync>,
955        overlay_manager: Arc<RwLock<OverlayManager>>,
956    ) -> Self {
957        Self {
958            runtime,
959            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
960            scale_semaphore: Arc::new(Semaphore::new(10)),
961            overlay_manager: Some(overlay_manager),
962            stream_registry: None,
963            proxy_manager: None,
964            dns_server: None,
965            deployment_name: None,
966            health_states: Arc::new(RwLock::new(HashMap::new())),
967            job_executor: None,
968            cron_scheduler: None,
969            container_supervisor: None,
970        }
971    }
972
973    /// Create a fully-configured service manager with overlay and proxy support
974    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
975    pub fn with_full_config(
976        runtime: Arc<dyn Runtime + Send + Sync>,
977        overlay_manager: Arc<RwLock<OverlayManager>>,
978        deployment_name: String,
979    ) -> Self {
980        Self {
981            runtime,
982            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
983            scale_semaphore: Arc::new(Semaphore::new(10)),
984            overlay_manager: Some(overlay_manager),
985            stream_registry: None,
986            proxy_manager: None,
987            dns_server: None,
988            deployment_name: Some(deployment_name),
989            health_states: Arc::new(RwLock::new(HashMap::new())),
990            job_executor: None,
991            cron_scheduler: None,
992            container_supervisor: None,
993        }
994    }
995
    /// Get the health states map for external monitoring
    ///
    /// Returns a cloned `Arc`, so the caller shares the same underlying map
    /// that health callbacks and `update_health_state` mutate.
    pub fn health_states(&self) -> Arc<RwLock<HashMap<String, HealthState>>> {
        Arc::clone(&self.health_states)
    }
1000
1001    /// Update health state for a service
1002    pub async fn update_health_state(&self, service_name: &str, state: HealthState) {
1003        let mut states = self.health_states.write().await;
1004        states.insert(service_name.to_string(), state);
1005    }
1006
1007    /// Set the deployment name (used for generating hostnames)
1008    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1009    pub fn set_deployment_name(&mut self, name: String) {
1010        self.deployment_name = Some(name);
1011    }
1012
1013    /// Set the stream registry for L4 proxy integration (TCP/UDP)
1014    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1015    pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
1016        self.stream_registry = Some(registry);
1017    }
1018
1019    /// Builder pattern: add stream registry for L4 proxy integration
1020    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1021    #[must_use]
1022    pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
1023        self.stream_registry = Some(registry);
1024        self
1025    }
1026
    /// Get the stream registry (if configured)
    ///
    /// Returns a borrowed handle; clone the `Arc` if ownership is needed.
    pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
        self.stream_registry.as_ref()
    }
1031
1032    /// Set the overlay manager for container networking
1033    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1034    pub fn set_overlay_manager(&mut self, manager: Arc<RwLock<OverlayManager>>) {
1035        self.overlay_manager = Some(manager);
1036    }
1037
1038    /// Set the proxy manager for health-aware load balancing
1039    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1040    pub fn set_proxy_manager(&mut self, proxy: Arc<ProxyManager>) {
1041        self.proxy_manager = Some(proxy);
1042    }
1043
1044    /// Builder pattern: add proxy manager for health-aware load balancing
1045    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1046    #[must_use]
1047    pub fn with_proxy_manager(mut self, proxy: Arc<ProxyManager>) -> Self {
1048        self.proxy_manager = Some(proxy);
1049        self
1050    }
1051
    /// Get the proxy manager (if configured)
    ///
    /// Returns a borrowed handle; clone the `Arc` if ownership is needed.
    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
        self.proxy_manager.as_ref()
    }
1056
1057    /// Set the DNS server for service discovery
1058    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1059    pub fn set_dns_server(&mut self, dns: Arc<DnsServer>) {
1060        self.dns_server = Some(dns);
1061    }
1062
1063    /// Builder pattern: add DNS server for service discovery
1064    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1065    #[must_use]
1066    pub fn with_dns_server(mut self, dns: Arc<DnsServer>) -> Self {
1067        self.dns_server = Some(dns);
1068        self
1069    }
1070
    /// Get the DNS server (if configured)
    ///
    /// Returns a borrowed handle; clone the `Arc` if ownership is needed.
    pub fn dns_server(&self) -> Option<&Arc<DnsServer>> {
        self.dns_server.as_ref()
    }
1075
1076    /// Set the job executor for run-to-completion workloads
1077    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1078    pub fn set_job_executor(&mut self, executor: Arc<JobExecutor>) {
1079        self.job_executor = Some(executor);
1080    }
1081
1082    /// Set the cron scheduler for time-based job triggers
1083    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1084    pub fn set_cron_scheduler(&mut self, scheduler: Arc<CronScheduler>) {
1085        self.cron_scheduler = Some(scheduler);
1086    }
1087
1088    /// Builder pattern: add job executor
1089    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1090    #[must_use]
1091    pub fn with_job_executor(mut self, executor: Arc<JobExecutor>) -> Self {
1092        self.job_executor = Some(executor);
1093        self
1094    }
1095
1096    /// Builder pattern: add cron scheduler
1097    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1098    #[must_use]
1099    pub fn with_cron_scheduler(mut self, scheduler: Arc<CronScheduler>) -> Self {
1100        self.cron_scheduler = Some(scheduler);
1101        self
1102    }
1103
    /// Get the job executor (if configured)
    ///
    /// Returns a borrowed handle; clone the `Arc` if ownership is needed.
    pub fn job_executor(&self) -> Option<&Arc<JobExecutor>> {
        self.job_executor.as_ref()
    }
1108
    /// Get the cron scheduler (if configured)
    ///
    /// Returns a borrowed handle; clone the `Arc` if ownership is needed.
    pub fn cron_scheduler(&self) -> Option<&Arc<CronScheduler>> {
        self.cron_scheduler.as_ref()
    }
1113
1114    /// Set the container supervisor for crash/panic policy enforcement
1115    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1116    pub fn set_container_supervisor(&mut self, supervisor: Arc<ContainerSupervisor>) {
1117        self.container_supervisor = Some(supervisor);
1118    }
1119
1120    /// Builder pattern: add container supervisor
1121    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1122    #[must_use]
1123    pub fn with_container_supervisor(mut self, supervisor: Arc<ContainerSupervisor>) -> Self {
1124        self.container_supervisor = Some(supervisor);
1125        self
1126    }
1127
    /// Get the container supervisor (if configured)
    ///
    /// Returns a borrowed handle; clone the `Arc` if ownership is needed.
    pub fn container_supervisor(&self) -> Option<&Arc<ContainerSupervisor>> {
        self.container_supervisor.as_ref()
    }
1132
1133    /// Start the container supervisor background task
1134    ///
1135    /// This spawns a background task that monitors containers for crashes
1136    /// and enforces the `on_panic` error policy.
1137    ///
1138    /// # Errors
1139    /// Returns an error if no container supervisor is configured.
1140    ///
1141    /// # Returns
1142    /// A `JoinHandle` for the supervisor task.
1143    pub fn start_container_supervisor(&self) -> Result<tokio::task::JoinHandle<()>> {
1144        let supervisor = self.container_supervisor.as_ref().ok_or_else(|| {
1145            AgentError::Configuration("Container supervisor not configured".to_string())
1146        })?;
1147
1148        let supervisor = Arc::clone(supervisor);
1149        Ok(tokio::spawn(async move {
1150            supervisor.run_loop().await;
1151        }))
1152    }
1153
1154    /// Shutdown the container supervisor
1155    pub fn shutdown_container_supervisor(&self) {
1156        if let Some(supervisor) = &self.container_supervisor {
1157            supervisor.shutdown();
1158        }
1159    }
1160
1161    /// Get the supervised state of a container
1162    pub async fn get_container_supervised_state(
1163        &self,
1164        container_id: &ContainerId,
1165    ) -> Option<SupervisedState> {
1166        if let Some(supervisor) = &self.container_supervisor {
1167            supervisor.get_state(container_id).await
1168        } else {
1169            None
1170        }
1171    }
1172
1173    /// Get supervisor events receiver
1174    ///
1175    /// Note: This can only be called once; the receiver is moved to the caller.
1176    pub async fn take_supervisor_events(
1177        &self,
1178    ) -> Option<tokio::sync::mpsc::Receiver<SupervisorEvent>> {
1179        if let Some(supervisor) = &self.container_supervisor {
1180            supervisor.take_event_receiver().await
1181        } else {
1182            None
1183        }
1184    }
1185
1186    // ==================== Dependency Orchestration ====================
1187
    /// Deploy multiple services respecting their dependency order
    ///
    /// This method:
    /// 1. Builds a dependency graph from the services
    /// 2. Validates no cycles exist
    /// 3. Computes topological order (services with no deps first)
    /// 4. For each service in order, waits for dependencies then starts the service
    ///
    /// # Arguments
    /// * `services` - Map of service name to service specification
    ///
    /// # Errors
    /// - Returns `AgentError::InvalidSpec` if there are cyclic dependencies
    /// - Returns `AgentError::DependencyTimeout` if a dependency times out with `on_timeout`: fail
    pub async fn deploy_with_dependencies(
        &self,
        services: HashMap<String, ServiceSpec>,
    ) -> Result<()> {
        if services.is_empty() {
            return Ok(());
        }

        // Build dependency graph (this is where cyclic dependencies are rejected)
        let graph = DependencyGraph::build(&services)?;

        tracing::info!(
            service_count = services.len(),
            "Starting deployment with dependency ordering"
        );

        // Get startup order (topological: services with no deps come first)
        let order = graph.startup_order();
        tracing::debug!(order = ?order, "Computed startup order");

        // Start services strictly sequentially in dependency order
        for service_name in order {
            let service_spec = services
                .get(service_name)
                .ok_or_else(|| AgentError::Internal(format!("Service {service_name} not found")))?;

            // Wait for dependencies first; may return DependencyTimeout
            if !service_spec.depends.is_empty() {
                tracing::info!(
                    service = %service_name,
                    dependency_count = service_spec.depends.len(),
                    "Waiting for dependencies"
                );
                self.wait_for_dependencies(service_name, &service_spec.depends)
                    .await?;
            }

            // Register and start service
            tracing::info!(service = %service_name, "Starting service");
            self.upsert_service(service_name.clone(), service_spec.clone())
                .await?;

            // Get the desired replica count from scale config
            let replicas = match &service_spec.scale {
                zlayer_spec::ScaleSpec::Fixed { replicas } => *replicas,
                zlayer_spec::ScaleSpec::Adaptive { min, .. } => *min, // Start with min replicas
                zlayer_spec::ScaleSpec::Manual => 1, // Default to 1 for manual scaling
            };
            self.scale_service(service_name, replicas).await?;

            // Mark service as started in health states (Unknown until health check runs)
            self.update_health_state(service_name, HealthState::Unknown)
                .await;

            tracing::info!(
                service = %service_name,
                replicas = replicas,
                "Service started"
            );
        }

        tracing::info!(service_count = services.len(), "Deployment complete");

        Ok(())
    }
1267
1268    /// Wait for all dependencies of a service to be satisfied
1269    ///
1270    /// # Arguments
1271    /// * `service` - Name of the service waiting for dependencies
1272    /// * `deps` - Slice of dependency specifications
1273    ///
1274    /// # Errors
1275    /// Returns `AgentError::DependencyTimeout` if any dependency with `on_timeout`: fail times out
1276    async fn wait_for_dependencies(&self, service: &str, deps: &[DependsSpec]) -> Result<()> {
1277        let condition_checker = DependencyConditionChecker::new(
1278            Arc::clone(&self.runtime),
1279            Arc::clone(&self.health_states),
1280            None,
1281        );
1282
1283        let waiter = DependencyWaiter::new(condition_checker);
1284        let results = waiter.wait_for_all(deps).await?;
1285
1286        // Check results for failures
1287        for result in results {
1288            match result {
1289                WaitResult::TimedOutFail {
1290                    service: dep_service,
1291                    condition,
1292                    timeout,
1293                } => {
1294                    return Err(AgentError::DependencyTimeout {
1295                        service: service.to_string(),
1296                        dependency: dep_service,
1297                        condition: format!("{condition:?}"),
1298                        timeout,
1299                    });
1300                }
1301                WaitResult::TimedOutWarn {
1302                    service: dep_service,
1303                    condition,
1304                } => {
1305                    tracing::warn!(
1306                        service = %service,
1307                        dependency = %dep_service,
1308                        condition = ?condition,
1309                        "Dependency timed out but continuing"
1310                    );
1311                }
1312                WaitResult::TimedOutContinue | WaitResult::Satisfied => {
1313                    // Continue silently
1314                }
1315            }
1316        }
1317
1318        Ok(())
1319    }
1320
1321    /// Check if all dependencies for a service are currently satisfied
1322    ///
1323    /// This is a one-shot check (no waiting). Useful for pre-flight validation.
1324    ///
1325    /// # Errors
1326    /// Returns an error if a dependency check fails unexpectedly.
1327    pub async fn check_dependencies(&self, deps: &[DependsSpec]) -> Result<bool> {
1328        let condition_checker = DependencyConditionChecker::new(
1329            Arc::clone(&self.runtime),
1330            Arc::clone(&self.health_states),
1331            None,
1332        );
1333
1334        for dep in deps {
1335            if !condition_checker.check(dep).await? {
1336                return Ok(false);
1337            }
1338        }
1339
1340        Ok(true)
1341    }
1342
1343    /// Add or update a workload (service, job, or cron)
1344    ///
1345    /// This method handles different resource types appropriately:
1346    /// - **Service**: Traditional long-running containers with scaling and health checks
1347    /// - **Job**: Run-to-completion workloads triggered on-demand (stores spec for later)
1348    /// - **Cron**: Scheduled run-to-completion workloads (registers with cron scheduler)
1349    ///
1350    /// # Errors
1351    /// Returns an error if service creation, scaling, or cron registration fails.
1352    #[allow(clippy::too_many_lines)]
1353    pub async fn upsert_service(&self, name: String, spec: ServiceSpec) -> Result<()> {
1354        match spec.rtype {
1355            ResourceType::Service => {
1356                // Long-running service: create/update instance
1357                let mut services = self.services.write().await;
1358
1359                if let Some(instance) = services.get_mut(&name) {
1360                    // Update existing service. We need to:
1361                    //   1. Update the in-memory spec (so future scale-ups use the new image).
1362                    //   2. Honour the effective pull policy. For Never/IfNotPresent (after
1363                    //      effective resolution) we noop. For Always/Newer we pull, compare
1364                    //      digests, and trigger a rolling recreate when drift is observed.
1365                    instance.spec = spec.clone();
1366                    if let Some(dns) = &self.dns_server {
1367                        instance.set_dns_server(Arc::clone(dns));
1368                    }
1369
1370                    let effective = effective_pull_policy(&spec.image.name, spec.image.pull_policy);
1371                    let old_digest = instance.last_pulled_digest().await;
1372                    let current_replicas =
1373                        u32::try_from(instance.replica_count().await).unwrap_or(u32::MAX);
1374                    drop(services); // Release write lock before pull / scale (which take their own locks).
1375
1376                    match effective {
1377                        PullPolicy::Never | PullPolicy::IfNotPresent => {
1378                            // No pull, no recreate. Drift is silently ignored when the
1379                            // user has explicitly opted into "do not refresh" semantics.
1380                            tracing::debug!(
1381                                service = %name,
1382                                policy = ?effective,
1383                                "service unchanged on re-deploy (effective pull policy skips refresh)"
1384                            );
1385                        }
1386                        PullPolicy::Always | PullPolicy::Newer => {
1387                            // Pull (this updates the cached digest as a side-effect).
1388                            // We need a read guard to keep the instance alive while
1389                            // calling its &self method.
1390                            let services_ro = self.services.read().await;
1391                            let new_digest = if let Some(inst) = services_ro.get(&name) {
1392                                inst.pull_and_refresh_digest().await?
1393                            } else {
1394                                // The service vanished between our write-lock release
1395                                // and read-lock acquisition (race with remove_service).
1396                                // Treat this as a no-op; the caller will see the removal.
1397                                tracing::warn!(
1398                                    service = %name,
1399                                    "service removed during upsert; skipping drift recreate"
1400                                );
1401                                return Ok(());
1402                            };
1403                            drop(services_ro);
1404
1405                            // Decide whether to recreate. Always forces a recreate.
1406                            // Newer recreates only when the digest actually changed.
1407                            // When digests are unknown (runtime doesn't expose them),
1408                            // we can't observe drift safely under Newer, so no-op.
1409                            let should_recreate = match effective {
1410                                PullPolicy::Always => true,
1411                                PullPolicy::Newer => match (&old_digest, &new_digest) {
1412                                    (Some(old), Some(new)) => old != new,
1413                                    _ => false,
1414                                },
1415                                _ => false,
1416                            };
1417
1418                            if should_recreate && current_replicas > 0 {
1419                                tracing::info!(
1420                                    service = %name,
1421                                    policy = ?effective,
1422                                    old_digest = ?old_digest,
1423                                    new_digest = ?new_digest,
1424                                    replicas = current_replicas,
1425                                    "image drift detected; performing rolling recreate"
1426                                );
1427                                self.scale_service(&name, 0).await?;
1428                                self.scale_service(&name, current_replicas).await?;
1429                                tracing::info!(
1430                                    service = %name,
1431                                    new_digest = ?new_digest,
1432                                    "service recreated with refreshed image"
1433                                );
1434                            } else {
1435                                tracing::debug!(
1436                                    service = %name,
1437                                    policy = ?effective,
1438                                    old_digest = ?old_digest,
1439                                    new_digest = ?new_digest,
1440                                    "service up to date; no recreate required"
1441                                );
1442                            }
1443                        }
1444                    }
1445                    return Ok(());
1446                }
1447                // Create new service with proxy manager for health-aware load balancing
1448                let overlay = self.overlay_manager.as_ref().map(Arc::clone);
1449                let mut instance = if let Some(proxy) = &self.proxy_manager {
1450                    ServiceInstance::with_proxy(
1451                        name.clone(),
1452                        spec,
1453                        self.runtime.clone(),
1454                        overlay,
1455                        Arc::clone(proxy),
1456                    )
1457                } else {
1458                    ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
1459                };
1460                // Set DNS server if configured
1461                if let Some(dns) = &self.dns_server {
1462                    instance.set_dns_server(Arc::clone(dns));
1463                }
1464                // Wire shared health states so callbacks bridge back to ServiceManager
1465                instance.set_health_states(Arc::clone(&self.health_states));
1466                // Register HTTP routes via proxy manager
1467                if let Some(proxy) = &self.proxy_manager {
1468                    proxy.add_service(&name, &instance.spec).await;
1469                }
1470                // Register TCP/UDP endpoints in stream registry
1471                if let Some(stream_registry) = &self.stream_registry {
1472                    for endpoint in &instance.spec.endpoints {
1473                        let svc = StreamService::new(
1474                            name.clone(),
1475                            Vec::new(), // No backends yet; added on scale-up
1476                        );
1477                        match endpoint.protocol {
1478                            Protocol::Tcp => {
1479                                stream_registry.register_tcp(endpoint.port, svc);
1480                                tracing::debug!(
1481                                    service = %name,
1482                                    port = endpoint.port,
1483                                    "Registered TCP stream route"
1484                                );
1485                            }
1486                            Protocol::Udp => {
1487                                stream_registry.register_udp(endpoint.port, svc);
1488                                tracing::debug!(
1489                                    service = %name,
1490                                    port = endpoint.port,
1491                                    "Registered UDP stream route"
1492                                );
1493                            }
1494                            _ => {} // HTTP routes handled by proxy manager
1495                        }
1496                    }
1497                }
1498                services.insert(name, instance);
1499            }
1500            ResourceType::Job => {
1501                // Job: Just store the spec for later triggering
1502                // Jobs don't start containers immediately; they're triggered on-demand
1503                if let Some(executor) = &self.job_executor {
1504                    executor.register_job(&name, spec).await;
1505                    tracing::info!(job = %name, "Registered job spec");
1506                } else {
1507                    tracing::warn!(
1508                        job = %name,
1509                        "Job executor not configured, storing as service for reference"
1510                    );
1511                    // Fallback: store as service instance for reference
1512                    let mut services = self.services.write().await;
1513                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
1514                    let mut instance = if let Some(proxy) = &self.proxy_manager {
1515                        ServiceInstance::with_proxy(
1516                            name.clone(),
1517                            spec,
1518                            self.runtime.clone(),
1519                            overlay,
1520                            Arc::clone(proxy),
1521                        )
1522                    } else {
1523                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
1524                    };
1525                    // Set DNS server if configured
1526                    if let Some(dns) = &self.dns_server {
1527                        instance.set_dns_server(Arc::clone(dns));
1528                    }
1529                    services.insert(name, instance);
1530                }
1531            }
1532            ResourceType::Cron => {
1533                // Cron: Register with the cron scheduler
1534                if let Some(scheduler) = &self.cron_scheduler {
1535                    scheduler.register(&name, &spec).await?;
1536                    tracing::info!(cron = %name, "Registered cron job with scheduler");
1537                } else {
1538                    return Err(AgentError::Configuration(format!(
1539                        "Cron scheduler not configured for cron job '{name}'"
1540                    )));
1541                }
1542            }
1543        }
1544
1545        Ok(())
1546    }
1547
1548    /// Update backend addresses via `ProxyManager` after scaling
1549    async fn update_proxy_backends(&self, service_name: &str, addrs: Vec<SocketAddr>) {
1550        if let Some(proxy) = &self.proxy_manager {
1551            proxy.update_backends(service_name, addrs).await;
1552        }
1553    }
1554
1555    /// Update backend addresses in the `StreamRegistry` for TCP/UDP endpoints after scaling
1556    ///
1557    /// For containers with a port override (macOS sandbox), the addresses already
1558    /// carry the runtime-assigned port. In that case, the container listens on the
1559    /// override port for all traffic, so we use the address port directly. For
1560    /// containers without a port override (Linux, VMs), we reconstruct addresses
1561    /// using the endpoint's declared port, since each container has its own IP
1562    /// and can bind any port independently.
1563    fn update_stream_backends(&self, spec: &ServiceSpec, addrs: &[SocketAddr]) {
1564        let Some(stream_registry) = &self.stream_registry else {
1565            return;
1566        };
1567
1568        // Determine if any addresses have a port override by checking whether
1569        // all addresses use the same port as the primary spec endpoint. If not,
1570        // they carry per-container port overrides and should be used as-is.
1571        let primary_spec_port = spec
1572            .endpoints
1573            .iter()
1574            .find(|ep| {
1575                matches!(
1576                    ep.protocol,
1577                    Protocol::Http | Protocol::Https | Protocol::Websocket
1578                )
1579            })
1580            .map_or(8080, zlayer_spec::EndpointSpec::target_port);
1581
1582        let has_port_overrides = addrs.iter().any(|addr| addr.port() != primary_spec_port);
1583
1584        for endpoint in &spec.endpoints {
1585            match endpoint.protocol {
1586                Protocol::Tcp => {
1587                    let tcp_backends: Vec<SocketAddr> = if has_port_overrides {
1588                        // Port overrides active (macOS sandbox): the container listens
1589                        // on its assigned port for all traffic. Use addresses as-is.
1590                        addrs.to_vec()
1591                    } else {
1592                        // Normal case: each container has its own IP, construct
1593                        // addresses using the TCP endpoint's container target port.
1594                        addrs
1595                            .iter()
1596                            .map(|addr| SocketAddr::new(addr.ip(), endpoint.target_port()))
1597                            .collect()
1598                    };
1599
1600                    stream_registry.update_tcp_backends(endpoint.port, tcp_backends);
1601
1602                    tracing::debug!(
1603                        endpoint = %endpoint.name,
1604                        port = endpoint.port,
1605                        backend_count = addrs.len(),
1606                        "Updated TCP stream backends"
1607                    );
1608                }
1609                Protocol::Udp => {
1610                    let udp_backends: Vec<SocketAddr> = if has_port_overrides {
1611                        addrs.to_vec()
1612                    } else {
1613                        addrs
1614                            .iter()
1615                            .map(|addr| SocketAddr::new(addr.ip(), endpoint.target_port()))
1616                            .collect()
1617                    };
1618
1619                    stream_registry.update_udp_backends(endpoint.port, udp_backends);
1620
1621                    tracing::debug!(
1622                        endpoint = %endpoint.name,
1623                        port = endpoint.port,
1624                        backend_count = addrs.len(),
1625                        "Updated UDP stream backends"
1626                    );
1627                }
1628                _ => {} // HTTP endpoints handled by update_proxy_backends
1629            }
1630        }
1631    }
1632
1633    /// Scale a service to desired replica count
1634    ///
1635    /// # Errors
1636    /// Returns an error if the service is not found or scaling fails.
1637    #[allow(clippy::cast_possible_truncation)]
1638    pub async fn scale_service(&self, name: &str, replicas: u32) -> Result<()> {
1639        let _permit = self.scale_semaphore.acquire().await;
1640
1641        let services = self.services.read().await;
1642        let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
1643            container: name.to_string(),
1644            reason: "service not found".to_string(),
1645        })?;
1646
1647        // Get current replica count before scaling
1648        let current_replicas = instance.replica_count().await as u32;
1649
1650        // Perform the scaling operation
1651        instance.scale_to(replicas).await?;
1652
1653        // After scaling, update proxy backends with new container addresses
1654        // Note: In a real implementation, we would get actual container IPs
1655        // from the overlay network or container runtime. For now, we construct
1656        // backend addresses based on the endpoint port and localhost (for same-node).
1657        // TODO: Get actual container addresses from overlay_manager or runtime
1658        let addrs = self.collect_backend_addrs(instance, replicas).await;
1659
1660        // Update HTTP backends via ProxyManager
1661        if self.proxy_manager.is_some() && !addrs.is_empty() {
1662            self.update_proxy_backends(name, addrs.clone()).await;
1663        }
1664
1665        // Update TCP/UDP backends in StreamRegistry
1666        if self.stream_registry.is_some() {
1667            self.update_stream_backends(&instance.spec, &addrs);
1668        }
1669
1670        // Register new containers with supervisor for crash monitoring
1671        if let Some(supervisor) = &self.container_supervisor {
1672            // For scale-up, register new containers
1673            if replicas > current_replicas {
1674                for i in current_replicas..replicas {
1675                    let container_id = ContainerId {
1676                        service: name.to_string(),
1677                        replica: i + 1,
1678                    };
1679                    supervisor.supervise(&container_id, &instance.spec).await;
1680                }
1681            }
1682            // For scale-down, unregister removed containers
1683            if replicas < current_replicas {
1684                for i in replicas..current_replicas {
1685                    let container_id = ContainerId {
1686                        service: name.to_string(),
1687                        replica: i + 1,
1688                    };
1689                    supervisor.unsupervise(&container_id).await;
1690                }
1691            }
1692        }
1693
1694        Ok(())
1695    }
1696
1697    /// Collect backend addresses for a service's containers
1698    ///
1699    /// This queries the service instance's containers for their overlay network
1700    /// IP addresses and constructs backend addresses using those IPs with the
1701    /// service's endpoint port.
1702    ///
1703    /// If a container has a `port_override` (e.g., macOS sandbox where all
1704    /// containers share the host network), that port is used instead of the
1705    /// spec-declared endpoint port. This allows multiple replicas on the same
1706    /// IP (`127.0.0.1`) to be distinguished by port.
1707    async fn collect_backend_addrs(
1708        &self,
1709        instance: &ServiceInstance,
1710        _replicas: u32, // No longer needed - we iterate containers directly
1711    ) -> Vec<SocketAddr> {
1712        let mut addrs = Vec::new();
1713
1714        // Get the primary container target port (first HTTP endpoint) as the default
1715        let spec_port = instance
1716            .spec
1717            .endpoints
1718            .iter()
1719            .find(|ep| {
1720                matches!(
1721                    ep.protocol,
1722                    Protocol::Http | Protocol::Https | Protocol::Websocket
1723                )
1724            })
1725            .map_or(8080, zlayer_spec::EndpointSpec::target_port);
1726
1727        // Collect backend addresses from containers with overlay IPs
1728        let containers = instance.containers().read().await;
1729
1730        for container in containers.values() {
1731            if let Some(ip) = container.overlay_ip {
1732                // Use the runtime-assigned port override if present (macOS sandbox),
1733                // otherwise fall back to the spec-declared endpoint port.
1734                let port = container.port_override.unwrap_or(spec_port);
1735                addrs.push(SocketAddr::new(ip, port));
1736            }
1737        }
1738
1739        // If no overlay IPs available, this might be Docker runtime or failed attachments
1740        // Log a warning but don't fallback to localhost in production
1741        if addrs.is_empty() && !containers.is_empty() {
1742            tracing::warn!(
1743                service = %instance.service_name,
1744                container_count = containers.len(),
1745                "no overlay IPs available for backends - containers may not be reachable via proxy"
1746            );
1747        }
1748
1749        addrs
1750    }
1751
1752    /// Get service replica count
1753    ///
1754    /// # Errors
1755    /// Returns an error if the service is not found.
1756    pub async fn service_replica_count(&self, name: &str) -> Result<usize> {
1757        let services = self.services.read().await;
1758        let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
1759            container: name.to_string(),
1760            reason: "service not found".to_string(),
1761        })?;
1762
1763        Ok(instance.replica_count().await)
1764    }
1765
    /// Remove a workload (service, job, or cron)
    ///
    /// This method handles cleanup for different resource types:
    /// - **Service**: Unregisters proxy routes, supervisor, and removes from service map
    /// - **Job**: Unregisters from job executor
    /// - **Cron**: Unregisters from cron scheduler
    ///
    /// The resource type is not known here, so every registry is tried in turn;
    /// each unregister call is expected to no-op for names it does not know.
    /// Lock discipline: read locks on `self.services` are taken and dropped
    /// per cleanup phase, and the write lock for map removal is taken last.
    ///
    /// # Errors
    /// Returns an error if the service cannot be removed or scale-down fails.
    /// NOTE(review): as currently written the body never returns `Err` — DNS
    /// removal failures are only logged — so the `Result` is reserved for
    /// future/fallible cleanup steps; confirm against callers before relying
    /// on error propagation here.
    pub async fn remove_service(&self, name: &str) -> Result<()> {
        // Try to unregister from cron scheduler first
        if let Some(scheduler) = &self.cron_scheduler {
            scheduler.unregister(name).await;
        }

        // Try to unregister from job executor
        if let Some(executor) = &self.job_executor {
            executor.unregister_job(name).await;
        }

        // Unregister stream routes (TCP/UDP) from the stream registry
        if let Some(stream_registry) = &self.stream_registry {
            // Need to get the service spec to know which ports to unregister
            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                for endpoint in &instance.spec.endpoints {
                    match endpoint.protocol {
                        Protocol::Tcp => {
                            // Unregister result intentionally ignored: route may
                            // already be gone.
                            let _ = stream_registry.unregister_tcp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered TCP stream route"
                            );
                        }
                        Protocol::Udp => {
                            let _ = stream_registry.unregister_udp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered UDP stream route"
                            );
                        }
                        _ => {} // HTTP routes handled above
                    }
                }
            }
            drop(services); // Release read lock
        }

        // Unregister containers from the supervisor
        if let Some(supervisor) = &self.container_supervisor {
            let containers = self.get_service_containers(name).await;
            for container_id in containers {
                supervisor.unsupervise(&container_id).await;
            }
            tracing::debug!(service = %name, "Unregistered containers from supervisor");
        }

        // Clean up DNS records for the service
        if let Some(dns) = &self.dns_server {
            // Remove the service-level DNS entry
            let service_hostname = format!("{name}.service.local");
            if let Err(e) = dns.remove_record(&service_hostname).await {
                // Best-effort: log and continue so the rest of cleanup still runs.
                tracing::warn!(
                    hostname = %service_hostname,
                    error = %e,
                    "failed to remove service DNS record"
                );
            } else {
                tracing::debug!(
                    hostname = %service_hostname,
                    "removed service DNS record"
                );
            }

            // Also remove any remaining replica-specific DNS entries
            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                let containers = instance.containers().read().await;
                for (id, _) in containers.iter() {
                    // Replica hostnames follow "<replica>.<service>.service.local".
                    let replica_hostname = format!("{}.{}.service.local", id.replica, name);
                    if let Err(e) = dns.remove_record(&replica_hostname).await {
                        tracing::warn!(
                            hostname = %replica_hostname,
                            error = %e,
                            "failed to remove replica DNS record during service removal"
                        );
                    }
                }
            }
            drop(services); // Release read lock before write lock
        }

        // Remove from services map (may or may not exist depending on rtype)
        let mut services = self.services.write().await;
        if services.remove(name).is_some() {
            tracing::debug!(service = %name, "Removed service from manager");
        }

        Ok(())
    }
1868
1869    /// Introspect service infrastructure wiring.
1870    /// Returns (`has_overlay`, `has_proxy`, `has_dns`), or None if service not found.
1871    pub async fn service_infrastructure(&self, name: &str) -> Option<(bool, bool, bool)> {
1872        let services = self.services.read().await;
1873        services.get(name).map(|i| {
1874            (
1875                i.has_overlay_manager(),
1876                i.has_proxy_manager(),
1877                i.has_dns_server(),
1878            )
1879        })
1880    }
1881
1882    /// List all services
1883    pub async fn list_services(&self) -> Vec<String> {
1884        self.services.read().await.keys().cloned().collect()
1885    }
1886
1887    /// Get logs for a service, aggregated from all container replicas.
1888    ///
1889    /// # Arguments
1890    /// * `service_name` - Name of the service to fetch logs for
1891    /// * `tail` - Number of lines to return per container (0 = all)
1892    /// * `instance` - Optional specific instance (container ID suffix like "1", "2")
1893    ///
1894    /// # Errors
1895    /// Returns an error if the service or instance is not found.
1896    ///
1897    /// # Returns
1898    /// Structured log entries from all (or specific) container replicas. Each
1899    /// entry has its `service` and `deployment` fields populated when available.
1900    pub async fn get_service_logs(
1901        &self,
1902        service_name: &str,
1903        tail: usize,
1904        instance: Option<&str>,
1905    ) -> Result<Vec<LogEntry>> {
1906        let container_ids = self.get_service_containers(service_name).await;
1907
1908        if container_ids.is_empty() {
1909            return Err(AgentError::NotFound {
1910                container: service_name.to_string(),
1911                reason: "no containers found for service".to_string(),
1912            });
1913        }
1914
1915        // If a specific instance is requested, filter to just that one
1916        let target_ids: Vec<&ContainerId> = if let Some(inst) = instance {
1917            if let Ok(replica_num) = inst.parse::<u32>() {
1918                container_ids
1919                    .iter()
1920                    .filter(|id| id.replica == replica_num)
1921                    .collect()
1922            } else {
1923                // Try matching by full container ID string suffix
1924                container_ids
1925                    .iter()
1926                    .filter(|id| id.to_string().contains(inst))
1927                    .collect()
1928            }
1929        } else {
1930            container_ids.iter().collect()
1931        };
1932
1933        if target_ids.is_empty() {
1934            return Err(AgentError::NotFound {
1935                container: format!("{}/{}", service_name, instance.unwrap_or("?")),
1936                reason: "instance not found".to_string(),
1937            });
1938        }
1939
1940        let mut all_entries: Vec<LogEntry> = Vec::new();
1941
1942        for id in &target_ids {
1943            match self.runtime.container_logs(id, tail).await {
1944                Ok(mut entries) => {
1945                    // Populate service and deployment metadata on each entry
1946                    for entry in &mut entries {
1947                        if entry.service.is_none() {
1948                            entry.service = Some(service_name.to_string());
1949                        }
1950                        if entry.deployment.is_none() {
1951                            entry.deployment.clone_from(&self.deployment_name);
1952                        }
1953                    }
1954                    all_entries.extend(entries);
1955                }
1956                Err(e) => {
1957                    tracing::warn!(
1958                        service = service_name,
1959                        container = %id,
1960                        error = %e,
1961                        "Failed to read container logs"
1962                    );
1963                }
1964            }
1965        }
1966
1967        Ok(all_entries)
1968    }
1969
1970    /// Get all container IDs for a specific service
1971    ///
1972    /// Returns an empty vector if the service doesn't exist.
1973    ///
1974    /// # Arguments
1975    /// * `service_name` - Name of the service to query
1976    ///
1977    /// # Returns
1978    /// Vector of `ContainerIds` for all replicas of the service
1979    pub async fn get_service_containers(&self, service_name: &str) -> Vec<ContainerId> {
1980        let services = self.services.read().await;
1981        if let Some(instance) = services.get(service_name) {
1982            instance.container_ids().await
1983        } else {
1984            Vec::new()
1985        }
1986    }
1987
1988    /// Execute a command inside a running container for a service
1989    ///
1990    /// Picks a specific replica if provided, otherwise uses the first available container.
1991    ///
1992    /// # Arguments
1993    /// * `service_name` - Name of the service
1994    /// * `replica` - Optional replica number to target
1995    /// * `cmd` - Command and arguments to execute
1996    ///
1997    /// # Errors
1998    /// Returns an error if the service or replica is not found, or if exec fails.
1999    ///
2000    /// # Panics
2001    /// Panics if no replica is specified and the container list is unexpectedly empty
2002    /// after the emptiness check (should not happen in practice).
2003    ///
2004    /// # Returns
2005    /// Tuple of (`exit_code`, stdout, stderr)
2006    pub async fn exec_in_container(
2007        &self,
2008        service_name: &str,
2009        replica: Option<u32>,
2010        cmd: &[String],
2011    ) -> Result<(i32, String, String)> {
2012        let container_ids = self.get_service_containers(service_name).await;
2013
2014        if container_ids.is_empty() {
2015            return Err(AgentError::NotFound {
2016                container: service_name.to_string(),
2017                reason: "no containers found for service".to_string(),
2018            });
2019        }
2020
2021        // Pick the target container
2022        let target = if let Some(rep) = replica {
2023            container_ids
2024                .into_iter()
2025                .find(|cid| cid.replica == rep)
2026                .ok_or_else(|| AgentError::NotFound {
2027                    container: format!("{service_name}-rep-{rep}"),
2028                    reason: format!("replica {rep} not found for service"),
2029                })?
2030        } else {
2031            // Use the first container (lowest replica number)
2032            container_ids.into_iter().next().unwrap()
2033        };
2034
2035        self.runtime.exec(&target, cmd).await
2036    }
2037
2038    // ==================== Job Management ====================
2039
2040    /// Trigger a job execution
2041    ///
2042    /// # Arguments
2043    /// * `name` - Name of the registered job
2044    /// * `trigger` - How the job was triggered (endpoint, cli, etc.)
2045    ///
2046    /// # Returns
2047    /// The execution ID for tracking the job
2048    ///
2049    /// # Errors
2050    /// - Returns error if job executor is not configured
2051    /// - Returns error if the job is not registered
2052    pub async fn trigger_job(&self, name: &str, trigger: JobTrigger) -> Result<JobExecutionId> {
2053        let executor = self
2054            .job_executor
2055            .as_ref()
2056            .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
2057
2058        let spec = executor
2059            .get_job_spec(name)
2060            .await
2061            .ok_or_else(|| AgentError::NotFound {
2062                container: name.to_string(),
2063                reason: "job not registered".to_string(),
2064            })?;
2065
2066        executor.trigger(name, &spec, trigger).await
2067    }
2068
2069    /// Get the status of a job execution
2070    ///
2071    /// # Arguments
2072    /// * `id` - The execution ID returned from `trigger_job`
2073    ///
2074    /// # Returns
2075    /// The job execution details, or None if not found
2076    pub async fn get_job_execution(&self, id: &JobExecutionId) -> Option<JobExecution> {
2077        if let Some(executor) = &self.job_executor {
2078            executor.get_execution(id).await
2079        } else {
2080            None
2081        }
2082    }
2083
2084    /// List all executions for a specific job
2085    ///
2086    /// # Arguments
2087    /// * `name` - Name of the job
2088    ///
2089    /// # Returns
2090    /// Vector of job executions for the specified job
2091    pub async fn list_job_executions(&self, name: &str) -> Vec<JobExecution> {
2092        if let Some(executor) = &self.job_executor {
2093            executor.list_executions(name).await
2094        } else {
2095            Vec::new()
2096        }
2097    }
2098
2099    /// Cancel a running job execution
2100    ///
2101    /// # Arguments
2102    /// * `id` - The execution ID to cancel
2103    ///
2104    /// # Errors
2105    /// Returns error if job executor is not configured or if cancellation fails
2106    pub async fn cancel_job(&self, id: &JobExecutionId) -> Result<()> {
2107        let executor = self
2108            .job_executor
2109            .as_ref()
2110            .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
2111
2112        executor.cancel(id).await
2113    }
2114
2115    // ==================== Cron Management ====================
2116
2117    /// Manually trigger a cron job (outside of its schedule)
2118    ///
2119    /// # Arguments
2120    /// * `name` - Name of the cron job
2121    ///
2122    /// # Returns
2123    /// The execution ID for tracking the triggered job
2124    ///
2125    /// # Errors
2126    /// Returns error if cron scheduler is not configured or job not found
2127    pub async fn trigger_cron(&self, name: &str) -> Result<JobExecutionId> {
2128        let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2129            AgentError::Configuration("Cron scheduler not configured".to_string())
2130        })?;
2131
2132        scheduler.trigger_now(name).await
2133    }
2134
2135    /// Enable or disable a cron job
2136    ///
2137    /// # Arguments
2138    /// * `name` - Name of the cron job
2139    /// * `enabled` - Whether to enable or disable the job
2140    pub async fn set_cron_enabled(&self, name: &str, enabled: bool) {
2141        if let Some(scheduler) = &self.cron_scheduler {
2142            scheduler.set_enabled(name, enabled).await;
2143        }
2144    }
2145
2146    /// List all registered cron jobs
2147    pub async fn list_cron_jobs(&self) -> Vec<crate::cron_scheduler::CronJobInfo> {
2148        if let Some(scheduler) = &self.cron_scheduler {
2149            scheduler.list_jobs().await
2150        } else {
2151            Vec::new()
2152        }
2153    }
2154
2155    /// Start the cron scheduler background task
2156    ///
2157    /// This spawns a background task that checks for due cron jobs every second.
2158    /// Returns a `JoinHandle` that can be used to wait for the scheduler to stop.
2159    ///
2160    /// # Errors
2161    /// Returns error if cron scheduler is not configured
2162    pub fn start_cron_scheduler(&self) -> Result<tokio::task::JoinHandle<()>> {
2163        let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2164            AgentError::Configuration("Cron scheduler not configured".to_string())
2165        })?;
2166
2167        let scheduler: Arc<CronScheduler> = Arc::clone(scheduler);
2168        Ok(tokio::spawn(async move {
2169            scheduler.run_loop().await;
2170        }))
2171    }
2172
2173    /// Shutdown the cron scheduler
2174    pub fn shutdown_cron(&self) {
2175        if let Some(scheduler) = &self.cron_scheduler {
2176            scheduler.shutdown();
2177        }
2178    }
2179}
2180
2181#[cfg(test)]
2182#[allow(deprecated)]
2183mod tests {
2184    use super::*;
2185    use crate::runtime::MockRuntime;
2186
2187    #[tokio::test]
2188    async fn test_service_manager() {
2189        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2190        let manager = ServiceManager::new(runtime);
2191
2192        // Add service
2193        let spec = mock_spec();
2194        manager
2195            .upsert_service("test".to_string(), spec)
2196            .await
2197            .unwrap();
2198
2199        // Scale up
2200        manager.scale_service("test", 3).await.unwrap();
2201
2202        // Check count
2203        let count = manager.service_replica_count("test").await.unwrap();
2204        assert_eq!(count, 3);
2205
2206        // List services
2207        let services = manager.list_services().await;
2208        assert_eq!(services, vec!["test".to_string()]);
2209    }
2210
2211    #[tokio::test]
2212    async fn test_service_manager_basic_lifecycle() {
2213        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2214        let manager = ServiceManager::new(runtime);
2215
2216        // Add service with HTTP endpoint
2217        let spec = mock_spec();
2218        manager
2219            .upsert_service("api".to_string(), spec)
2220            .await
2221            .unwrap();
2222
2223        // Scale up
2224        manager.scale_service("api", 2).await.unwrap();
2225
2226        // Check count
2227        let count = manager.service_replica_count("api").await.unwrap();
2228        assert_eq!(count, 2);
2229
2230        // Remove service
2231        manager.remove_service("api").await.unwrap();
2232
2233        // Verify service is gone
2234        let services = manager.list_services().await;
2235        assert!(!services.contains(&"api".to_string()));
2236    }
2237
2238    #[tokio::test]
2239    async fn test_service_manager_with_full_config() {
2240        use tokio::sync::RwLock;
2241
2242        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2243
2244        // Create a mock overlay manager (skip actual network setup)
2245        let overlay_manager = Arc::new(RwLock::new(
2246            OverlayManager::new("test-deployment".to_string())
2247                .await
2248                .unwrap(),
2249        ));
2250
2251        let manager =
2252            ServiceManager::with_full_config(runtime, overlay_manager, "prod".to_string());
2253
2254        // Add service
2255        let spec = mock_spec();
2256        manager
2257            .upsert_service("web".to_string(), spec)
2258            .await
2259            .unwrap();
2260
2261        // Verify service is registered
2262        let services = manager.list_services().await;
2263        assert!(services.contains(&"web".to_string()));
2264    }
2265
2266    fn mock_spec() -> ServiceSpec {
2267        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2268            r"
2269version: v1
2270deployment: test
2271services:
2272  test:
2273    rtype: service
2274    image:
2275      name: test:latest
2276    endpoints:
2277      - name: http
2278        protocol: http
2279        port: 8080
2280    scale:
2281      mode: fixed
2282      replicas: 1
2283",
2284        )
2285        .unwrap()
2286        .services
2287        .remove("test")
2288        .unwrap()
2289    }
2290
2291    /// Helper to create a `ServiceSpec` with dependencies
2292    fn mock_spec_with_deps(deps: Vec<DependsSpec>) -> ServiceSpec {
2293        let mut spec = mock_spec();
2294        spec.depends = deps;
2295        spec
2296    }
2297
2298    /// Helper to create a `DependsSpec`
2299    fn dep(
2300        service: &str,
2301        condition: zlayer_spec::DependencyCondition,
2302        timeout_ms: u64,
2303        on_timeout: zlayer_spec::TimeoutAction,
2304    ) -> DependsSpec {
2305        DependsSpec {
2306            service: service.to_string(),
2307            condition,
2308            timeout: Some(Duration::from_millis(timeout_ms)),
2309            on_timeout,
2310        }
2311    }
2312
2313    #[tokio::test]
2314    async fn test_deploy_with_dependencies_no_deps() {
2315        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2316        let manager = ServiceManager::new(runtime);
2317
2318        // Services with no dependencies
2319        let mut services = HashMap::new();
2320        services.insert("a".to_string(), mock_spec());
2321        services.insert("b".to_string(), mock_spec());
2322
2323        // Should deploy both without issue
2324        manager.deploy_with_dependencies(services).await.unwrap();
2325
2326        // Both services should be registered
2327        let service_list = manager.list_services().await;
2328        assert_eq!(service_list.len(), 2);
2329    }
2330
2331    #[tokio::test]
2332    async fn test_deploy_with_dependencies_linear() {
2333        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2334        let manager = ServiceManager::new(runtime);
2335
2336        // A -> B -> C (A depends on B, B depends on C)
2337        // All use "started" condition which is satisfied when container is running
2338        let mut services = HashMap::new();
2339        services.insert("c".to_string(), mock_spec());
2340        services.insert(
2341            "b".to_string(),
2342            mock_spec_with_deps(vec![dep(
2343                "c",
2344                zlayer_spec::DependencyCondition::Started,
2345                5000,
2346                zlayer_spec::TimeoutAction::Fail,
2347            )]),
2348        );
2349        services.insert(
2350            "a".to_string(),
2351            mock_spec_with_deps(vec![dep(
2352                "b",
2353                zlayer_spec::DependencyCondition::Started,
2354                5000,
2355                zlayer_spec::TimeoutAction::Fail,
2356            )]),
2357        );
2358
2359        // Should deploy in order: c, b, a
2360        manager.deploy_with_dependencies(services).await.unwrap();
2361
2362        // All services should be registered
2363        let service_list = manager.list_services().await;
2364        assert_eq!(service_list.len(), 3);
2365    }
2366
2367    #[tokio::test]
2368    async fn test_deploy_with_dependencies_cycle_detection() {
2369        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2370        let manager = ServiceManager::new(runtime);
2371
2372        // A -> B -> A (cycle)
2373        let mut services = HashMap::new();
2374        services.insert(
2375            "a".to_string(),
2376            mock_spec_with_deps(vec![dep(
2377                "b",
2378                zlayer_spec::DependencyCondition::Started,
2379                5000,
2380                zlayer_spec::TimeoutAction::Fail,
2381            )]),
2382        );
2383        services.insert(
2384            "b".to_string(),
2385            mock_spec_with_deps(vec![dep(
2386                "a",
2387                zlayer_spec::DependencyCondition::Started,
2388                5000,
2389                zlayer_spec::TimeoutAction::Fail,
2390            )]),
2391        );
2392
2393        // Should fail with cycle detection
2394        let result = manager.deploy_with_dependencies(services).await;
2395        assert!(result.is_err());
2396        let err = result.unwrap_err().to_string();
2397        assert!(err.contains("Cyclic dependency"));
2398    }
2399
2400    #[tokio::test]
2401    async fn test_deploy_with_dependencies_timeout_continue() {
2402        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2403        let manager = ServiceManager::new(runtime);
2404
2405        // A depends on B (healthy), but B never becomes healthy
2406        // Using continue action, so it should proceed anyway
2407        let mut services = HashMap::new();
2408        services.insert("b".to_string(), mock_spec());
2409        services.insert(
2410            "a".to_string(),
2411            mock_spec_with_deps(vec![dep(
2412                "b",
2413                zlayer_spec::DependencyCondition::Healthy, // B won't pass healthy check
2414                100,                                       // Short timeout
2415                zlayer_spec::TimeoutAction::Continue,      // But continue anyway
2416            )]),
2417        );
2418
2419        // Should deploy both despite timeout
2420        manager.deploy_with_dependencies(services).await.unwrap();
2421
2422        let service_list = manager.list_services().await;
2423        assert_eq!(service_list.len(), 2);
2424    }
2425
2426    #[tokio::test]
2427    async fn test_deploy_with_dependencies_timeout_warn() {
2428        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2429        let manager = ServiceManager::new(runtime);
2430
2431        // A depends on B (healthy), but B never becomes healthy
2432        // Using warn action, so it should proceed with a warning
2433        let mut services = HashMap::new();
2434        services.insert("b".to_string(), mock_spec());
2435        services.insert(
2436            "a".to_string(),
2437            mock_spec_with_deps(vec![dep(
2438                "b",
2439                zlayer_spec::DependencyCondition::Healthy,
2440                100,
2441                zlayer_spec::TimeoutAction::Warn,
2442            )]),
2443        );
2444
2445        // Should deploy both despite timeout (with warning)
2446        manager.deploy_with_dependencies(services).await.unwrap();
2447
2448        let service_list = manager.list_services().await;
2449        assert_eq!(service_list.len(), 2);
2450    }
2451
2452    #[tokio::test]
2453    async fn test_deploy_with_dependencies_timeout_fail() {
2454        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2455        let manager = ServiceManager::new(runtime);
2456
2457        // A depends on B (healthy), but B never becomes healthy
2458        // Using fail action, so deployment should fail
2459        let mut services = HashMap::new();
2460        services.insert("b".to_string(), mock_spec());
2461        services.insert(
2462            "a".to_string(),
2463            mock_spec_with_deps(vec![dep(
2464                "b",
2465                zlayer_spec::DependencyCondition::Healthy,
2466                100,
2467                zlayer_spec::TimeoutAction::Fail,
2468            )]),
2469        );
2470
2471        // Should fail after B is started but doesn't become healthy
2472        let result = manager.deploy_with_dependencies(services).await;
2473        assert!(result.is_err());
2474
2475        // B should be started (it has no deps), but A should fail
2476        let err = result.unwrap_err().to_string();
2477        assert!(err.contains("Dependency timeout"));
2478    }
2479
2480    #[tokio::test]
2481    async fn test_check_dependencies_all_satisfied() {
2482        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2483        let manager = ServiceManager::new(runtime);
2484
2485        // Mark a service as healthy
2486        manager
2487            .update_health_state("db", HealthState::Healthy)
2488            .await;
2489
2490        let deps = vec![DependsSpec {
2491            service: "db".to_string(),
2492            condition: zlayer_spec::DependencyCondition::Healthy,
2493            timeout: Some(Duration::from_secs(60)),
2494            on_timeout: zlayer_spec::TimeoutAction::Fail,
2495        }];
2496
2497        let satisfied = manager.check_dependencies(&deps).await.unwrap();
2498        assert!(satisfied);
2499    }
2500
2501    #[tokio::test]
2502    async fn test_check_dependencies_not_satisfied() {
2503        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2504        let manager = ServiceManager::new(runtime);
2505
2506        // Service not healthy (no state set = Unknown)
2507        let deps = vec![DependsSpec {
2508            service: "db".to_string(),
2509            condition: zlayer_spec::DependencyCondition::Healthy,
2510            timeout: Some(Duration::from_secs(60)),
2511            on_timeout: zlayer_spec::TimeoutAction::Fail,
2512        }];
2513
2514        let satisfied = manager.check_dependencies(&deps).await.unwrap();
2515        assert!(!satisfied);
2516    }
2517
2518    #[tokio::test]
2519    async fn test_health_state_tracking() {
2520        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2521        let manager = ServiceManager::new(runtime);
2522
2523        // Update health states
2524        manager
2525            .update_health_state("db", HealthState::Healthy)
2526            .await;
2527        manager
2528            .update_health_state("cache", HealthState::Unknown)
2529            .await;
2530
2531        // Verify states
2532        let states = manager.health_states();
2533        let states_read = states.read().await;
2534
2535        assert!(matches!(states_read.get("db"), Some(HealthState::Healthy)));
2536        assert!(matches!(
2537            states_read.get("cache"),
2538            Some(HealthState::Unknown)
2539        ));
2540    }
2541
2542    // ==================== Job/Cron Integration Tests ====================
2543
2544    fn mock_job_spec() -> ServiceSpec {
2545        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2546            r"
2547version: v1
2548deployment: test
2549services:
2550  backup:
2551    rtype: job
2552    image:
2553      name: backup:latest
2554",
2555        )
2556        .unwrap()
2557        .services
2558        .remove("backup")
2559        .unwrap()
2560    }
2561
2562    fn mock_cron_spec() -> ServiceSpec {
2563        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2564            r#"
2565version: v1
2566deployment: test
2567services:
2568  cleanup:
2569    rtype: cron
2570    schedule: "0 0 * * * * *"
2571    image:
2572      name: cleanup:latest
2573"#,
2574        )
2575        .unwrap()
2576        .services
2577        .remove("cleanup")
2578        .unwrap()
2579    }
2580
2581    #[tokio::test]
2582    async fn test_service_manager_with_job_executor() {
2583        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2584        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2585
2586        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
2587
2588        // Register job
2589        let job_spec = mock_job_spec();
2590        manager
2591            .upsert_service("backup".to_string(), job_spec)
2592            .await
2593            .unwrap();
2594
2595        // Trigger job
2596        let exec_id = manager
2597            .trigger_job("backup", JobTrigger::Cli)
2598            .await
2599            .unwrap();
2600
2601        // Give job time to start
2602        tokio::time::sleep(Duration::from_millis(50)).await;
2603
2604        // Check execution exists
2605        let execution = manager.get_job_execution(&exec_id).await;
2606        assert!(execution.is_some());
2607        assert_eq!(execution.unwrap().job_name, "backup");
2608    }
2609
2610    #[tokio::test]
2611    async fn test_service_manager_with_cron_scheduler() {
2612        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2613        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2614        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
2615
2616        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
2617
2618        // Register cron job
2619        let cron_spec = mock_cron_spec();
2620        manager
2621            .upsert_service("cleanup".to_string(), cron_spec)
2622            .await
2623            .unwrap();
2624
2625        // List cron jobs
2626        let cron_jobs = manager.list_cron_jobs().await;
2627        assert_eq!(cron_jobs.len(), 1);
2628        assert_eq!(cron_jobs[0].name, "cleanup");
2629        assert!(cron_jobs[0].enabled);
2630    }
2631
2632    #[tokio::test]
2633    async fn test_service_manager_trigger_cron() {
2634        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2635        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2636        let cron_scheduler = Arc::new(CronScheduler::new(job_executor.clone()));
2637
2638        let manager = ServiceManager::new(runtime)
2639            .with_job_executor(job_executor)
2640            .with_cron_scheduler(cron_scheduler);
2641
2642        // Register cron job
2643        let cron_spec = mock_cron_spec();
2644        manager
2645            .upsert_service("cleanup".to_string(), cron_spec)
2646            .await
2647            .unwrap();
2648
2649        // Manually trigger the cron job
2650        let exec_id = manager.trigger_cron("cleanup").await.unwrap();
2651        assert!(!exec_id.0.is_empty());
2652    }
2653
2654    #[tokio::test]
2655    async fn test_service_manager_enable_disable_cron() {
2656        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2657        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2658        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
2659
2660        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
2661
2662        // Register cron job
2663        let cron_spec = mock_cron_spec();
2664        manager
2665            .upsert_service("cleanup".to_string(), cron_spec)
2666            .await
2667            .unwrap();
2668
2669        // Initially enabled
2670        let cron_jobs = manager.list_cron_jobs().await;
2671        assert!(cron_jobs[0].enabled);
2672
2673        // Disable
2674        manager.set_cron_enabled("cleanup", false).await;
2675        let cron_jobs = manager.list_cron_jobs().await;
2676        assert!(!cron_jobs[0].enabled);
2677
2678        // Re-enable
2679        manager.set_cron_enabled("cleanup", true).await;
2680        let cron_jobs = manager.list_cron_jobs().await;
2681        assert!(cron_jobs[0].enabled);
2682    }
2683
2684    #[tokio::test]
2685    async fn test_service_manager_remove_cleans_up_job() {
2686        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2687        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2688
2689        let manager = ServiceManager::new(runtime).with_job_executor(job_executor.clone());
2690
2691        // Register job
2692        let job_spec = mock_job_spec();
2693        manager
2694            .upsert_service("backup".to_string(), job_spec)
2695            .await
2696            .unwrap();
2697
2698        // Verify job is registered
2699        let spec = job_executor.get_job_spec("backup").await;
2700        assert!(spec.is_some());
2701
2702        // Remove job
2703        manager.remove_service("backup").await.unwrap();
2704
2705        // Verify job is unregistered
2706        let spec = job_executor.get_job_spec("backup").await;
2707        assert!(spec.is_none());
2708    }
2709
2710    #[tokio::test]
2711    async fn test_service_manager_remove_cleans_up_cron() {
2712        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2713        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2714        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
2715
2716        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler.clone());
2717
2718        // Register cron job
2719        let cron_spec = mock_cron_spec();
2720        manager
2721            .upsert_service("cleanup".to_string(), cron_spec)
2722            .await
2723            .unwrap();
2724
2725        // Verify cron job is registered
2726        assert_eq!(cron_scheduler.job_count().await, 1);
2727
2728        // Remove cron job
2729        manager.remove_service("cleanup").await.unwrap();
2730
2731        // Verify cron job is unregistered
2732        assert_eq!(cron_scheduler.job_count().await, 0);
2733    }
2734
2735    #[tokio::test]
2736    async fn test_service_manager_job_without_executor() {
2737        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2738        let manager = ServiceManager::new(runtime);
2739
2740        // Try to trigger job without executor configured
2741        let result = manager.trigger_job("nonexistent", JobTrigger::Cli).await;
2742        assert!(result.is_err());
2743        assert!(result.unwrap_err().to_string().contains("not configured"));
2744    }
2745
2746    #[tokio::test]
2747    async fn test_service_manager_cron_without_scheduler() {
2748        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2749        let manager = ServiceManager::new(runtime);
2750
2751        // Try to register cron job without scheduler configured
2752        let cron_spec = mock_cron_spec();
2753        let result = manager
2754            .upsert_service("cleanup".to_string(), cron_spec)
2755            .await;
2756        assert!(result.is_err());
2757        assert!(result.unwrap_err().to_string().contains("not configured"));
2758    }
2759
2760    #[tokio::test]
2761    async fn test_service_manager_list_job_executions() {
2762        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2763        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2764
2765        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
2766
2767        // Register job
2768        let job_spec = mock_job_spec();
2769        manager
2770            .upsert_service("backup".to_string(), job_spec)
2771            .await
2772            .unwrap();
2773
2774        // Trigger job twice
2775        manager
2776            .trigger_job("backup", JobTrigger::Cli)
2777            .await
2778            .unwrap();
2779        manager
2780            .trigger_job("backup", JobTrigger::Scheduler)
2781            .await
2782            .unwrap();
2783
2784        // Give jobs time to start
2785        tokio::time::sleep(Duration::from_millis(50)).await;
2786
2787        // List executions
2788        let executions = manager.list_job_executions("backup").await;
2789        assert_eq!(executions.len(), 2);
2790    }
2791
2792    // ==================== Container Supervisor Integration Tests ====================
2793
2794    #[tokio::test]
2795    async fn test_service_manager_with_supervisor() {
2796        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2797        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
2798
2799        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
2800
2801        // Add service
2802        let spec = mock_spec();
2803        manager
2804            .upsert_service("api".to_string(), spec)
2805            .await
2806            .unwrap();
2807
2808        // Scale up - containers should be registered with supervisor
2809        manager.scale_service("api", 2).await.unwrap();
2810
2811        // Verify containers are supervised
2812        assert_eq!(supervisor.supervised_count().await, 2);
2813
2814        // Scale down - containers should be unregistered
2815        manager.scale_service("api", 1).await.unwrap();
2816        assert_eq!(supervisor.supervised_count().await, 1);
2817
2818        // Remove service - remaining containers should be unregistered
2819        manager.remove_service("api").await.unwrap();
2820        assert_eq!(supervisor.supervised_count().await, 0);
2821    }
2822
2823    #[tokio::test]
2824    async fn test_service_manager_supervisor_state() {
2825        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2826        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
2827
2828        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor);
2829
2830        // Add and scale service
2831        let spec = mock_spec();
2832        manager
2833            .upsert_service("web".to_string(), spec)
2834            .await
2835            .unwrap();
2836        manager.scale_service("web", 1).await.unwrap();
2837
2838        // Check supervised state
2839        let container_id = ContainerId {
2840            service: "web".to_string(),
2841            replica: 1,
2842        };
2843        let state = manager.get_container_supervised_state(&container_id).await;
2844        assert_eq!(state, Some(SupervisedState::Running));
2845    }
2846
2847    #[tokio::test]
2848    async fn test_service_manager_start_supervisor() {
2849        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2850        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
2851
2852        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
2853
2854        // Start the supervisor
2855        let handle = manager.start_container_supervisor().unwrap();
2856
2857        // Give it time to start
2858        tokio::time::sleep(Duration::from_millis(50)).await;
2859        assert!(supervisor.is_running());
2860
2861        // Shutdown
2862        manager.shutdown_container_supervisor();
2863
2864        // Wait for it to stop
2865        tokio::time::timeout(Duration::from_secs(1), handle)
2866            .await
2867            .unwrap()
2868            .unwrap();
2869
2870        assert!(!supervisor.is_running());
2871    }
2872
2873    #[tokio::test]
2874    async fn test_service_manager_supervisor_not_configured() {
2875        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2876        let manager = ServiceManager::new(runtime);
2877
2878        // Try to start supervisor without configuring it
2879        let result = manager.start_container_supervisor();
2880        assert!(result.is_err());
2881        assert!(result.unwrap_err().to_string().contains("not configured"));
2882    }
2883
2884    // ==================== Stream Registry Integration Tests ====================
2885
2886    fn mock_tcp_spec() -> ServiceSpec {
2887        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2888            r"
2889version: v1
2890deployment: test
2891services:
2892  database:
2893    rtype: service
2894    image:
2895      name: postgres:latest
2896    endpoints:
2897      - name: postgresql
2898        protocol: tcp
2899        port: 5432
2900    scale:
2901      mode: fixed
2902      replicas: 1
2903",
2904        )
2905        .unwrap()
2906        .services
2907        .remove("database")
2908        .unwrap()
2909    }
2910
2911    fn mock_udp_spec() -> ServiceSpec {
2912        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2913            r"
2914version: v1
2915deployment: test
2916services:
2917  dns:
2918    rtype: service
2919    image:
2920      name: dns:latest
2921    endpoints:
2922      - name: dns
2923        protocol: udp
2924        port: 53
2925    scale:
2926      mode: fixed
2927      replicas: 1
2928",
2929        )
2930        .unwrap()
2931        .services
2932        .remove("dns")
2933        .unwrap()
2934    }
2935
2936    fn mock_mixed_spec() -> ServiceSpec {
2937        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2938            r"
2939version: v1
2940deployment: test
2941services:
2942  mixed:
2943    rtype: service
2944    image:
2945      name: mixed:latest
2946    endpoints:
2947      - name: http
2948        protocol: http
2949        port: 8080
2950      - name: grpc
2951        protocol: tcp
2952        port: 9000
2953      - name: metrics
2954        protocol: udp
2955        port: 8125
2956    scale:
2957      mode: fixed
2958      replicas: 1
2959",
2960        )
2961        .unwrap()
2962        .services
2963        .remove("mixed")
2964        .unwrap()
2965    }
2966
2967    #[tokio::test]
2968    async fn test_service_manager_with_stream_registry_tcp() {
2969        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2970        let stream_registry = Arc::new(StreamRegistry::new());
2971
2972        let mut manager = ServiceManager::new(runtime);
2973        manager.set_stream_registry(stream_registry.clone());
2974        manager.set_deployment_name("test".to_string());
2975
2976        // Add TCP-only service
2977        let spec = mock_tcp_spec();
2978        manager
2979            .upsert_service("database".to_string(), spec)
2980            .await
2981            .unwrap();
2982
2983        // Verify TCP route was registered
2984        assert_eq!(stream_registry.tcp_count(), 1);
2985        assert!(stream_registry.tcp_ports().contains(&5432));
2986
2987        // Remove service and verify cleanup
2988        manager.remove_service("database").await.unwrap();
2989        assert_eq!(stream_registry.tcp_count(), 0);
2990    }
2991
2992    #[tokio::test]
2993    async fn test_service_manager_with_stream_registry_udp() {
2994        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2995        let stream_registry = Arc::new(StreamRegistry::new());
2996
2997        let mut manager = ServiceManager::new(runtime);
2998        manager.set_stream_registry(stream_registry.clone());
2999        manager.set_deployment_name("test".to_string());
3000
3001        // Add UDP-only service
3002        let spec = mock_udp_spec();
3003        manager
3004            .upsert_service("dns".to_string(), spec)
3005            .await
3006            .unwrap();
3007
3008        // Verify UDP route was registered
3009        assert_eq!(stream_registry.udp_count(), 1);
3010        assert!(stream_registry.udp_ports().contains(&53));
3011
3012        // Remove service and verify cleanup
3013        manager.remove_service("dns").await.unwrap();
3014        assert_eq!(stream_registry.udp_count(), 0);
3015    }
3016
3017    #[tokio::test]
3018    async fn test_service_manager_with_stream_registry_mixed() {
3019        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3020        let stream_registry = Arc::new(StreamRegistry::new());
3021
3022        let mut manager = ServiceManager::new(runtime);
3023        manager.set_stream_registry(stream_registry.clone());
3024        manager.set_deployment_name("test".to_string());
3025
3026        // Add mixed service (HTTP + TCP + UDP)
3027        let spec = mock_mixed_spec();
3028        manager
3029            .upsert_service("mixed".to_string(), spec)
3030            .await
3031            .unwrap();
3032
3033        // Verify stream routes were registered
3034        assert_eq!(stream_registry.tcp_count(), 1); // TCP: 9000
3035        assert_eq!(stream_registry.udp_count(), 1); // UDP: 8125
3036
3037        assert!(stream_registry.tcp_ports().contains(&9000));
3038        assert!(stream_registry.udp_ports().contains(&8125));
3039
3040        // Remove service and verify stream cleanup
3041        manager.remove_service("mixed").await.unwrap();
3042        assert_eq!(stream_registry.tcp_count(), 0);
3043        assert_eq!(stream_registry.udp_count(), 0);
3044    }
3045
3046    #[tokio::test]
3047    async fn test_service_manager_stream_registry_builder() {
3048        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3049        let stream_registry = Arc::new(StreamRegistry::new());
3050
3051        // Test builder pattern
3052        let manager = ServiceManager::new(runtime).with_stream_registry(stream_registry.clone());
3053
3054        // Verify stream registry is accessible
3055        assert!(manager.stream_registry().is_some());
3056    }
3057
3058    #[tokio::test]
3059    async fn test_tcp_service_without_stream_registry() {
3060        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3061
3062        // Manager without stream registry
3063        let mut manager = ServiceManager::new(runtime);
3064        manager.set_deployment_name("test".to_string());
3065
3066        // Add TCP service - should log warning but not fail
3067        let spec = mock_tcp_spec();
3068        manager
3069            .upsert_service("database".to_string(), spec)
3070            .await
3071            .unwrap();
3072
3073        // No stream registry to check, but service should be tracked
3074        let services = manager.list_services().await;
3075        assert!(services.contains(&"database".to_string()));
3076    }
3077}