zlayer_agent/service.rs

//! Service-level container lifecycle management

use crate::container_supervisor::{ContainerSupervisor, SupervisedState, SupervisorEvent};
use crate::cron_scheduler::CronScheduler;
use crate::dependency::{
    DependencyConditionChecker, DependencyGraph, DependencyWaiter, WaitResult,
};
use crate::error::{AgentError, Result};
use crate::health::{HealthCallback, HealthChecker, HealthMonitor, HealthState};
use crate::init::InitOrchestrator;
use crate::job::{JobExecution, JobExecutionId, JobExecutor, JobTrigger};
use crate::overlay_manager::OverlayManager;
use crate::proxy_manager::ProxyManager;
use crate::runtime::{Container, ContainerId, ContainerState, Runtime};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{RwLock, Semaphore};
use zlayer_observability::logs::LogEntry;
use zlayer_overlay::DnsServer;
use zlayer_proxy::{StreamRegistry, StreamService};
use zlayer_spec::{
    effective_pull_policy, DependsSpec, HealthCheck, Protocol, PullPolicy, ResourceType,
    ServiceSpec,
};

/// Service instance manages a single service's containers
pub struct ServiceInstance {
    pub service_name: String,
    pub spec: ServiceSpec,
    runtime: Arc<dyn Runtime + Send + Sync>,
    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
    /// Overlay network manager for container networking (optional, not needed for Docker runtime)
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Proxy manager for updating backend health (optional)
    proxy_manager: Option<Arc<ProxyManager>>,
    /// DNS server for service discovery (optional)
    dns_server: Option<Arc<DnsServer>>,
    /// Shared health states map so callbacks can update ServiceManager-level health
    health_states: Option<Arc<RwLock<HashMap<String, HealthState>>>>,
    /// Most recently observed image digest after a successful pull. Used by
    /// `upsert_service` to detect drift on `:latest`/`Newer` redeploys without
    /// requiring callers to track digest state externally. Wrapped in a
    /// `RwLock` so `&self` methods (`scale_to`) can update it.
    last_pulled_digest: tokio::sync::RwLock<Option<String>>,
}

impl ServiceInstance {
    /// Create a new service instance
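    ///
    /// A minimal construction sketch (hypothetical `spec` and `runtime`
    /// values; the overlay manager is optional and omitted here):
    ///
    /// ```ignore
    /// let instance = ServiceInstance::new(
    ///     "api".to_string(),
    ///     spec,      // a zlayer_spec::ServiceSpec loaded elsewhere
    ///     runtime,   // Arc<dyn Runtime + Send + Sync>
    ///     None,      // no overlay manager (e.g. Docker runtime)
    /// );
    /// ```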
    pub fn new(
        service_name: String,
        spec: ServiceSpec,
        runtime: Arc<dyn Runtime + Send + Sync>,
        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    ) -> Self {
        Self {
            service_name,
            spec,
            runtime,
            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            overlay_manager,
            proxy_manager: None,
            dns_server: None,
            health_states: None,
            last_pulled_digest: tokio::sync::RwLock::new(None),
        }
    }

    /// Create a new service instance with proxy manager for health-aware load balancing
    pub fn with_proxy(
        service_name: String,
        spec: ServiceSpec,
        runtime: Arc<dyn Runtime + Send + Sync>,
        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
        proxy_manager: Arc<ProxyManager>,
    ) -> Self {
        Self {
            service_name,
            spec,
            runtime,
            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            overlay_manager,
            proxy_manager: Some(proxy_manager),
            dns_server: None,
            health_states: None,
            last_pulled_digest: tokio::sync::RwLock::new(None),
        }
    }

    /// Builder method to add DNS server for service discovery
    #[must_use]
    pub fn with_dns(mut self, dns_server: Arc<DnsServer>) -> Self {
        self.dns_server = Some(dns_server);
        self
    }

    /// Set the DNS server for service discovery
    pub fn set_dns_server(&mut self, dns_server: Arc<DnsServer>) {
        self.dns_server = Some(dns_server);
    }

    /// Set the proxy manager for health-aware load balancing
    pub fn set_proxy_manager(&mut self, proxy_manager: Arc<ProxyManager>) {
        self.proxy_manager = Some(proxy_manager);
    }

    /// Set the shared health states map so health callbacks can bridge state back to `ServiceManager`
    pub fn set_health_states(&mut self, states: Arc<RwLock<HashMap<String, HealthState>>>) {
        self.health_states = Some(states);
    }

    /// Get the last observed image digest (after the most recent successful
    /// pull). Returns `None` when no pull has happened yet, when the runtime
    /// does not expose digests, or when no matching `ImageInfo` was found.
    pub async fn last_pulled_digest(&self) -> Option<String> {
        self.last_pulled_digest.read().await.clone()
    }

    /// Pull the service image using `effective_pull_policy` (so a default
    /// `IfNotPresent` on a `:latest` tag auto-upgrades to `Newer`) and refresh
    /// the cached digest from `Runtime::list_images` when the runtime exposes
    /// it. Returns the digest observed after the pull, when known.
    ///
    /// `Never` skips the pull entirely; the cached digest is returned
    /// unchanged.
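    ///
    /// A rough sketch of the policy resolution assumed here (the exact rules
    /// live in `zlayer_spec::effective_pull_policy`; the `:latest` upgrade
    /// shown in the comments reflects the description above, not a verified
    /// table):
    ///
    /// ```ignore
    /// // IfNotPresent + ":latest" tag -> Newer (auto-upgrade described above)
    /// // IfNotPresent + pinned tag    -> IfNotPresent
    /// // Never                        -> Never (pull skipped entirely)
    /// let effective = effective_pull_policy(&spec.image.name, spec.image.pull_policy);
    /// if !matches!(effective, PullPolicy::Never) {
    ///     runtime.pull_image_with_policy(&image, effective, None).await?;
    /// }
    /// ```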
    async fn pull_and_refresh_digest(&self) -> Result<Option<String>> {
        let image_str = self.spec.image.name.to_string();
        let effective = effective_pull_policy(&self.spec.image.name, self.spec.image.pull_policy);

        if matches!(effective, PullPolicy::Never) {
            return Ok(self.last_pulled_digest.read().await.clone());
        }

        self.runtime
            .pull_image_with_policy(&image_str, effective, None)
            .await
            .map_err(|e| AgentError::PullFailed {
                image: self.spec.image.name.to_string(),
                reason: e.to_string(),
            })?;

        // Best-effort: try to discover the resolved digest via list_images.
        // Runtimes that don't support introspection (Unsupported) leave the
        // cached digest unchanged; drift detection then falls back to "always
        // recreate on PullPolicy::Always, never recreate on PullPolicy::Newer
        // when no digests are known".
        let new_digest = match self.runtime.list_images().await {
            Ok(images) => images
                .into_iter()
                .find(|info| info.reference == image_str)
                .and_then(|info| info.digest),
            Err(e) => {
                tracing::debug!(
                    image = %image_str,
                    error = %e,
                    "list_images unavailable; cannot record post-pull digest"
                );
                None
            }
        };

        if let Some(ref digest) = new_digest {
            *self.last_pulled_digest.write().await = Some(digest.clone());
        }

        Ok(new_digest)
    }

    /// Scale to the desired number of replicas
    ///
    /// This method uses short-lived locks to avoid blocking concurrent operations.
    /// I/O operations (pull, create, start, stop, remove) are performed without
    /// holding the containers lock to allow other operations to proceed.
    ///
    /// # Errors
    /// Returns an error if image pull, container creation, or container lifecycle operations fail.
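    ///
    /// A minimal usage sketch (hypothetical instance; error handling elided):
    ///
    /// ```ignore
    /// // Bring the service up to three replicas; scaling back down stops
    /// // and removes the extra containers.
    /// instance.scale_to(3).await?;
    /// instance.scale_to(1).await?;
    /// ```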
    #[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
    pub async fn scale_to(&self, replicas: u32) -> Result<()> {
        // Phase 1: Determine current state (short read lock)
        let current_replicas = { self.containers.read().await.len() as u32 }; // Lock released here

        // Phase 1b: Pull image up front so a redeploy on `:latest` (which lands
        // here with replicas == current_replicas in the steady state) actually
        // refreshes the cached digest. We skip the pull when scaling strictly
        // down (no new containers needed) and when policy is `Never`. Cached
        // layers make this cheap when nothing changed.
        let effective = effective_pull_policy(&self.spec.image.name, self.spec.image.pull_policy);
        if replicas >= current_replicas && !matches!(effective, PullPolicy::Never) {
            let _ = self.pull_and_refresh_digest().await?;
        }

        // Phase 2: Scale up - create new containers (no lock held during I/O)
        if replicas > current_replicas {
            for i in current_replicas..replicas {
                let id = ContainerId {
                    service: self.service_name.clone(),
                    replica: i + 1,
                };

                // Create container (no lock needed - I/O operation)
                //
                // RouteToPeer must propagate unchanged: the scheduler uses it
                // to re-place the workload on a capable peer, and wrapping it
                // in `CreateFailed` would hide the signal and mark the service
                // dead instead of rescheduling it. All other errors are
                // normalised to `CreateFailed` for upstream handling.
                self.runtime
                    .create_container(&id, &self.spec)
                    .await
                    .map_err(|e| match e {
                        AgentError::RouteToPeer { .. } => e,
                        other => AgentError::CreateFailed {
                            id: id.to_string(),
                            reason: other.to_string(),
                        },
                    })?;

                // Run init actions with error policy enforcement (no lock needed)
                let init_orchestrator = InitOrchestrator::with_error_policy(
                    id.clone(),
                    self.spec.init.clone(),
                    self.spec.errors.clone(),
                );
                init_orchestrator.run().await?;

                // Start container (no lock needed - I/O operation)
                self.runtime
                    .start_container(&id)
                    .await
                    .map_err(|e| AgentError::StartFailed {
                        id: id.to_string(),
                        reason: e.to_string(),
                    })?;

                // Get container PID with retries (may not be immediately available)
                let mut container_pid = None;
                for attempt in 1..=5u32 {
                    match self.runtime.get_container_pid(&id).await {
                        Ok(Some(pid)) => {
                            container_pid = Some(pid);
                            break;
                        }
                        Ok(None) if attempt < 5 => {
                            tracing::debug!(container = %id, attempt, "PID not available yet, retrying");
                            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                        }
                        Ok(None) => {
                            tracing::warn!(container = %id, "Container PID unavailable after 5 attempts");
                        }
                        Err(e) => {
                            tracing::warn!(container = %id, attempt, error = %e, "Failed to get PID");
                            if attempt < 5 {
                                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                            }
                        }
                    }
                }

                // Verify the container is still running before attempting
                // overlay attach. If the init process crashed during start
                // (bad image, missing libs, failed mount), the PID above is
                // now dead and every `ip link set ... netns {pid}` will
                // return a cryptic RTNETLINK error. Surface the real cause
                // from the container's log tail instead of the cascade.
                if container_pid.is_some() {
                    let alive = match self.runtime.container_state(&id).await {
                        Ok(
                            ContainerState::Running
                            | ContainerState::Pending
                            | ContainerState::Initializing,
                        ) => true,
                        Ok(state) => {
                            tracing::warn!(
                                container = %id,
                                ?state,
                                "container exited before overlay attach could run"
                            );
                            false
                        }
                        Err(e) => {
                            // State query failed — don't block the attach on
                            // it. The overlay manager's own cleanup-on-error
                            // path now handles the dead-PID case cleanly.
                            tracing::warn!(
                                container = %id,
                                error = %e,
                                "container state query failed before overlay attach, proceeding"
                            );
                            true
                        }
                    };
                    if !alive {
                        let log_tail = self.runtime.container_logs(&id, 40).await.ok().map_or_else(
                            || "  <log read failed>".to_string(),
                            |entries| {
                                if entries.is_empty() {
                                    "  <no log output>".to_string()
                                } else {
                                    entries
                                        .into_iter()
                                        .map(|e| format!("  {}", e.message))
                                        .collect::<Vec<_>>()
                                        .join("\n")
                                }
                            },
                        );
                        return Err(AgentError::StartFailed {
                            id: id.to_string(),
                            reason: format!("container exited during startup:\n{log_tail}"),
                        });
                    }
                }

                // Attach to overlay network if manager is available.
                //
                // Linux uses the container PID to enter the netns and attach a
                // veth. Windows has no PID-addressable netns — the HCN namespace
                // GUID (obtained from `get_container_namespace_id`) is used
                // instead, and the endpoint's IP has already been populated by
                // `EndpointAttachment::create_overlay` during container creation.
                // We simply register that IP with the slice allocator so host
                // accounting stays in sync.
                let overlay_ip = if let Some(overlay) = &self.overlay_manager {
                    let overlay_guard = overlay.read().await;
                    #[cfg(target_os = "windows")]
                    let attach_result: Option<std::net::IpAddr> = {
                        let _ = container_pid; // unused on Windows
                        match self.runtime.get_container_namespace_id(&id).await {
                            Ok(Some(ns_id)) => {
                                let ip_override =
                                    self.runtime.get_container_ip(&id).await.ok().flatten();
                                let dns_server = overlay_guard.dns_server_addr().map(|sa| sa.ip());
                                let dns_domain =
                                    overlay_guard.dns_domain().map(ToString::to_string);
                                match overlay_guard
                                    .attach_container_hcn(
                                        ns_id,
                                        &self.service_name,
                                        ip_override,
                                        true,
                                        dns_server,
                                        dns_domain,
                                    )
                                    .await
                                {
                                    Ok(ip) => Some(ip),
                                    Err(e) => {
                                        tracing::warn!(
                                            container = %id,
                                            error = %e,
                                            "HCN overlay attach failed"
                                        );
                                        None
                                    }
                                }
                            }
                            Ok(None) => {
                                tracing::debug!(
                                    container = %id,
                                    "skipping HCN overlay attach - no namespace id available"
                                );
                                None
                            }
                            Err(e) => {
                                tracing::warn!(
                                    container = %id,
                                    error = %e,
                                    "failed to fetch HCN namespace id"
                                );
                                None
                            }
                        }
                    };
                    #[cfg(not(target_os = "windows"))]
                    let attach_result: Option<std::net::IpAddr> = {
                        if let Some(pid) = container_pid {
                            match overlay_guard
                                .attach_container(pid, &self.service_name, true)
                                .await
                            {
                                Ok(ip) => Some(ip),
                                Err(e) => {
                                    tracing::warn!(
                                        container = %id,
                                        error = %e,
                                        "failed to attach container to overlay network"
                                    );
                                    None
                                }
                            }
                        } else {
                            // No PID available (e.g. WASM runtime) - skip overlay attachment
                            tracing::debug!(
                                container = %id,
                                "skipping overlay attachment - no PID available"
                            );
                            None
                        }
                    };

                    if let Some(ip) = attach_result {
                        tracing::info!(
                            container = %id,
                            overlay_ip = %ip,
                            "attached container to overlay network"
                        );

                        // Register DNS for service discovery
                        if let Some(dns) = &self.dns_server {
                            // Register service-level hostname: {service}.service.local
                            let service_hostname = format!("{}.service.local", self.service_name);

                            // Register replica-specific hostname: {replica}.{service}.service.local
                            let replica_hostname =
                                format!("{}.{}.service.local", id.replica, self.service_name);

                            match dns.add_record(&service_hostname, ip).await {
                                Ok(()) => tracing::debug!(
                                    hostname = %service_hostname,
                                    ip = %ip,
                                    "registered DNS for service"
                                ),
                                Err(e) => tracing::warn!(
                                    hostname = %service_hostname,
                                    error = %e,
                                    "failed to register DNS for service"
                                ),
                            }

                            // Also register replica-specific entry
                            if let Err(e) = dns.add_record(&replica_hostname, ip).await {
                                tracing::warn!(
                                    hostname = %replica_hostname,
                                    error = %e,
                                    "failed to register replica DNS"
                                );
                            } else {
                                tracing::debug!(
                                    hostname = %replica_hostname,
                                    ip = %ip,
                                    "registered DNS for replica"
                                );
                            }
                        }

                        Some(ip)
                    } else {
                        None
                    }
                } else {
                    None
                };

                // If overlay failed, try the container runtime's own IP as fallback
                let effective_ip = if overlay_ip.is_none() {
                    match self.runtime.get_container_ip(&id).await {
                        Ok(Some(ip)) => {
                            tracing::info!(
                                container = %id,
                                ip = %ip,
                                "using runtime container IP for proxy (overlay unavailable)"
                            );
                            Some(ip)
                        }
                        Ok(None) => {
                            tracing::warn!(
                                container = %id,
                                "no container IP available from runtime, proxy routing will be unavailable"
                            );
                            None
                        }
                        Err(e) => {
                            tracing::warn!(
                                container = %id,
                                error = %e,
                                "failed to get container IP from runtime"
                            );
                            None
                        }
                    }
                } else {
                    overlay_ip
                };

                tracing::info!(
                    container = %id,
                    service = %self.service_name,
                    overlay_ip = ?overlay_ip,
                    effective_ip = ?effective_ip,
                    "Container IP resolution complete"
                );

                // Query port override from the runtime.
                // On macOS sandbox, each container is assigned a unique port since
                // all processes share the host network (no network namespaces).
                // The runtime passes the port to the process via the PORT env var.
                let port_override = match self.runtime.get_container_port_override(&id).await {
                    Ok(Some(port)) => {
                        tracing::info!(
                            container = %id,
                            port = port,
                            "runtime assigned dynamic port override for this container"
                        );
                        Some(port)
                    }
                    Ok(None) => None,
                    Err(e) => {
                        tracing::warn!(
                            container = %id,
                            error = %e,
                            "failed to query port override from runtime, using spec port"
                        );
                        None
                    }
                };

                // Start health monitoring and store handle (no lock needed during start)
                let health_monitor_handle = {
                    let mut check = self.spec.health.check.clone();

                    // Resolve Tcp { port: 0 } ("use first endpoint") to the actual
                    // port the container is listening on. With mac-sandbox, each
                    // replica gets a unique assigned port via port_override.
                    if let HealthCheck::Tcp { ref mut port } = check {
                        if *port == 0 {
                            *port = port_override.unwrap_or_else(|| {
                                self.spec
                                    .endpoints
                                    .iter()
                                    .find(|ep| {
                                        matches!(
                                            ep.protocol,
                                            Protocol::Http | Protocol::Https | Protocol::Websocket
                                        )
                                    })
                                    .map_or(8080, zlayer_spec::EndpointSpec::target_port)
                            });
                        }
                    }

                    let start_grace = self
                        .spec
                        .health
                        .start_grace
                        .unwrap_or(Duration::from_secs(5));
                    let check_timeout = self.spec.health.timeout.unwrap_or(Duration::from_secs(5));
                    let interval = self.spec.health.interval.unwrap_or(Duration::from_secs(10));
                    let retries = self.spec.health.retries;

                    let checker = HealthChecker::new(check, effective_ip);
                    let mut monitor = HealthMonitor::new(id.clone(), checker, interval, retries)
                        .with_start_grace(start_grace)
                        .with_check_timeout(check_timeout);

                    // Create health callback to update proxy backend health if proxy is configured
                    // and we have an overlay IP for this container
                    if let (Some(proxy), Some(ip)) = (&self.proxy_manager, effective_ip) {
                        let proxy = Arc::clone(proxy);
                        let service_name = self.service_name.clone();
                        // Get the container's target port, using the runtime override if present.
                        // On macOS sandbox, port_override gives each replica a unique port
                        // so the proxy can distinguish backends sharing 127.0.0.1.
                        let port = port_override.unwrap_or_else(|| {
                            self.spec
                                .endpoints
                                .iter()
                                .find(|ep| {
                                    matches!(
                                        ep.protocol,
                                        Protocol::Http | Protocol::Https | Protocol::Websocket
                                    )
                                })
                                .map_or(8080, zlayer_spec::EndpointSpec::target_port)
                        });

                        let backend_addr = SocketAddr::new(ip, port);

                        // Register backend with load balancer so proxy can route to it.
                        // This must happen before the health callback is created, because
                        // update_backend_health only updates *existing* backends.
                        proxy.add_backend(&self.service_name, backend_addr).await;

                        let health_states_opt = self.health_states.clone();
                        let svc_name_for_states = self.service_name.clone();

                        let health_callback: HealthCallback =
                            Arc::new(move |container_id: ContainerId, is_healthy: bool| {
                                let proxy = Arc::clone(&proxy);
                                let service_name = service_name.clone();
                                tracing::info!(
                                    container = %container_id,
                                    service = %service_name,
                                    backend = %backend_addr,
                                    healthy = is_healthy,
                                    "health status changed, updating proxy backend"
                                );
                                // Spawn a task to update the proxy (callback is sync, proxy update is async)
                                tokio::spawn(async move {
                                    proxy
                                        .update_backend_health(
                                            &service_name,
                                            backend_addr,
                                            is_healthy,
                                        )
                                        .await;
                                });
                                // Bridge health state back to ServiceManager's health_states map
                                if let Some(ref health_states) = health_states_opt {
                                    let states = Arc::clone(health_states);
                                    let svc = svc_name_for_states.clone();
                                    tokio::spawn(async move {
                                        let state = if is_healthy {
                                            HealthState::Healthy
                                        } else {
                                            HealthState::Unhealthy {
                                                failures: 0,
                                                reason: "health check failed".into(),
                                            }
                                        };
                                        states.write().await.insert(svc, state);
                                    });
                                }
                            });

                        monitor = monitor.with_callback(health_callback);
                    }

                    monitor.start()
                };

                // Update state (short write lock)
                {
                    let mut containers = self.containers.write().await;
                    containers.insert(
                        id.clone(),
                        Container {
                            id: id.clone(),
                            state: ContainerState::Running,
                            pid: None,
                            task: None,
                            overlay_ip: effective_ip,
                            health_monitor: Some(health_monitor_handle),
                            port_override,
                        },
                    );
                } // Lock released here
            }
        }

        // Phase 3: Scale down - remove containers (short write lock per removal)
        if replicas < current_replicas {
            for i in replicas..current_replicas {
                let id = ContainerId {
                    service: self.service_name.clone(),
                    replica: i + 1,
                };

                // Remove from state first and get the container to abort health monitor (short write lock)
                let removed_container = {
                    let mut containers = self.containers.write().await;
                    containers.remove(&id)
                }; // Lock released here

                // Then perform cleanup (no lock held - I/O operations)
                if let Some(container) = removed_container {
                    // Abort the health monitor task if it exists
                    if let Some(handle) = container.health_monitor {
                        handle.abort();
                    }

                    // Remove DNS records for this container
                    if let Some(dns) = &self.dns_server {
                        // Remove replica-specific DNS entry
                        let replica_hostname =
                            format!("{}.{}.service.local", id.replica, self.service_name);
                        if let Err(e) = dns.remove_record(&replica_hostname).await {
                            tracing::warn!(
                                hostname = %replica_hostname,
                                error = %e,
                                "failed to remove replica DNS record"
                            );
                        } else {
                            tracing::debug!(
                                hostname = %replica_hostname,
                                "removed replica DNS record"
                            );
                        }

                        // Note: We don't remove the service-level hostname here because
                        // other replicas may still be using it. The service-level record
                        // should be cleaned up when the entire service is removed.
                    }

                    // Stop container
                    self.runtime
                        .stop_container(&id, Duration::from_secs(30))
                        .await?;

                    // Sync volumes to S3 before removal (no-op if not configured)
                    if let Err(e) = self.runtime.sync_container_volumes(&id).await {
                        tracing::warn!(
                            container = %id,
                            error = %e,
                            "failed to sync volumes before removal"
                        );
                    }

                    // Remove container
                    self.runtime.remove_container(&id).await?;
                }
            }
        }

        Ok(())
    }

    /// Get current number of replicas
    pub async fn replica_count(&self) -> usize {
        self.containers.read().await.len()
    }

    /// Get all container IDs
    pub async fn container_ids(&self) -> Vec<ContainerId> {
        self.containers.read().await.keys().cloned().collect()
    }

    /// Get read access to the containers map
    ///
    /// This allows callers to access container overlay IPs and other metadata
    /// without copying the entire map.
    pub fn containers(
        &self,
    ) -> &tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>> {
        &self.containers
    }

    /// Check if this service instance has an overlay manager configured
    pub fn has_overlay_manager(&self) -> bool {
        self.overlay_manager.is_some()
    }

    /// Check if this service instance has a proxy manager configured
    pub fn has_proxy_manager(&self) -> bool {
        self.proxy_manager.is_some()
    }

    /// Check if this service instance has a DNS server configured
    pub fn has_dns_server(&self) -> bool {
        self.dns_server.is_some()
    }
}

/// Service manager for multiple services
pub struct ServiceManager {
    runtime: Arc<dyn Runtime + Send + Sync>,
    services: tokio::sync::RwLock<std::collections::HashMap<String, ServiceInstance>>,
    scale_semaphore: Arc<Semaphore>,
    /// Overlay network manager for container networking
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Stream registry for L4 proxy route registration (TCP/UDP)
    stream_registry: Option<Arc<StreamRegistry>>,
    /// Proxy manager for health-aware load balancing (hyper-based proxy)
    proxy_manager: Option<Arc<ProxyManager>>,
    /// DNS server for service discovery
    dns_server: Option<Arc<DnsServer>>,
    /// Deployment name (used for generating hostnames)
    deployment_name: Option<String>,
    /// Health states for dependency condition checking
    health_states: Arc<RwLock<HashMap<String, HealthState>>>,
    /// Job executor for run-to-completion workloads
    job_executor: Option<Arc<JobExecutor>>,
    /// Cron scheduler for time-based job triggers
    cron_scheduler: Option<Arc<CronScheduler>>,
    /// Container supervisor for crash/panic policy enforcement
    container_supervisor: Option<Arc<ContainerSupervisor>>,
}

// ---------------------------------------------------------------------------
// ServiceManagerBuilder
// ---------------------------------------------------------------------------

/// Builder for constructing a [`ServiceManager`] with optional subsystems.
///
/// Prefer using `ServiceManager::builder(runtime)` to start building.
///
/// # Example
///
/// ```ignore
/// let manager = ServiceManager::builder(runtime)
///     .overlay_manager(om)
///     .proxy_manager(proxy)
///     .deployment_name("prod")
///     .build();
/// ```
pub struct ServiceManagerBuilder {
    runtime: Arc<dyn Runtime + Send + Sync>,
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    proxy_manager: Option<Arc<ProxyManager>>,
    stream_registry: Option<Arc<StreamRegistry>>,
    dns_server: Option<Arc<DnsServer>>,
    deployment_name: Option<String>,
    job_executor: Option<Arc<JobExecutor>>,
    cron_scheduler: Option<Arc<CronScheduler>>,
    container_supervisor: Option<Arc<ContainerSupervisor>>,
}

impl ServiceManagerBuilder {
    /// Create a new builder with the required runtime.
    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
        Self {
            runtime,
            overlay_manager: None,
            proxy_manager: None,
            stream_registry: None,
            dns_server: None,
            deployment_name: None,
            job_executor: None,
            cron_scheduler: None,
            container_supervisor: None,
        }
    }

    /// Set the overlay network manager for container networking.
    #[must_use]
    pub fn overlay_manager(mut self, om: Arc<RwLock<OverlayManager>>) -> Self {
        self.overlay_manager = Some(om);
        self
    }

    /// Set the proxy manager for health-aware load balancing.
    #[must_use]
    pub fn proxy_manager(mut self, pm: Arc<ProxyManager>) -> Self {
        self.proxy_manager = Some(pm);
        self
    }

    /// Set the stream registry for TCP/UDP L4 proxy route registration.
    #[must_use]
    pub fn stream_registry(mut self, sr: Arc<StreamRegistry>) -> Self {
        self.stream_registry = Some(sr);
        self
    }

    /// Set the DNS server for service discovery.
    #[must_use]
    pub fn dns_server(mut self, dns: Arc<DnsServer>) -> Self {
        self.dns_server = Some(dns);
        self
    }

    /// Set the deployment name (used for hostname generation).
    #[must_use]
    pub fn deployment_name(mut self, name: impl Into<String>) -> Self {
        self.deployment_name = Some(name.into());
        self
    }

    /// Set the job executor for run-to-completion workloads.
    #[must_use]
    pub fn job_executor(mut self, je: Arc<JobExecutor>) -> Self {
        self.job_executor = Some(je);
        self
    }

    /// Set the cron scheduler for time-based job triggers.
    #[must_use]
    pub fn cron_scheduler(mut self, cs: Arc<CronScheduler>) -> Self {
        self.cron_scheduler = Some(cs);
        self
    }

    /// Set the container supervisor for crash/panic policy enforcement.
    #[must_use]
    pub fn container_supervisor(mut self, cs: Arc<ContainerSupervisor>) -> Self {
        self.container_supervisor = Some(cs);
        self
    }

    /// Consume the builder and produce a fully-wired [`ServiceManager`].
    ///
    /// Logs warnings for missing recommended subsystems (proxy,
    /// `stream_registry`, `container_supervisor`, `deployment_name`).
    pub fn build(self) -> ServiceManager {
        if self.proxy_manager.is_none() {
            tracing::warn!("ServiceManager built without proxy_manager");
        }
        if self.stream_registry.is_none() {
            tracing::warn!("ServiceManager built without stream_registry");
        }
        if self.container_supervisor.is_none() {
            tracing::warn!("ServiceManager built without container_supervisor");
        }
        if self.deployment_name.is_none() {
            tracing::warn!("ServiceManager built without deployment_name");
        }

        ServiceManager {
            runtime: self.runtime,
            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            scale_semaphore: Arc::new(Semaphore::new(10)),
            overlay_manager: self.overlay_manager,
            stream_registry: self.stream_registry,
            proxy_manager: self.proxy_manager,
            dns_server: self.dns_server,
            deployment_name: self.deployment_name,
            health_states: Arc::new(RwLock::new(HashMap::new())),
            job_executor: self.job_executor,
            cron_scheduler: self.cron_scheduler,
            container_supervisor: self.container_supervisor,
        }
    }
}

impl ServiceManager {
    /// Create a [`ServiceManagerBuilder`] for constructing a `ServiceManager`.
    ///
    /// This is the preferred way to construct a `ServiceManager` since v0.2.0.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let manager = ServiceManager::builder(runtime)
    ///     .overlay_manager(om)
    ///     .proxy_manager(proxy)
    ///     .build();
    /// ```
    pub fn builder(runtime: Arc<dyn Runtime + Send + Sync>) -> ServiceManagerBuilder {
        ServiceManagerBuilder::new(runtime)
    }

    /// Create a new service manager
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
        Self {
            runtime,
            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            scale_semaphore: Arc::new(Semaphore::new(10)), // Max 10 concurrent scaling operations
            overlay_manager: None,
            stream_registry: None,
            proxy_manager: None,
            dns_server: None,
            deployment_name: None,
            health_states: Arc::new(RwLock::new(HashMap::new())),
            job_executor: None,
            cron_scheduler: None,
            container_supervisor: None,
        }
    }

    /// Create a service manager with overlay network support
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn with_overlay(
        runtime: Arc<dyn Runtime + Send + Sync>,
        overlay_manager: Arc<RwLock<OverlayManager>>,
    ) -> Self {
        Self {
            runtime,
            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            scale_semaphore: Arc::new(Semaphore::new(10)),
            overlay_manager: Some(overlay_manager),
            stream_registry: None,
            proxy_manager: None,
            dns_server: None,
            deployment_name: None,
            health_states: Arc::new(RwLock::new(HashMap::new())),
            job_executor: None,
            cron_scheduler: None,
            container_supervisor: None,
        }
    }

    /// Create a fully-configured service manager with overlay and proxy support
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn with_full_config(
        runtime: Arc<dyn Runtime + Send + Sync>,
        overlay_manager: Arc<RwLock<OverlayManager>>,
        deployment_name: String,
    ) -> Self {
        Self {
            runtime,
            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            scale_semaphore: Arc::new(Semaphore::new(10)),
            overlay_manager: Some(overlay_manager),
            stream_registry: None,
            proxy_manager: None,
            dns_server: None,
            deployment_name: Some(deployment_name),
            health_states: Arc::new(RwLock::new(HashMap::new())),
            job_executor: None,
            cron_scheduler: None,
            container_supervisor: None,
        }
    }

    /// Get the health states map for external monitoring
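    ///
    /// A small monitoring sketch (hypothetical loop; the map is keyed by
    /// service name):
    ///
    /// ```ignore
    /// let states = manager.health_states();
    /// for (service, state) in states.read().await.iter() {
    ///     tracing::info!(%service, ?state, "current health");
    /// }
    /// ```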
    pub fn health_states(&self) -> Arc<RwLock<HashMap<String, HealthState>>> {
        Arc::clone(&self.health_states)
    }

    /// Update health state for a service
    pub async fn update_health_state(&self, service_name: &str, state: HealthState) {
        let mut states = self.health_states.write().await;
        states.insert(service_name.to_string(), state);
    }

    /// Set the deployment name (used for generating hostnames)
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_deployment_name(&mut self, name: String) {
        self.deployment_name = Some(name);
    }

    /// Set the stream registry for L4 proxy integration (TCP/UDP)
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
        self.stream_registry = Some(registry);
    }

    /// Builder pattern: add stream registry for L4 proxy integration
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
        self.stream_registry = Some(registry);
        self
    }

    /// Get the stream registry (if configured)
    pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
        self.stream_registry.as_ref()
    }

    /// Set the overlay manager for container networking
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_overlay_manager(&mut self, manager: Arc<RwLock<OverlayManager>>) {
        self.overlay_manager = Some(manager);
    }

    /// Set the proxy manager for health-aware load balancing
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_proxy_manager(&mut self, proxy: Arc<ProxyManager>) {
        self.proxy_manager = Some(proxy);
    }

    /// Builder pattern: add proxy manager for health-aware load balancing
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_proxy_manager(mut self, proxy: Arc<ProxyManager>) -> Self {
        self.proxy_manager = Some(proxy);
        self
    }

    /// Get the proxy manager (if configured)
    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
        self.proxy_manager.as_ref()
    }

    /// Set the DNS server for service discovery
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_dns_server(&mut self, dns: Arc<DnsServer>) {
        self.dns_server = Some(dns);
    }

    /// Builder pattern: add DNS server for service discovery
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_dns_server(mut self, dns: Arc<DnsServer>) -> Self {
        self.dns_server = Some(dns);
        self
    }

    /// Get the DNS server (if configured)
    pub fn dns_server(&self) -> Option<&Arc<DnsServer>> {
        self.dns_server.as_ref()
    }

    /// Set the job executor for run-to-completion workloads
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_job_executor(&mut self, executor: Arc<JobExecutor>) {
        self.job_executor = Some(executor);
    }

    /// Set the cron scheduler for time-based job triggers
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_cron_scheduler(&mut self, scheduler: Arc<CronScheduler>) {
        self.cron_scheduler = Some(scheduler);
    }

    /// Builder pattern: add job executor
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_job_executor(mut self, executor: Arc<JobExecutor>) -> Self {
        self.job_executor = Some(executor);
        self
    }

    /// Builder pattern: add cron scheduler
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_cron_scheduler(mut self, scheduler: Arc<CronScheduler>) -> Self {
        self.cron_scheduler = Some(scheduler);
        self
    }

    /// Get the job executor (if configured)
    pub fn job_executor(&self) -> Option<&Arc<JobExecutor>> {
        self.job_executor.as_ref()
    }

    /// Get the cron scheduler (if configured)
    pub fn cron_scheduler(&self) -> Option<&Arc<CronScheduler>> {
        self.cron_scheduler.as_ref()
    }

    /// Set the container supervisor for crash/panic policy enforcement
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_container_supervisor(&mut self, supervisor: Arc<ContainerSupervisor>) {
        self.container_supervisor = Some(supervisor);
    }

    /// Builder pattern: add container supervisor
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_container_supervisor(mut self, supervisor: Arc<ContainerSupervisor>) -> Self {
        self.container_supervisor = Some(supervisor);
        self
    }

    /// Get the container supervisor (if configured)
    pub fn container_supervisor(&self) -> Option<&Arc<ContainerSupervisor>> {
        self.container_supervisor.as_ref()
    }

    /// Start the container supervisor background task
    ///
    /// This spawns a background task that monitors containers for crashes
    /// and enforces the `on_panic` error policy.
    ///
    /// # Errors
    /// Returns an error if no container supervisor is configured.
    ///
    /// # Returns
    /// A `JoinHandle` for the supervisor task.
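    ///
    /// A minimal lifecycle sketch (assuming the supervisor was configured via
    /// the builder):
    ///
    /// ```ignore
    /// let handle = manager.start_container_supervisor()?;
    /// // ... later, during shutdown:
    /// manager.shutdown_container_supervisor();
    /// handle.await.ok();
    /// ```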
    pub fn start_container_supervisor(&self) -> Result<tokio::task::JoinHandle<()>> {
        let supervisor = self.container_supervisor.as_ref().ok_or_else(|| {
            AgentError::Configuration("Container supervisor not configured".to_string())
        })?;

        let supervisor = Arc::clone(supervisor);
        Ok(tokio::spawn(async move {
            supervisor.run_loop().await;
        }))
    }

    /// Shutdown the container supervisor
    pub fn shutdown_container_supervisor(&self) {
        if let Some(supervisor) = &self.container_supervisor {
            supervisor.shutdown();
        }
    }

    /// Get the supervised state of a container
    pub async fn get_container_supervised_state(
        &self,
        container_id: &ContainerId,
    ) -> Option<SupervisedState> {
        if let Some(supervisor) = &self.container_supervisor {
            supervisor.get_state(container_id).await
        } else {
            None
        }
    }

    /// Get supervisor events receiver
    ///
    /// Note: This can only be called once; the receiver is moved to the caller.
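    ///
    /// A consumption sketch (the logging shown is illustrative; see
    /// `SupervisorEvent` for the actual event shape):
    ///
    /// ```ignore
    /// if let Some(mut events) = manager.take_supervisor_events().await {
    ///     tokio::spawn(async move {
    ///         while let Some(event) = events.recv().await {
    ///             tracing::info!(?event, "supervisor event");
    ///         }
    ///     });
    /// }
    /// ```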
    pub async fn take_supervisor_events(
        &self,
    ) -> Option<tokio::sync::mpsc::Receiver<SupervisorEvent>> {
        if let Some(supervisor) = &self.container_supervisor {
            supervisor.take_event_receiver().await
        } else {
            None
        }
    }

    // ==================== Dependency Orchestration ====================

    /// Deploy multiple services respecting their dependency order
    ///
    /// This method:
    /// 1. Builds a dependency graph from the services
    /// 2. Validates no cycles exist
    /// 3. Computes topological order (services with no deps first)
    /// 4. For each service in order, waits for its dependencies, then starts the service
    ///
    /// # Arguments
    /// * `services` - Map of service name to service specification
    ///
    /// # Errors
    /// - Returns `AgentError::InvalidSpec` if there are cyclic dependencies
    /// - Returns `AgentError::DependencyTimeout` if a dependency times out with `on_timeout: fail`
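    ///
    /// # Example
    ///
    /// A deployment sketch (hypothetical specs; `db` is assumed to be listed
    /// in `api`'s `depends` section):
    ///
    /// ```ignore
    /// let mut services = HashMap::new();
    /// services.insert("db".to_string(), db_spec);
    /// services.insert("api".to_string(), api_spec);
    /// // Starts `db` first, waits for its dependency conditions, then starts `api`.
    /// manager.deploy_with_dependencies(services).await?;
    /// ```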
1202    pub async fn deploy_with_dependencies(
1203        &self,
1204        services: HashMap<String, ServiceSpec>,
1205    ) -> Result<()> {
1206        if services.is_empty() {
1207            return Ok(());
1208        }
1209
1210        // Build dependency graph
1211        let graph = DependencyGraph::build(&services)?;
1212
1213        tracing::info!(
1214            service_count = services.len(),
1215            "Starting deployment with dependency ordering"
1216        );
1217
1218        // Get startup order
1219        let order = graph.startup_order();
1220        tracing::debug!(order = ?order, "Computed startup order");
1221
1222        // Start services in dependency order
1223        for service_name in order {
1224            let service_spec = services
1225                .get(service_name)
1226                .ok_or_else(|| AgentError::Internal(format!("Service {service_name} not found")))?;
1227
1228            // Wait for dependencies first
1229            if !service_spec.depends.is_empty() {
1230                tracing::info!(
1231                    service = %service_name,
1232                    dependency_count = service_spec.depends.len(),
1233                    "Waiting for dependencies"
1234                );
1235                self.wait_for_dependencies(service_name, &service_spec.depends)
1236                    .await?;
1237            }
1238
1239            // Register and start service
1240            tracing::info!(service = %service_name, "Starting service");
1241            Box::pin(self.upsert_service(service_name.clone(), service_spec.clone())).await?;
1242
1243            // Get the desired replica count from scale config
1244            let replicas = match &service_spec.scale {
1245                zlayer_spec::ScaleSpec::Fixed { replicas } => *replicas,
1246                zlayer_spec::ScaleSpec::Adaptive { min, .. } => *min, // Start with min replicas
1247                zlayer_spec::ScaleSpec::Manual => 1, // Default to 1 for manual scaling
1248            };
1249            self.scale_service(service_name, replicas).await?;
1250
1251            // Mark service as started in health states (Unknown until health check runs)
1252            self.update_health_state(service_name, HealthState::Unknown)
1253                .await;
1254
1255            tracing::info!(
1256                service = %service_name,
1257                replicas = replicas,
1258                "Service started"
1259            );
1260        }
1261
1262        tracing::info!(service_count = services.len(), "Deployment complete");
1263
1264        Ok(())
1265    }
1266
1267    /// Wait for all dependencies of a service to be satisfied
1268    ///
1269    /// # Arguments
1270    /// * `service` - Name of the service waiting for dependencies
1271    /// * `deps` - Slice of dependency specifications
1272    ///
1273    /// # Errors
1274    /// Returns `AgentError::DependencyTimeout` if any dependency with `on_timeout: fail` times out
1275    async fn wait_for_dependencies(&self, service: &str, deps: &[DependsSpec]) -> Result<()> {
1276        let condition_checker = DependencyConditionChecker::new(
1277            Arc::clone(&self.runtime),
1278            Arc::clone(&self.health_states),
1279            None,
1280        );
1281
1282        let waiter = DependencyWaiter::new(condition_checker);
1283        let results = waiter.wait_for_all(deps).await?;
1284
1285        // Check results for failures
1286        for result in results {
1287            match result {
1288                WaitResult::TimedOutFail {
1289                    service: dep_service,
1290                    condition,
1291                    timeout,
1292                } => {
1293                    return Err(AgentError::DependencyTimeout {
1294                        service: service.to_string(),
1295                        dependency: dep_service,
1296                        condition: format!("{condition:?}"),
1297                        timeout,
1298                    });
1299                }
1300                WaitResult::TimedOutWarn {
1301                    service: dep_service,
1302                    condition,
1303                } => {
1304                    tracing::warn!(
1305                        service = %service,
1306                        dependency = %dep_service,
1307                        condition = ?condition,
1308                        "Dependency timed out but continuing"
1309                    );
1310                }
1311                WaitResult::TimedOutContinue | WaitResult::Satisfied => {
1312                    // Continue silently
1313                }
1314            }
1315        }
1316
1317        Ok(())
1318    }
1319
1320    /// Check if all dependencies for a service are currently satisfied
1321    ///
1322    /// This is a one-shot check (no waiting). Useful for pre-flight validation.
1323    ///
1324    /// # Errors
1325    /// Returns an error if a dependency check fails unexpectedly.
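    ///
    /// # Example
    ///
    /// A one-shot pre-flight check (sketch, not compiled as a doctest; `deps`
    /// would normally come from a parsed `ServiceSpec`):
    ///
    /// ```ignore
    /// let deps = vec![DependsSpec {
    ///     service: "db".to_string(),
    ///     condition: zlayer_spec::DependencyCondition::Healthy,
    ///     timeout: Some(Duration::from_secs(60)),
    ///     on_timeout: zlayer_spec::TimeoutAction::Fail,
    /// }];
    /// if !manager.check_dependencies(&deps).await? {
    ///     tracing::warn!("dependencies not yet satisfied");
    /// }
    /// ```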
1326    pub async fn check_dependencies(&self, deps: &[DependsSpec]) -> Result<bool> {
1327        let condition_checker = DependencyConditionChecker::new(
1328            Arc::clone(&self.runtime),
1329            Arc::clone(&self.health_states),
1330            None,
1331        );
1332
1333        for dep in deps {
1334            if !condition_checker.check(dep).await? {
1335                return Ok(false);
1336            }
1337        }
1338
1339        Ok(true)
1340    }
1341
1342    /// Add or update a workload (service, job, or cron)
1343    ///
1344    /// This method handles different resource types appropriately:
1345    /// - **Service**: Traditional long-running containers with scaling and health checks
1346    /// - **Job**: Run-to-completion workloads triggered on-demand (stores spec for later)
1347    /// - **Cron**: Scheduled run-to-completion workloads (registers with cron scheduler)
1348    ///
1349    /// # Errors
1350    /// Returns an error if service creation, scaling, or cron registration fails.
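    ///
    /// # Example
    ///
    /// Registering and scaling a long-running service (sketch, not compiled
    /// as a doctest; `spec` is a hypothetical `ServiceSpec` with `rtype: service`):
    ///
    /// ```ignore
    /// Box::pin(manager.upsert_service("api".to_string(), spec)).await?;
    /// manager.scale_service("api", 2).await?;
    /// assert_eq!(manager.service_replica_count("api").await?, 2);
    /// ```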
1351    #[allow(clippy::too_many_lines)]
1352    pub async fn upsert_service(&self, name: String, spec: ServiceSpec) -> Result<()> {
1353        match spec.rtype {
1354            ResourceType::Service => {
1355                // Long-running service: create/update instance
1356                let mut services = self.services.write().await;
1357
1358                if let Some(instance) = services.get_mut(&name) {
1359                    // Update existing service. We need to:
1360                    //   1. Update the in-memory spec (so future scale-ups use the new image).
1361                    //   2. Honour the effective pull policy. For Never/IfNotPresent (after
1362                    //      effective resolution) we no-op. For Always/Newer we pull, compare
1363                    //      digests, and trigger a rolling recreate when drift is observed.
1364                    instance.spec = spec.clone();
1365                    if let Some(dns) = &self.dns_server {
1366                        instance.set_dns_server(Arc::clone(dns));
1367                    }
1368
1369                    let effective = effective_pull_policy(&spec.image.name, spec.image.pull_policy);
1370                    let old_digest = instance.last_pulled_digest().await;
1371                    let current_replicas =
1372                        u32::try_from(instance.replica_count().await).unwrap_or(u32::MAX);
1373                    drop(services); // Release write lock before pull / scale (which take their own locks).
1374
1375                    match effective {
1376                        PullPolicy::Never | PullPolicy::IfNotPresent => {
1377                            // No pull, no recreate. Drift is silently ignored when the
1378                            // user has explicitly opted into "do not refresh" semantics.
1379                            tracing::debug!(
1380                                service = %name,
1381                                policy = ?effective,
1382                                "service unchanged on re-deploy (effective pull policy skips refresh)"
1383                            );
1384                        }
1385                        PullPolicy::Always | PullPolicy::Newer => {
1386                            // Pull (this updates the cached digest as a side-effect).
1387                            // We need a read guard to keep the instance alive while
1388                            // calling its &self method.
1389                            let services_ro = self.services.read().await;
1390                            let new_digest = if let Some(inst) = services_ro.get(&name) {
1391                                inst.pull_and_refresh_digest().await?
1392                            } else {
1393                                // The service vanished between our write-lock release
1394                                // and read-lock acquisition (race with remove_service).
1395                                // Treat this as a no-op; the caller will see the removal.
1396                                tracing::warn!(
1397                                    service = %name,
1398                                    "service removed during upsert; skipping drift recreate"
1399                                );
1400                                return Ok(());
1401                            };
1402                            drop(services_ro);
1403
1404                            // Decide whether to recreate. Always forces a recreate.
1405                            // Newer recreates only when the digest actually changed.
1406                            // When digests are unknown (runtime doesn't expose them),
1407                            // we can't observe drift safely under Newer, so no-op.
1408                            let should_recreate = match effective {
1409                                PullPolicy::Always => true,
1410                                PullPolicy::Newer => match (&old_digest, &new_digest) {
1411                                    (Some(old), Some(new)) => old != new,
1412                                    _ => false,
1413                                },
1414                                _ => false,
1415                            };
1416
1417                            if should_recreate && current_replicas > 0 {
1418                                tracing::info!(
1419                                    service = %name,
1420                                    policy = ?effective,
1421                                    old_digest = ?old_digest,
1422                                    new_digest = ?new_digest,
1423                                    replicas = current_replicas,
1424                                    "image drift detected; performing rolling recreate"
1425                                );
1426                                self.scale_service(&name, 0).await?;
1427                                self.scale_service(&name, current_replicas).await?;
1428                                tracing::info!(
1429                                    service = %name,
1430                                    new_digest = ?new_digest,
1431                                    "service recreated with refreshed image"
1432                                );
1433                            } else {
1434                                tracing::debug!(
1435                                    service = %name,
1436                                    policy = ?effective,
1437                                    old_digest = ?old_digest,
1438                                    new_digest = ?new_digest,
1439                                    "service up to date; no recreate required"
1440                                );
1441                            }
1442                        }
1443                    }
1444                    return Ok(());
1445                }
1446                // Create new service with proxy manager for health-aware load balancing
1447                let overlay = self.overlay_manager.as_ref().map(Arc::clone);
1448                let mut instance = if let Some(proxy) = &self.proxy_manager {
1449                    ServiceInstance::with_proxy(
1450                        name.clone(),
1451                        spec,
1452                        self.runtime.clone(),
1453                        overlay,
1454                        Arc::clone(proxy),
1455                    )
1456                } else {
1457                    ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
1458                };
1459                // Set DNS server if configured
1460                if let Some(dns) = &self.dns_server {
1461                    instance.set_dns_server(Arc::clone(dns));
1462                }
1463                // Wire shared health states so callbacks bridge back to ServiceManager
1464                instance.set_health_states(Arc::clone(&self.health_states));
1465                // Register HTTP routes via proxy manager
1466                if let Some(proxy) = &self.proxy_manager {
1467                    proxy.add_service(&name, &instance.spec).await;
1468                }
1469                // Register TCP/UDP endpoints in stream registry
1470                if let Some(stream_registry) = &self.stream_registry {
1471                    for endpoint in &instance.spec.endpoints {
1472                        let svc = StreamService::new(
1473                            name.clone(),
1474                            Vec::new(), // No backends yet; added on scale-up
1475                        );
1476                        match endpoint.protocol {
1477                            Protocol::Tcp => {
1478                                stream_registry.register_tcp(endpoint.port, svc);
1479                                tracing::debug!(
1480                                    service = %name,
1481                                    port = endpoint.port,
1482                                    "Registered TCP stream route"
1483                                );
1484                            }
1485                            Protocol::Udp => {
1486                                stream_registry.register_udp(endpoint.port, svc);
1487                                tracing::debug!(
1488                                    service = %name,
1489                                    port = endpoint.port,
1490                                    "Registered UDP stream route"
1491                                );
1492                            }
1493                            _ => {} // HTTP routes handled by proxy manager
1494                        }
1495                    }
1496                }
1497                services.insert(name, instance);
1498            }
1499            ResourceType::Job => {
1500                // Job: Just store the spec for later triggering
1501                // Jobs don't start containers immediately; they're triggered on-demand
1502                if let Some(executor) = &self.job_executor {
1503                    executor.register_job(&name, spec).await;
1504                    tracing::info!(job = %name, "Registered job spec");
1505                } else {
1506                    tracing::warn!(
1507                        job = %name,
1508                        "Job executor not configured, storing as service for reference"
1509                    );
1510                    // Fallback: store as service instance for reference
1511                    let mut services = self.services.write().await;
1512                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
1513                    let mut instance = if let Some(proxy) = &self.proxy_manager {
1514                        ServiceInstance::with_proxy(
1515                            name.clone(),
1516                            spec,
1517                            self.runtime.clone(),
1518                            overlay,
1519                            Arc::clone(proxy),
1520                        )
1521                    } else {
1522                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
1523                    };
1524                    // Set DNS server if configured
1525                    if let Some(dns) = &self.dns_server {
1526                        instance.set_dns_server(Arc::clone(dns));
1527                    }
1528                    services.insert(name, instance);
1529                }
1530            }
1531            ResourceType::Cron => {
1532                // Cron: Register with the cron scheduler
1533                if let Some(scheduler) = &self.cron_scheduler {
1534                    scheduler.register(&name, &spec).await?;
1535                    tracing::info!(cron = %name, "Registered cron job with scheduler");
1536                } else {
1537                    return Err(AgentError::Configuration(format!(
1538                        "Cron scheduler not configured for cron job '{name}'"
1539                    )));
1540                }
1541            }
1542        }
1543
1544        Ok(())
1545    }
1546
1547    /// Update backend addresses via `ProxyManager` after scaling
1548    async fn update_proxy_backends(&self, service_name: &str, addrs: Vec<SocketAddr>) {
1549        if let Some(proxy) = &self.proxy_manager {
1550            proxy.update_backends(service_name, addrs).await;
1551        }
1552    }
1553
1554    /// Update backend addresses in the `StreamRegistry` for TCP/UDP endpoints after scaling
1555    ///
1556    /// For containers with a port override (macOS sandbox), the addresses already
1557    /// carry the runtime-assigned port. In that case, the container listens on the
1558    /// override port for all traffic, so we use the address port directly. For
1559    /// containers without a port override (Linux, VMs), we reconstruct addresses
1560    /// using the endpoint's declared port, since each container has its own IP
1561    /// and can bind any port independently.
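    ///
    /// For example (illustrative values): a TCP endpoint targeting port 5432
    /// with replica addresses `10.0.0.2:8080` and `10.0.0.3:8080` (no
    /// overrides) yields backends `10.0.0.2:5432` and `10.0.0.3:5432`;
    /// with overrides (e.g. `127.0.0.1:32801` and `127.0.0.1:32802`) the
    /// addresses are registered as-is.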
1562    fn update_stream_backends(&self, spec: &ServiceSpec, addrs: &[SocketAddr]) {
1563        let Some(stream_registry) = &self.stream_registry else {
1564            return;
1565        };
1566
1567        // Detect per-container port overrides: if any address uses a port other
1568        // than the primary spec endpoint's port, the addresses carry runtime-
1569        // assigned ports and must be used as-is.
1570        let primary_spec_port = spec
1571            .endpoints
1572            .iter()
1573            .find(|ep| {
1574                matches!(
1575                    ep.protocol,
1576                    Protocol::Http | Protocol::Https | Protocol::Websocket
1577                )
1578            })
1579            .map_or(8080, zlayer_spec::EndpointSpec::target_port);
1580
1581        let has_port_overrides = addrs.iter().any(|addr| addr.port() != primary_spec_port);
1582
1583        for endpoint in &spec.endpoints {
1584            match endpoint.protocol {
1585                Protocol::Tcp => {
1586                    let tcp_backends: Vec<SocketAddr> = if has_port_overrides {
1587                        // Port overrides active (macOS sandbox): the container listens
1588                        // on its assigned port for all traffic. Use addresses as-is.
1589                        addrs.to_vec()
1590                    } else {
1591                        // Normal case: each container has its own IP, construct
1592                        // addresses using the TCP endpoint's container target port.
1593                        addrs
1594                            .iter()
1595                            .map(|addr| SocketAddr::new(addr.ip(), endpoint.target_port()))
1596                            .collect()
1597                    };
1598
1599                    stream_registry.update_tcp_backends(endpoint.port, tcp_backends);
1600
1601                    tracing::debug!(
1602                        endpoint = %endpoint.name,
1603                        port = endpoint.port,
1604                        backend_count = addrs.len(),
1605                        "Updated TCP stream backends"
1606                    );
1607                }
1608                Protocol::Udp => {
1609                    let udp_backends: Vec<SocketAddr> = if has_port_overrides {
1610                        addrs.to_vec()
1611                    } else {
1612                        addrs
1613                            .iter()
1614                            .map(|addr| SocketAddr::new(addr.ip(), endpoint.target_port()))
1615                            .collect()
1616                    };
1617
1618                    stream_registry.update_udp_backends(endpoint.port, udp_backends);
1619
1620                    tracing::debug!(
1621                        endpoint = %endpoint.name,
1622                        port = endpoint.port,
1623                        backend_count = addrs.len(),
1624                        "Updated UDP stream backends"
1625                    );
1626                }
1627                _ => {} // HTTP endpoints handled by update_proxy_backends
1628            }
1629        }
1630    }
1631
1632    /// Scale a service to desired replica count
1633    ///
1634    /// # Errors
1635    /// Returns an error if the service is not found or scaling fails.
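    ///
    /// # Example
    ///
    /// Scaling an already-registered service up and then back down (sketch,
    /// not compiled as a doctest):
    ///
    /// ```ignore
    /// manager.scale_service("api", 3).await?;
    /// assert_eq!(manager.service_replica_count("api").await?, 3);
    /// manager.scale_service("api", 1).await?;
    /// ```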
1636    #[allow(clippy::cast_possible_truncation)]
1637    pub async fn scale_service(&self, name: &str, replicas: u32) -> Result<()> {
1638        let _permit = self.scale_semaphore.acquire().await;
1639
1640        let services = self.services.read().await;
1641        let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
1642            container: name.to_string(),
1643            reason: "service not found".to_string(),
1644        })?;
1645
1646        // Get current replica count before scaling
1647        let current_replicas = instance.replica_count().await as u32;
1648
1649        // Perform the scaling operation
1650        instance.scale_to(replicas).await?;
1651
1652        // After scaling, update proxy backends with new container addresses
1653        // Backend addresses are collected from each container's overlay IP
1654        // (honouring any runtime-assigned port override) by collect_backend_addrs.
1655        // Runtimes that don't attach overlay IPs (e.g. Docker) yield an empty
1656        // list, which is logged as a warning rather than falling back to localhost.
1657        let addrs = self.collect_backend_addrs(instance, replicas).await;
1658
1659        // Update HTTP backends via ProxyManager
1660        if self.proxy_manager.is_some() && !addrs.is_empty() {
1661            self.update_proxy_backends(name, addrs.clone()).await;
1662        }
1663
1664        // Update TCP/UDP backends in StreamRegistry
1665        if self.stream_registry.is_some() {
1666            self.update_stream_backends(&instance.spec, &addrs);
1667        }
1668
1669        // Register new containers with supervisor for crash monitoring
1670        if let Some(supervisor) = &self.container_supervisor {
1671            // For scale-up, register new containers
1672            if replicas > current_replicas {
1673                for i in current_replicas..replicas {
1674                    let container_id = ContainerId {
1675                        service: name.to_string(),
1676                        replica: i + 1,
1677                    };
1678                    supervisor.supervise(&container_id, &instance.spec).await;
1679                }
1680            }
1681            // For scale-down, unregister removed containers
1682            if replicas < current_replicas {
1683                for i in replicas..current_replicas {
1684                    let container_id = ContainerId {
1685                        service: name.to_string(),
1686                        replica: i + 1,
1687                    };
1688                    supervisor.unsupervise(&container_id).await;
1689                }
1690            }
1691        }
1692
1693        Ok(())
1694    }
1695
1696    /// Collect backend addresses for a service's containers
1697    ///
1698    /// This queries the service instance's containers for their overlay network
1699    /// IP addresses and constructs backend addresses using those IPs with the
1700    /// service's endpoint port.
1701    ///
1702    /// If a container has a `port_override` (e.g., macOS sandbox where all
1703    /// containers share the host network), that port is used instead of the
1704    /// spec-declared endpoint port. This allows multiple replicas on the same
1705    /// IP (`127.0.0.1`) to be distinguished by port.
1706    async fn collect_backend_addrs(
1707        &self,
1708        instance: &ServiceInstance,
1709        _replicas: u32, // No longer needed - we iterate containers directly
1710    ) -> Vec<SocketAddr> {
1711        let mut addrs = Vec::new();
1712
1713        // Get the primary container target port (first HTTP endpoint) as the default
1714        let spec_port = instance
1715            .spec
1716            .endpoints
1717            .iter()
1718            .find(|ep| {
1719                matches!(
1720                    ep.protocol,
1721                    Protocol::Http | Protocol::Https | Protocol::Websocket
1722                )
1723            })
1724            .map_or(8080, zlayer_spec::EndpointSpec::target_port);
1725
1726        // Collect backend addresses from containers with overlay IPs
1727        let containers = instance.containers().read().await;
1728
1729        for container in containers.values() {
1730            if let Some(ip) = container.overlay_ip {
1731                // Use the runtime-assigned port override if present (macOS sandbox),
1732                // otherwise fall back to the spec-declared endpoint port.
1733                let port = container.port_override.unwrap_or(spec_port);
1734                addrs.push(SocketAddr::new(ip, port));
1735            }
1736        }
1737
1738        // If no overlay IPs are available, this might be the Docker runtime or failed attachments.
1739        // Log a warning, but don't fall back to localhost in production.
1740        if addrs.is_empty() && !containers.is_empty() {
1741            tracing::warn!(
1742                service = %instance.service_name,
1743                container_count = containers.len(),
1744                "no overlay IPs available for backends - containers may not be reachable via proxy"
1745            );
1746        }
1747
1748        addrs
1749    }
1750
1751    /// Get service replica count
1752    ///
1753    /// # Errors
1754    /// Returns an error if the service is not found.
1755    pub async fn service_replica_count(&self, name: &str) -> Result<usize> {
1756        let services = self.services.read().await;
1757        let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
1758            container: name.to_string(),
1759            reason: "service not found".to_string(),
1760        })?;
1761
1762        Ok(instance.replica_count().await)
1763    }
1764
1765    /// Remove a workload (service, job, or cron)
1766    ///
1767    /// This method handles cleanup for different resource types:
1768    /// - **Service**: Unregisters stream routes, container supervision, and DNS records, then removes it from the service map
1769    /// - **Job**: Unregisters from job executor
1770    /// - **Cron**: Unregisters from cron scheduler
1771    ///
1772    /// # Errors
1773    /// Returns an error if the service cannot be removed or scale-down fails.
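    ///
    /// # Example
    ///
    /// Tearing a service down after use (sketch, not compiled as a doctest):
    ///
    /// ```ignore
    /// manager.remove_service("api").await?;
    /// assert!(!manager.list_services().await.contains(&"api".to_string()));
    /// ```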
1774    pub async fn remove_service(&self, name: &str) -> Result<()> {
1775        // Try to unregister from cron scheduler first
1776        if let Some(scheduler) = &self.cron_scheduler {
1777            scheduler.unregister(name).await;
1778        }
1779
1780        // Try to unregister from job executor
1781        if let Some(executor) = &self.job_executor {
1782            executor.unregister_job(name).await;
1783        }
1784
1785        // Unregister stream routes (TCP/UDP) from the stream registry
1786        if let Some(stream_registry) = &self.stream_registry {
1787            // Need to get the service spec to know which ports to unregister
1788            let services = self.services.read().await;
1789            if let Some(instance) = services.get(name) {
1790                for endpoint in &instance.spec.endpoints {
1791                    match endpoint.protocol {
1792                        Protocol::Tcp => {
1793                            let _ = stream_registry.unregister_tcp(endpoint.port);
1794                            tracing::debug!(
1795                                service = %name,
1796                                port = endpoint.port,
1797                                "Unregistered TCP stream route"
1798                            );
1799                        }
1800                        Protocol::Udp => {
1801                            let _ = stream_registry.unregister_udp(endpoint.port);
1802                            tracing::debug!(
1803                                service = %name,
1804                                port = endpoint.port,
1805                                "Unregistered UDP stream route"
1806                            );
1807                        }
1808                        _ => {} // HTTP routes handled by proxy manager
1809                    }
1810                }
1811            }
1812            drop(services); // Release read lock
1813        }
1814
1815        // Unregister containers from the supervisor
1816        if let Some(supervisor) = &self.container_supervisor {
1817            let containers = self.get_service_containers(name).await;
1818            for container_id in containers {
1819                supervisor.unsupervise(&container_id).await;
1820            }
1821            tracing::debug!(service = %name, "Unregistered containers from supervisor");
1822        }
1823
1824        // Clean up DNS records for the service
1825        if let Some(dns) = &self.dns_server {
1826            // Remove the service-level DNS entry
1827            let service_hostname = format!("{name}.service.local");
1828            if let Err(e) = dns.remove_record(&service_hostname).await {
1829                tracing::warn!(
1830                    hostname = %service_hostname,
1831                    error = %e,
1832                    "failed to remove service DNS record"
1833                );
1834            } else {
1835                tracing::debug!(
1836                    hostname = %service_hostname,
1837                    "removed service DNS record"
1838                );
1839            }
1840
1841            // Also remove any remaining replica-specific DNS entries
1842            let services = self.services.read().await;
1843            if let Some(instance) = services.get(name) {
1844                let containers = instance.containers().read().await;
1845                for id in containers.keys() {
1846                    let replica_hostname = format!("{}.{}.service.local", id.replica, name);
1847                    if let Err(e) = dns.remove_record(&replica_hostname).await {
1848                        tracing::warn!(
1849                            hostname = %replica_hostname,
1850                            error = %e,
1851                            "failed to remove replica DNS record during service removal"
1852                        );
1853                    }
1854                }
1855            }
1856            drop(services); // Release read lock before write lock
1857        }
1858
1859        // Remove from services map (may or may not exist depending on rtype)
1860        let mut services = self.services.write().await;
1861        if services.remove(name).is_some() {
1862            tracing::debug!(service = %name, "Removed service from manager");
1863        }
1864
1865        Ok(())
1866    }
1867
1868    /// Introspect service infrastructure wiring.
1869    /// Returns (`has_overlay`, `has_proxy`, `has_dns`), or None if service not found.
1870    pub async fn service_infrastructure(&self, name: &str) -> Option<(bool, bool, bool)> {
1871        let services = self.services.read().await;
1872        services.get(name).map(|i| {
1873            (
1874                i.has_overlay_manager(),
1875                i.has_proxy_manager(),
1876                i.has_dns_server(),
1877            )
1878        })
1879    }
1880
1881    /// List all services
1882    pub async fn list_services(&self) -> Vec<String> {
1883        self.services.read().await.keys().cloned().collect()
1884    }
1885
1886    /// Get logs for a service, aggregated from all container replicas.
1887    ///
1888    /// # Arguments
1889    /// * `service_name` - Name of the service to fetch logs for
1890    /// * `tail` - Number of lines to return per container (0 = all)
1891    /// * `instance` - Optional specific instance (container ID suffix like "1", "2")
1892    ///
1893    /// # Errors
1894    /// Returns an error if the service or instance is not found.
1895    ///
1896    /// # Returns
1897    /// Structured log entries from all (or specific) container replicas. Each
1898    /// entry has its `service` and `deployment` fields populated when available.
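    ///
    /// # Example
    ///
    /// Fetching the last 100 lines from every replica, then from replica 2
    /// only (sketch, not compiled as a doctest):
    ///
    /// ```ignore
    /// let all = manager.get_service_logs("api", 100, None).await?;
    /// let replica_2 = manager.get_service_logs("api", 100, Some("2")).await?;
    /// ```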
1899    pub async fn get_service_logs(
1900        &self,
1901        service_name: &str,
1902        tail: usize,
1903        instance: Option<&str>,
1904    ) -> Result<Vec<LogEntry>> {
1905        let container_ids = self.get_service_containers(service_name).await;
1906
1907        if container_ids.is_empty() {
1908            return Err(AgentError::NotFound {
1909                container: service_name.to_string(),
1910                reason: "no containers found for service".to_string(),
1911            });
1912        }
1913
1914        // If a specific instance is requested, filter to just that one
1915        let target_ids: Vec<&ContainerId> = if let Some(inst) = instance {
1916            if let Ok(replica_num) = inst.parse::<u32>() {
1917                container_ids
1918                    .iter()
1919                    .filter(|id| id.replica == replica_num)
1920                    .collect()
1921            } else {
1922                // Try matching by full container ID string suffix
1923                container_ids
1924                    .iter()
1925                    .filter(|id| id.to_string().contains(inst))
1926                    .collect()
1927            }
1928        } else {
1929            container_ids.iter().collect()
1930        };
1931
1932        if target_ids.is_empty() {
1933            return Err(AgentError::NotFound {
1934                container: format!("{}/{}", service_name, instance.unwrap_or("?")),
1935                reason: "instance not found".to_string(),
1936            });
1937        }
1938
1939        let mut all_entries: Vec<LogEntry> = Vec::new();
1940
1941        for id in &target_ids {
1942            match self.runtime.container_logs(id, tail).await {
1943                Ok(mut entries) => {
1944                    // Populate service and deployment metadata on each entry
1945                    for entry in &mut entries {
1946                        if entry.service.is_none() {
1947                            entry.service = Some(service_name.to_string());
1948                        }
1949                        if entry.deployment.is_none() {
1950                            entry.deployment.clone_from(&self.deployment_name);
1951                        }
1952                    }
1953                    all_entries.extend(entries);
1954                }
1955                Err(e) => {
1956                    tracing::warn!(
1957                        service = service_name,
1958                        container = %id,
1959                        error = %e,
1960                        "Failed to read container logs"
1961                    );
1962                }
1963            }
1964        }
1965
1966        Ok(all_entries)
1967    }
1968
1969    /// Get all container IDs for a specific service
1970    ///
1971    /// Returns an empty vector if the service doesn't exist.
1972    ///
1973    /// # Arguments
1974    /// * `service_name` - Name of the service to query
1975    ///
1976    /// # Returns
1977    /// Vector of `ContainerIds` for all replicas of the service
1978    pub async fn get_service_containers(&self, service_name: &str) -> Vec<ContainerId> {
1979        let services = self.services.read().await;
1980        if let Some(instance) = services.get(service_name) {
1981            instance.container_ids().await
1982        } else {
1983            Vec::new()
1984        }
1985    }
1986
1987    /// Execute a command inside a running container for a service
1988    ///
1989    /// Picks a specific replica if provided, otherwise uses the first available container.
1990    ///
1991    /// # Arguments
1992    /// * `service_name` - Name of the service
1993    /// * `replica` - Optional replica number to target
1994    /// * `cmd` - Command and arguments to execute
1995    ///
1996    /// # Errors
1997    /// Returns an error if the service or replica is not found, or if exec fails.
1998    ///
1999    /// # Panics
2000    /// Panics if no replica is specified and the container list is unexpectedly empty
2001    /// after the emptiness check (should not happen in practice).
2002    ///
2003    /// # Returns
2004    /// Tuple of (`exit_code`, stdout, stderr)
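    ///
    /// # Example
    ///
    /// Running a command in the first available replica (sketch, not compiled
    /// as a doctest):
    ///
    /// ```ignore
    /// let cmd = vec!["ls".to_string(), "/data".to_string()];
    /// let (exit_code, stdout, stderr) = manager
    ///     .exec_in_container("api", None, &cmd)
    ///     .await?;
    /// assert_eq!(exit_code, 0);
    /// ```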
2005    pub async fn exec_in_container(
2006        &self,
2007        service_name: &str,
2008        replica: Option<u32>,
2009        cmd: &[String],
2010    ) -> Result<(i32, String, String)> {
2011        let container_ids = self.get_service_containers(service_name).await;
2012
2013        if container_ids.is_empty() {
2014            return Err(AgentError::NotFound {
2015                container: service_name.to_string(),
2016                reason: "no containers found for service".to_string(),
2017            });
2018        }
2019
2020        // Pick the target container
2021        let target = if let Some(rep) = replica {
2022            container_ids
2023                .into_iter()
2024                .find(|cid| cid.replica == rep)
2025                .ok_or_else(|| AgentError::NotFound {
2026                    container: format!("{service_name}-rep-{rep}"),
2027                    reason: format!("replica {rep} not found for service"),
2028                })?
2029        } else {
2030            // Use the first available container
2031            container_ids.into_iter().next().unwrap()
2032        };
2033
2034        self.runtime.exec(&target, cmd).await
2035    }
2036
2037    // ==================== Job Management ====================
2038
2039    /// Trigger a job execution
2040    ///
2041    /// # Arguments
2042    /// * `name` - Name of the registered job
2043    /// * `trigger` - How the job was triggered (endpoint, cli, etc.)
2044    ///
2045    /// # Returns
2046    /// The execution ID for tracking the job
2047    ///
2048    /// # Errors
2049    /// - Returns error if job executor is not configured
2050    /// - Returns error if the job is not registered
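    ///
    /// # Example
    ///
    /// Triggering a registered job and inspecting its execution record
    /// (sketch, not compiled as a doctest; `"backup"` is a hypothetical job name):
    ///
    /// ```ignore
    /// let exec_id = manager.trigger_job("backup", JobTrigger::Cli).await?;
    /// if let Some(execution) = manager.get_job_execution(&exec_id).await {
    ///     tracing::info!(job = %execution.job_name, "job triggered");
    /// }
    /// ```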
2051    pub async fn trigger_job(&self, name: &str, trigger: JobTrigger) -> Result<JobExecutionId> {
2052        let executor = self
2053            .job_executor
2054            .as_ref()
2055            .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
2056
2057        let spec = executor
2058            .get_job_spec(name)
2059            .await
2060            .ok_or_else(|| AgentError::NotFound {
2061                container: name.to_string(),
2062                reason: "job not registered".to_string(),
2063            })?;
2064
2065        executor.trigger(name, &spec, trigger).await
2066    }
2067
2068    /// Get the status of a job execution
2069    ///
2070    /// # Arguments
2071    /// * `id` - The execution ID returned from `trigger_job`
2072    ///
2073    /// # Returns
2074    /// The job execution details, or None if not found
2075    pub async fn get_job_execution(&self, id: &JobExecutionId) -> Option<JobExecution> {
2076        if let Some(executor) = &self.job_executor {
2077            executor.get_execution(id).await
2078        } else {
2079            None
2080        }
2081    }
2082
2083    /// List all executions for a specific job
2084    ///
2085    /// # Arguments
2086    /// * `name` - Name of the job
2087    ///
2088    /// # Returns
2089    /// Vector of job executions for the specified job
2090    pub async fn list_job_executions(&self, name: &str) -> Vec<JobExecution> {
2091        if let Some(executor) = &self.job_executor {
2092            executor.list_executions(name).await
2093        } else {
2094            Vec::new()
2095        }
2096    }
2097
2098    /// Cancel a running job execution
2099    ///
2100    /// # Arguments
2101    /// * `id` - The execution ID to cancel
2102    ///
2103    /// # Errors
2104    /// Returns error if job executor is not configured or if cancellation fails
2105    pub async fn cancel_job(&self, id: &JobExecutionId) -> Result<()> {
2106        let executor = self
2107            .job_executor
2108            .as_ref()
2109            .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
2110
2111        executor.cancel(id).await
2112    }
2113
2114    // ==================== Cron Management ====================
2115
2116    /// Manually trigger a cron job (outside of its schedule)
2117    ///
2118    /// # Arguments
2119    /// * `name` - Name of the cron job
2120    ///
2121    /// # Returns
2122    /// The execution ID for tracking the triggered job
2123    ///
2124    /// # Errors
2125    /// Returns error if cron scheduler is not configured or job not found
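    ///
    /// # Example
    ///
    /// Manually firing a registered cron job outside its schedule (sketch,
    /// not compiled as a doctest; `"cleanup"` is a hypothetical cron name):
    ///
    /// ```ignore
    /// let exec_id = manager.trigger_cron("cleanup").await?;
    /// let executions = manager.list_job_executions("cleanup").await;
    /// ```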
2126    pub async fn trigger_cron(&self, name: &str) -> Result<JobExecutionId> {
2127        let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2128            AgentError::Configuration("Cron scheduler not configured".to_string())
2129        })?;
2130
2131        scheduler.trigger_now(name).await
2132    }
2133
2134    /// Enable or disable a cron job
2135    ///
2136    /// # Arguments
2137    /// * `name` - Name of the cron job
2138    /// * `enabled` - Whether to enable or disable the job
2139    pub async fn set_cron_enabled(&self, name: &str, enabled: bool) {
2140        if let Some(scheduler) = &self.cron_scheduler {
2141            scheduler.set_enabled(name, enabled).await;
2142        }
2143    }
2144
2145    /// List all registered cron jobs
2146    pub async fn list_cron_jobs(&self) -> Vec<crate::cron_scheduler::CronJobInfo> {
2147        if let Some(scheduler) = &self.cron_scheduler {
2148            scheduler.list_jobs().await
2149        } else {
2150            Vec::new()
2151        }
2152    }
2153
2154    /// Start the cron scheduler background task
2155    ///
2156    /// This spawns a background task that checks for due cron jobs every second.
2157    /// Returns a `JoinHandle` that can be used to wait for the scheduler to stop.
2158    ///
2159    /// # Errors
2160    /// Returns error if cron scheduler is not configured
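    ///
    /// # Example
    ///
    /// Starting the scheduler and shutting it down later (sketch, not
    /// compiled as a doctest):
    ///
    /// ```ignore
    /// let handle = manager.start_cron_scheduler()?;
    /// // ... run the agent ...
    /// manager.shutdown_cron();
    /// handle.await?;
    /// ```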
2161    pub fn start_cron_scheduler(&self) -> Result<tokio::task::JoinHandle<()>> {
2162        let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2163            AgentError::Configuration("Cron scheduler not configured".to_string())
2164        })?;
2165
2166        let scheduler: Arc<CronScheduler> = Arc::clone(scheduler);
2167        Ok(tokio::spawn(async move {
2168            scheduler.run_loop().await;
2169        }))
2170    }
2171
2172    /// Shutdown the cron scheduler
2173    pub fn shutdown_cron(&self) {
2174        if let Some(scheduler) = &self.cron_scheduler {
2175            scheduler.shutdown();
2176        }
2177    }
2178}
2179
2180#[cfg(test)]
2181#[allow(deprecated)]
2182mod tests {
2183    use super::*;
2184    use crate::runtime::MockRuntime;
2185
2186    #[tokio::test]
2187    async fn test_service_manager() {
2188        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2189        let manager = ServiceManager::new(runtime);
2190
2191        // Add service
2192        let spec = mock_spec();
2193        Box::pin(manager.upsert_service("test".to_string(), spec))
2194            .await
2195            .unwrap();
2196
2197        // Scale up
2198        manager.scale_service("test", 3).await.unwrap();
2199
2200        // Check count
2201        let count = manager.service_replica_count("test").await.unwrap();
2202        assert_eq!(count, 3);
2203
2204        // List services
2205        let services = manager.list_services().await;
2206        assert_eq!(services, vec!["test".to_string()]);
2207    }
2208
2209    #[tokio::test]
2210    async fn test_service_manager_basic_lifecycle() {
2211        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2212        let manager = ServiceManager::new(runtime);
2213
2214        // Add service with HTTP endpoint
2215        let spec = mock_spec();
2216        Box::pin(manager.upsert_service("api".to_string(), spec))
2217            .await
2218            .unwrap();
2219
2220        // Scale up
2221        manager.scale_service("api", 2).await.unwrap();
2222
2223        // Check count
2224        let count = manager.service_replica_count("api").await.unwrap();
2225        assert_eq!(count, 2);
2226
2227        // Remove service
2228        manager.remove_service("api").await.unwrap();
2229
2230        // Verify service is gone
2231        let services = manager.list_services().await;
2232        assert!(!services.contains(&"api".to_string()));
2233    }
2234
2235    #[tokio::test]
2236    async fn test_service_manager_with_full_config() {
2237        use tokio::sync::RwLock;
2238
2239        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2240
2241        // Create a mock overlay manager (skip actual network setup)
2242        let overlay_manager = Arc::new(RwLock::new(
2243            OverlayManager::new("test-deployment".to_string())
2244                .await
2245                .unwrap(),
2246        ));
2247
2248        let manager =
2249            ServiceManager::with_full_config(runtime, overlay_manager, "prod".to_string());
2250
2251        // Add service
2252        let spec = mock_spec();
2253        Box::pin(manager.upsert_service("web".to_string(), spec))
2254            .await
2255            .unwrap();
2256
2257        // Verify service is registered
2258        let services = manager.list_services().await;
2259        assert!(services.contains(&"web".to_string()));
2260    }
2261
2262    fn mock_spec() -> ServiceSpec {
2263        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2264            r"
2265version: v1
2266deployment: test
2267services:
2268  test:
2269    rtype: service
2270    image:
2271      name: test:latest
2272    endpoints:
2273      - name: http
2274        protocol: http
2275        port: 8080
2276    scale:
2277      mode: fixed
2278      replicas: 1
2279",
2280        )
2281        .unwrap()
2282        .services
2283        .remove("test")
2284        .unwrap()
2285    }
2286
2287    /// Helper to create a `ServiceSpec` with dependencies
2288    fn mock_spec_with_deps(deps: Vec<DependsSpec>) -> ServiceSpec {
2289        let mut spec = mock_spec();
2290        spec.depends = deps;
2291        spec
2292    }
2293
2294    /// Helper to create a `DependsSpec`
2295    fn dep(
2296        service: &str,
2297        condition: zlayer_spec::DependencyCondition,
2298        timeout_ms: u64,
2299        on_timeout: zlayer_spec::TimeoutAction,
2300    ) -> DependsSpec {
2301        DependsSpec {
2302            service: service.to_string(),
2303            condition,
2304            timeout: Some(Duration::from_millis(timeout_ms)),
2305            on_timeout,
2306        }
2307    }
2308
2309    #[tokio::test]
2310    async fn test_deploy_with_dependencies_no_deps() {
2311        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2312        let manager = ServiceManager::new(runtime);
2313
2314        // Services with no dependencies
2315        let mut services = HashMap::new();
2316        services.insert("a".to_string(), mock_spec());
2317        services.insert("b".to_string(), mock_spec());
2318
2319        // Should deploy both without issue
2320        Box::pin(manager.deploy_with_dependencies(services))
2321            .await
2322            .unwrap();
2323
2324        // Both services should be registered
2325        let service_list = manager.list_services().await;
2326        assert_eq!(service_list.len(), 2);
2327    }
2328
2329    #[tokio::test]
2330    async fn test_deploy_with_dependencies_linear() {
2331        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2332        let manager = ServiceManager::new(runtime);
2333
2334        // A -> B -> C (A depends on B, B depends on C)
2335        // All use "started" condition which is satisfied when container is running
2336        let mut services = HashMap::new();
2337        services.insert("c".to_string(), mock_spec());
2338        services.insert(
2339            "b".to_string(),
2340            mock_spec_with_deps(vec![dep(
2341                "c",
2342                zlayer_spec::DependencyCondition::Started,
2343                5000,
2344                zlayer_spec::TimeoutAction::Fail,
2345            )]),
2346        );
2347        services.insert(
2348            "a".to_string(),
2349            mock_spec_with_deps(vec![dep(
2350                "b",
2351                zlayer_spec::DependencyCondition::Started,
2352                5000,
2353                zlayer_spec::TimeoutAction::Fail,
2354            )]),
2355        );
2356
2357        // Should deploy in order: c, b, a
2358        Box::pin(manager.deploy_with_dependencies(services))
2359            .await
2360            .unwrap();
2361
2362        // All services should be registered
2363        let service_list = manager.list_services().await;
2364        assert_eq!(service_list.len(), 3);
2365    }
2366
2367    #[tokio::test]
2368    async fn test_deploy_with_dependencies_cycle_detection() {
2369        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2370        let manager = ServiceManager::new(runtime);
2371
2372        // A -> B -> A (cycle)
2373        let mut services = HashMap::new();
2374        services.insert(
2375            "a".to_string(),
2376            mock_spec_with_deps(vec![dep(
2377                "b",
2378                zlayer_spec::DependencyCondition::Started,
2379                5000,
2380                zlayer_spec::TimeoutAction::Fail,
2381            )]),
2382        );
2383        services.insert(
2384            "b".to_string(),
2385            mock_spec_with_deps(vec![dep(
2386                "a",
2387                zlayer_spec::DependencyCondition::Started,
2388                5000,
2389                zlayer_spec::TimeoutAction::Fail,
2390            )]),
2391        );
2392
2393        // Should fail with cycle detection
2394        let result = Box::pin(manager.deploy_with_dependencies(services)).await;
2395        assert!(result.is_err());
2396        let err = result.unwrap_err().to_string();
2397        assert!(err.contains("Cyclic dependency"));
2398    }
2399
2400    #[tokio::test]
2401    async fn test_deploy_with_dependencies_timeout_continue() {
2402        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2403        let manager = ServiceManager::new(runtime);
2404
2405        // A depends on B (healthy), but B never becomes healthy
2406        // Using continue action, so it should proceed anyway
2407        let mut services = HashMap::new();
2408        services.insert("b".to_string(), mock_spec());
2409        services.insert(
2410            "a".to_string(),
2411            mock_spec_with_deps(vec![dep(
2412                "b",
2413                zlayer_spec::DependencyCondition::Healthy, // B won't pass healthy check
2414                100,                                       // Short timeout
2415                zlayer_spec::TimeoutAction::Continue,      // But continue anyway
2416            )]),
2417        );
2418
2419        // Should deploy both despite timeout
2420        Box::pin(manager.deploy_with_dependencies(services))
2421            .await
2422            .unwrap();
2423
2424        let service_list = manager.list_services().await;
2425        assert_eq!(service_list.len(), 2);
2426    }
2427
2428    #[tokio::test]
2429    async fn test_deploy_with_dependencies_timeout_warn() {
2430        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2431        let manager = ServiceManager::new(runtime);
2432
2433        // A depends on B (healthy), but B never becomes healthy
2434        // Using warn action, so it should proceed with a warning
2435        let mut services = HashMap::new();
2436        services.insert("b".to_string(), mock_spec());
2437        services.insert(
2438            "a".to_string(),
2439            mock_spec_with_deps(vec![dep(
2440                "b",
2441                zlayer_spec::DependencyCondition::Healthy,
2442                100,
2443                zlayer_spec::TimeoutAction::Warn,
2444            )]),
2445        );
2446
2447        // Should deploy both despite timeout (with warning)
2448        Box::pin(manager.deploy_with_dependencies(services))
2449            .await
2450            .unwrap();
2451
2452        let service_list = manager.list_services().await;
2453        assert_eq!(service_list.len(), 2);
2454    }
2455
2456    #[tokio::test]
2457    async fn test_deploy_with_dependencies_timeout_fail() {
2458        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2459        let manager = ServiceManager::new(runtime);
2460
2461        // A depends on B (healthy), but B never becomes healthy
2462        // Using fail action, so deployment should fail
2463        let mut services = HashMap::new();
2464        services.insert("b".to_string(), mock_spec());
2465        services.insert(
2466            "a".to_string(),
2467            mock_spec_with_deps(vec![dep(
2468                "b",
2469                zlayer_spec::DependencyCondition::Healthy,
2470                100,
2471                zlayer_spec::TimeoutAction::Fail,
2472            )]),
2473        );
2474
2475        // Should fail after B is started but doesn't become healthy
2476        let result = Box::pin(manager.deploy_with_dependencies(services)).await;
2477        assert!(result.is_err());
2478
2479        // B should be started (it has no deps), but A should fail
2480        let err = result.unwrap_err().to_string();
2481        assert!(err.contains("Dependency timeout"));
2482    }
2483
2484    #[tokio::test]
2485    async fn test_check_dependencies_all_satisfied() {
2486        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2487        let manager = ServiceManager::new(runtime);
2488
2489        // Mark a service as healthy
2490        manager
2491            .update_health_state("db", HealthState::Healthy)
2492            .await;
2493
2494        let deps = vec![DependsSpec {
2495            service: "db".to_string(),
2496            condition: zlayer_spec::DependencyCondition::Healthy,
2497            timeout: Some(Duration::from_secs(60)),
2498            on_timeout: zlayer_spec::TimeoutAction::Fail,
2499        }];
2500
2501        let satisfied = manager.check_dependencies(&deps).await.unwrap();
2502        assert!(satisfied);
2503    }
2504
2505    #[tokio::test]
2506    async fn test_check_dependencies_not_satisfied() {
2507        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2508        let manager = ServiceManager::new(runtime);
2509
2510        // Service not healthy (no state set = Unknown)
2511        let deps = vec![DependsSpec {
2512            service: "db".to_string(),
2513            condition: zlayer_spec::DependencyCondition::Healthy,
2514            timeout: Some(Duration::from_secs(60)),
2515            on_timeout: zlayer_spec::TimeoutAction::Fail,
2516        }];
2517
2518        let satisfied = manager.check_dependencies(&deps).await.unwrap();
2519        assert!(!satisfied);
2520    }
2521
2522    #[tokio::test]
2523    async fn test_health_state_tracking() {
2524        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2525        let manager = ServiceManager::new(runtime);
2526
2527        // Update health states
2528        manager
2529            .update_health_state("db", HealthState::Healthy)
2530            .await;
2531        manager
2532            .update_health_state("cache", HealthState::Unknown)
2533            .await;
2534
2535        // Verify states
2536        let states = manager.health_states();
2537        let states_read = states.read().await;
2538
2539        assert!(matches!(states_read.get("db"), Some(HealthState::Healthy)));
2540        assert!(matches!(
2541            states_read.get("cache"),
2542            Some(HealthState::Unknown)
2543        ));
2544    }
2545
2546    // ==================== Job/Cron Integration Tests ====================
2547
2548    fn mock_job_spec() -> ServiceSpec {
2549        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2550            r"
2551version: v1
2552deployment: test
2553services:
2554  backup:
2555    rtype: job
2556    image:
2557      name: backup:latest
2558",
2559        )
2560        .unwrap()
2561        .services
2562        .remove("backup")
2563        .unwrap()
2564    }
2565
2566    fn mock_cron_spec() -> ServiceSpec {
2567        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2568            r#"
2569version: v1
2570deployment: test
2571services:
2572  cleanup:
2573    rtype: cron
2574    schedule: "0 0 * * * * *"
2575    image:
2576      name: cleanup:latest
2577"#,
2578        )
2579        .unwrap()
2580        .services
2581        .remove("cleanup")
2582        .unwrap()
2583    }
2584
2585    #[tokio::test]
2586    async fn test_service_manager_with_job_executor() {
2587        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2588        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2589
2590        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
2591
2592        // Register job
2593        let job_spec = mock_job_spec();
2594        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
2595            .await
2596            .unwrap();
2597
2598        // Trigger job
2599        let exec_id = manager
2600            .trigger_job("backup", JobTrigger::Cli)
2601            .await
2602            .unwrap();
2603
2604        // Give job time to start
2605        tokio::time::sleep(Duration::from_millis(50)).await;
2606
2607        // Check execution exists
2608        let execution = manager.get_job_execution(&exec_id).await;
2609        assert!(execution.is_some());
2610        assert_eq!(execution.unwrap().job_name, "backup");
2611    }
2612
2613    #[tokio::test]
2614    async fn test_service_manager_with_cron_scheduler() {
2615        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2616        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2617        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
2618
2619        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
2620
2621        // Register cron job
2622        let cron_spec = mock_cron_spec();
2623        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
2624            .await
2625            .unwrap();
2626
2627        // List cron jobs
2628        let cron_jobs = manager.list_cron_jobs().await;
2629        assert_eq!(cron_jobs.len(), 1);
2630        assert_eq!(cron_jobs[0].name, "cleanup");
2631        assert!(cron_jobs[0].enabled);
2632    }
2633
2634    #[tokio::test]
2635    async fn test_service_manager_trigger_cron() {
2636        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2637        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2638        let cron_scheduler = Arc::new(CronScheduler::new(job_executor.clone()));
2639
2640        let manager = ServiceManager::new(runtime)
2641            .with_job_executor(job_executor)
2642            .with_cron_scheduler(cron_scheduler);
2643
2644        // Register cron job
2645        let cron_spec = mock_cron_spec();
2646        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
2647            .await
2648            .unwrap();
2649
2650        // Manually trigger the cron job
2651        let exec_id = manager.trigger_cron("cleanup").await.unwrap();
2652        assert!(!exec_id.0.is_empty());
2653    }
2654
2655    #[tokio::test]
2656    async fn test_service_manager_enable_disable_cron() {
2657        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2658        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2659        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
2660
2661        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
2662
2663        // Register cron job
2664        let cron_spec = mock_cron_spec();
2665        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
2666            .await
2667            .unwrap();
2668
2669        // Initially enabled
2670        let cron_jobs = manager.list_cron_jobs().await;
2671        assert!(cron_jobs[0].enabled);
2672
2673        // Disable
2674        manager.set_cron_enabled("cleanup", false).await;
2675        let cron_jobs = manager.list_cron_jobs().await;
2676        assert!(!cron_jobs[0].enabled);
2677
2678        // Re-enable
2679        manager.set_cron_enabled("cleanup", true).await;
2680        let cron_jobs = manager.list_cron_jobs().await;
2681        assert!(cron_jobs[0].enabled);
2682    }
2683
2684    #[tokio::test]
2685    async fn test_service_manager_remove_cleans_up_job() {
2686        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2687        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2688
2689        let manager = ServiceManager::new(runtime).with_job_executor(job_executor.clone());
2690
2691        // Register job
2692        let job_spec = mock_job_spec();
2693        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
2694            .await
2695            .unwrap();
2696
2697        // Verify job is registered
2698        let spec = job_executor.get_job_spec("backup").await;
2699        assert!(spec.is_some());
2700
2701        // Remove job
2702        manager.remove_service("backup").await.unwrap();
2703
2704        // Verify job is unregistered
2705        let spec = job_executor.get_job_spec("backup").await;
2706        assert!(spec.is_none());
2707    }
2708
2709    #[tokio::test]
2710    async fn test_service_manager_remove_cleans_up_cron() {
2711        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2712        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2713        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
2714
2715        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler.clone());
2716
2717        // Register cron job
2718        let cron_spec = mock_cron_spec();
2719        Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
2720            .await
2721            .unwrap();
2722
2723        // Verify cron job is registered
2724        assert_eq!(cron_scheduler.job_count().await, 1);
2725
2726        // Remove cron job
2727        manager.remove_service("cleanup").await.unwrap();
2728
2729        // Verify cron job is unregistered
2730        assert_eq!(cron_scheduler.job_count().await, 0);
2731    }
2732
2733    #[tokio::test]
2734    async fn test_service_manager_job_without_executor() {
2735        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2736        let manager = ServiceManager::new(runtime);
2737
2738        // Try to trigger job without executor configured
2739        let result = manager.trigger_job("nonexistent", JobTrigger::Cli).await;
2740        assert!(result.is_err());
2741        assert!(result.unwrap_err().to_string().contains("not configured"));
2742    }
2743
2744    #[tokio::test]
2745    async fn test_service_manager_cron_without_scheduler() {
2746        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2747        let manager = ServiceManager::new(runtime);
2748
2749        // Try to register cron job without scheduler configured
2750        let cron_spec = mock_cron_spec();
2751        let result = Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec)).await;
2752        assert!(result.is_err());
2753        assert!(result.unwrap_err().to_string().contains("not configured"));
2754    }
2755
2756    #[tokio::test]
2757    async fn test_service_manager_list_job_executions() {
2758        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2759        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2760
2761        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
2762
2763        // Register job
2764        let job_spec = mock_job_spec();
2765        Box::pin(manager.upsert_service("backup".to_string(), job_spec))
2766            .await
2767            .unwrap();
2768
2769        // Trigger the job twice, once from the CLI and once from the scheduler
2770        manager
2771            .trigger_job("backup", JobTrigger::Cli)
2772            .await
2773            .unwrap();
2774        manager
2775            .trigger_job("backup", JobTrigger::Scheduler)
2776            .await
2777            .unwrap();
2778
2779        // Give jobs time to start
2780        tokio::time::sleep(Duration::from_millis(50)).await;
2781
2782        // List executions
2783        let executions = manager.list_job_executions("backup").await;
2784        assert_eq!(executions.len(), 2);
2785    }
2786
2787    // ==================== Container Supervisor Integration Tests ====================
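    // These tests check that scaling a service up and down keeps the ContainerSupervisor's
    // supervised-container count in sync, and that the supervisor loop can be started and
    // shut down through the ServiceManager.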
2788
2789    #[tokio::test]
2790    async fn test_service_manager_with_supervisor() {
2791        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2792        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
2793
2794        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
2795
2796        // Add service
2797        let spec = mock_spec();
2798        Box::pin(manager.upsert_service("api".to_string(), spec))
2799            .await
2800            .unwrap();
2801
2802        // Scale up - containers should be registered with supervisor
2803        manager.scale_service("api", 2).await.unwrap();
2804
2805        // Verify containers are supervised
2806        assert_eq!(supervisor.supervised_count().await, 2);
2807
2808        // Scale down - containers should be unregistered
2809        manager.scale_service("api", 1).await.unwrap();
2810        assert_eq!(supervisor.supervised_count().await, 1);
2811
2812        // Remove service - remaining containers should be unregistered
2813        manager.remove_service("api").await.unwrap();
2814        assert_eq!(supervisor.supervised_count().await, 0);
2815    }
2816
2817    #[tokio::test]
2818    async fn test_service_manager_supervisor_state() {
2819        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2820        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
2821
2822        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor);
2823
2824        // Add and scale service
2825        let spec = mock_spec();
2826        Box::pin(manager.upsert_service("web".to_string(), spec))
2827            .await
2828            .unwrap();
2829        manager.scale_service("web", 1).await.unwrap();
2830
2831        // Check supervised state
2832        let container_id = ContainerId {
2833            service: "web".to_string(),
2834            replica: 1,
2835        };
2836        let state = manager.get_container_supervised_state(&container_id).await;
2837        assert_eq!(state, Some(SupervisedState::Running));
2838    }
2839
2840    #[tokio::test]
2841    async fn test_service_manager_start_supervisor() {
2842        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2843        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
2844
2845        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
2846
2847        // Start the supervisor
2848        let handle = manager.start_container_supervisor().unwrap();
2849
2850        // Give it time to start
2851        tokio::time::sleep(Duration::from_millis(50)).await;
2852        assert!(supervisor.is_running());
2853
2854        // Shutdown
2855        manager.shutdown_container_supervisor();
2856
2857        // Wait for it to stop
2858        tokio::time::timeout(Duration::from_secs(1), handle)
2859            .await
2860            .unwrap()
2861            .unwrap();
2862
2863        assert!(!supervisor.is_running());
2864    }
2865
2866    #[tokio::test]
2867    async fn test_service_manager_supervisor_not_configured() {
2868        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2869        let manager = ServiceManager::new(runtime);
2870
2871        // Try to start supervisor without configuring it
2872        let result = manager.start_container_supervisor();
2873        assert!(result.is_err());
2874        assert!(result.unwrap_err().to_string().contains("not configured"));
2875    }
2876
2877    // ==================== Stream Registry Integration Tests ====================
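    // These tests check that TCP and UDP endpoints are registered with the StreamRegistry on
    // `upsert_service` and removed again on `remove_service`; HTTP endpoints do not produce
    // stream routes.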
2878
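    /// Service spec with a single TCP endpoint (`postgresql`, port 5432).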
2879    fn mock_tcp_spec() -> ServiceSpec {
2880        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2881            r"
2882version: v1
2883deployment: test
2884services:
2885  database:
2886    rtype: service
2887    image:
2888      name: postgres:latest
2889    endpoints:
2890      - name: postgresql
2891        protocol: tcp
2892        port: 5432
2893    scale:
2894      mode: fixed
2895      replicas: 1
2896",
2897        )
2898        .unwrap()
2899        .services
2900        .remove("database")
2901        .unwrap()
2902    }
2903
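    /// Service spec with a single UDP endpoint (`dns`, port 53).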
2904    fn mock_udp_spec() -> ServiceSpec {
2905        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2906            r"
2907version: v1
2908deployment: test
2909services:
2910  dns:
2911    rtype: service
2912    image:
2913      name: dns:latest
2914    endpoints:
2915      - name: dns
2916        protocol: udp
2917        port: 53
2918    scale:
2919      mode: fixed
2920      replicas: 1
2921",
2922        )
2923        .unwrap()
2924        .services
2925        .remove("dns")
2926        .unwrap()
2927    }
2928
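    /// Service spec mixing HTTP (8080), TCP (9000), and UDP (8125) endpoints.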
2929    fn mock_mixed_spec() -> ServiceSpec {
2930        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2931            r"
2932version: v1
2933deployment: test
2934services:
2935  mixed:
2936    rtype: service
2937    image:
2938      name: mixed:latest
2939    endpoints:
2940      - name: http
2941        protocol: http
2942        port: 8080
2943      - name: grpc
2944        protocol: tcp
2945        port: 9000
2946      - name: metrics
2947        protocol: udp
2948        port: 8125
2949    scale:
2950      mode: fixed
2951      replicas: 1
2952",
2953        )
2954        .unwrap()
2955        .services
2956        .remove("mixed")
2957        .unwrap()
2958    }
2959
2960    #[tokio::test]
2961    async fn test_service_manager_with_stream_registry_tcp() {
2962        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2963        let stream_registry = Arc::new(StreamRegistry::new());
2964
2965        let mut manager = ServiceManager::new(runtime);
2966        manager.set_stream_registry(stream_registry.clone());
2967        manager.set_deployment_name("test".to_string());
2968
2969        // Add TCP-only service
2970        let spec = mock_tcp_spec();
2971        Box::pin(manager.upsert_service("database".to_string(), spec))
2972            .await
2973            .unwrap();
2974
2975        // Verify TCP route was registered
2976        assert_eq!(stream_registry.tcp_count(), 1);
2977        assert!(stream_registry.tcp_ports().contains(&5432));
2978
2979        // Remove service and verify cleanup
2980        manager.remove_service("database").await.unwrap();
2981        assert_eq!(stream_registry.tcp_count(), 0);
2982    }
2983
2984    #[tokio::test]
2985    async fn test_service_manager_with_stream_registry_udp() {
2986        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2987        let stream_registry = Arc::new(StreamRegistry::new());
2988
2989        let mut manager = ServiceManager::new(runtime);
2990        manager.set_stream_registry(stream_registry.clone());
2991        manager.set_deployment_name("test".to_string());
2992
2993        // Add UDP-only service
2994        let spec = mock_udp_spec();
2995        Box::pin(manager.upsert_service("dns".to_string(), spec))
2996            .await
2997            .unwrap();
2998
2999        // Verify UDP route was registered
3000        assert_eq!(stream_registry.udp_count(), 1);
3001        assert!(stream_registry.udp_ports().contains(&53));
3002
3003        // Remove service and verify cleanup
3004        manager.remove_service("dns").await.unwrap();
3005        assert_eq!(stream_registry.udp_count(), 0);
3006    }
3007
3008    #[tokio::test]
3009    async fn test_service_manager_with_stream_registry_mixed() {
3010        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3011        let stream_registry = Arc::new(StreamRegistry::new());
3012
3013        let mut manager = ServiceManager::new(runtime);
3014        manager.set_stream_registry(stream_registry.clone());
3015        manager.set_deployment_name("test".to_string());
3016
3017        // Add mixed service (HTTP + TCP + UDP)
3018        let spec = mock_mixed_spec();
3019        Box::pin(manager.upsert_service("mixed".to_string(), spec))
3020            .await
3021            .unwrap();
3022
3023        // Verify stream routes were registered (only the TCP and UDP endpoints; HTTP is not a stream route)
3024        assert_eq!(stream_registry.tcp_count(), 1); // TCP: 9000
3025        assert_eq!(stream_registry.udp_count(), 1); // UDP: 8125
3026
3027        assert!(stream_registry.tcp_ports().contains(&9000));
3028        assert!(stream_registry.udp_ports().contains(&8125));
3029
3030        // Remove service and verify stream cleanup
3031        manager.remove_service("mixed").await.unwrap();
3032        assert_eq!(stream_registry.tcp_count(), 0);
3033        assert_eq!(stream_registry.udp_count(), 0);
3034    }
3035
3036    #[tokio::test]
3037    async fn test_service_manager_stream_registry_builder() {
3038        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3039        let stream_registry = Arc::new(StreamRegistry::new());
3040
3041        // Test builder pattern
3042        let manager = ServiceManager::new(runtime).with_stream_registry(stream_registry.clone());
3043
3044        // Verify stream registry is accessible
3045        assert!(manager.stream_registry().is_some());
3046    }
3047
3048    #[tokio::test]
3049    async fn test_tcp_service_without_stream_registry() {
3050        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3051
3052        // Manager without stream registry
3053        let mut manager = ServiceManager::new(runtime);
3054        manager.set_deployment_name("test".to_string());
3055
3056        // Add TCP service - should log warning but not fail
3057        let spec = mock_tcp_spec();
3058        Box::pin(manager.upsert_service("database".to_string(), spec))
3059            .await
3060            .unwrap();
3061
3062        // No stream registry to check, but service should be tracked
3063        let services = manager.list_services().await;
3064        assert!(services.contains(&"database".to_string()));
3065    }
3066}