zlayer_agent/service.rs

//! Service-level container lifecycle management

use crate::container_supervisor::{ContainerSupervisor, SupervisedState, SupervisorEvent};
use crate::cron_scheduler::CronScheduler;
use crate::dependency::{
    DependencyConditionChecker, DependencyGraph, DependencyWaiter, WaitResult,
};
use crate::error::{AgentError, Result};
use crate::health::{HealthCallback, HealthChecker, HealthMonitor, HealthState};
use crate::init::InitOrchestrator;
use crate::job::{JobExecution, JobExecutionId, JobExecutor, JobTrigger};
use crate::overlay_manager::OverlayManager;
use crate::proxy_manager::ProxyManager;
use crate::runtime::{Container, ContainerId, ContainerState, Runtime};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{RwLock, Semaphore};
use zlayer_observability::logs::LogEntry;
use zlayer_overlay::DnsServer;
use zlayer_proxy::{StreamRegistry, StreamService};
use zlayer_spec::{DependsSpec, HealthCheck, Protocol, ResourceType, ServiceSpec};

/// Service instance manages a single service's containers
pub struct ServiceInstance {
    pub service_name: String,
    pub spec: ServiceSpec,
    runtime: Arc<dyn Runtime + Send + Sync>,
    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
    /// Overlay network manager for container networking (optional, not needed for Docker runtime)
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Proxy manager for updating backend health (optional)
    proxy_manager: Option<Arc<ProxyManager>>,
    /// DNS server for service discovery (optional)
    dns_server: Option<Arc<DnsServer>>,
    /// Shared health states map so callbacks can update ServiceManager-level health
    health_states: Option<Arc<RwLock<HashMap<String, HealthState>>>>,
}

impl ServiceInstance {
    /// Create a new service instance
    pub fn new(
        service_name: String,
        spec: ServiceSpec,
        runtime: Arc<dyn Runtime + Send + Sync>,
        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    ) -> Self {
        Self {
            service_name,
            spec,
            runtime,
            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            overlay_manager,
            proxy_manager: None,
            dns_server: None,
            health_states: None,
        }
    }

    /// Create a new service instance with proxy manager for health-aware load balancing
    pub fn with_proxy(
        service_name: String,
        spec: ServiceSpec,
        runtime: Arc<dyn Runtime + Send + Sync>,
        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
        proxy_manager: Arc<ProxyManager>,
    ) -> Self {
        Self {
            service_name,
            spec,
            runtime,
            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            overlay_manager,
            proxy_manager: Some(proxy_manager),
            dns_server: None,
            health_states: None,
        }
    }

    /// Builder method to add DNS server for service discovery
    #[must_use]
    pub fn with_dns(mut self, dns_server: Arc<DnsServer>) -> Self {
        self.dns_server = Some(dns_server);
        self
    }

    /// Set the DNS server for service discovery
    pub fn set_dns_server(&mut self, dns_server: Arc<DnsServer>) {
        self.dns_server = Some(dns_server);
    }

    /// Set the proxy manager for health-aware load balancing
    pub fn set_proxy_manager(&mut self, proxy_manager: Arc<ProxyManager>) {
        self.proxy_manager = Some(proxy_manager);
    }

    /// Set the shared health states map so health callbacks can bridge state back to `ServiceManager`
    pub fn set_health_states(&mut self, states: Arc<RwLock<HashMap<String, HealthState>>>) {
        self.health_states = Some(states);
    }

    /// Scale to the desired number of replicas
    ///
    /// This method uses short-lived locks to avoid blocking concurrent operations.
    /// I/O operations (pull, create, start, stop, remove) are performed without
    /// holding the containers lock to allow other operations to proceed.
    ///
    /// # Errors
    /// Returns an error if image pull, container creation, or container lifecycle operations fail.
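    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming `instance` is a `ServiceInstance` backed
    /// by a working runtime (names are illustrative, not a prescribed setup):
    ///
    /// ```ignore
    /// // Scale up to three replicas, then back down to one.
    /// instance.scale_to(3).await?;
    /// assert_eq!(instance.replica_count().await, 3);
    /// instance.scale_to(1).await?;
    /// ```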
    #[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
    pub async fn scale_to(&self, replicas: u32) -> Result<()> {
        // Phase 1: Determine current state (short read lock)
        let current_replicas = { self.containers.read().await.len() as u32 }; // Lock released here

        // Phase 2: Scale up - create new containers (no lock held during I/O)
        if replicas > current_replicas {
            // Pull image ONCE before creating any replicas (cached layers are reused)
            self.runtime
                .pull_image_with_policy(&self.spec.image.name, self.spec.image.pull_policy, None)
                .await
                .map_err(|e| AgentError::PullFailed {
                    image: self.spec.image.name.clone(),
                    reason: e.to_string(),
                })?;

            for i in current_replicas..replicas {
                let id = ContainerId {
                    service: self.service_name.clone(),
                    replica: i + 1,
                };

                // Create container (no lock needed - I/O operation)
                //
                // RouteToPeer must propagate unchanged: the scheduler uses it
                // to re-place the workload on a capable peer, and wrapping it
                // in `CreateFailed` would hide the signal and mark the service
                // dead instead of rescheduling it. All other errors are
                // normalised to `CreateFailed` for upstream handling.
                self.runtime
                    .create_container(&id, &self.spec)
                    .await
                    .map_err(|e| match e {
                        AgentError::RouteToPeer { .. } => e,
                        other => AgentError::CreateFailed {
                            id: id.to_string(),
                            reason: other.to_string(),
                        },
                    })?;

                // Run init actions with error policy enforcement (no lock needed)
                let init_orchestrator = InitOrchestrator::with_error_policy(
                    id.clone(),
                    self.spec.init.clone(),
                    self.spec.errors.clone(),
                );
                init_orchestrator.run().await?;

                // Start container (no lock needed - I/O operation)
                self.runtime
                    .start_container(&id)
                    .await
                    .map_err(|e| AgentError::StartFailed {
                        id: id.to_string(),
                        reason: e.to_string(),
                    })?;

                // Get container PID with retries (may not be immediately available)
                let mut container_pid = None;
                for attempt in 1..=5u32 {
                    match self.runtime.get_container_pid(&id).await {
                        Ok(Some(pid)) => {
                            container_pid = Some(pid);
                            break;
                        }
                        Ok(None) if attempt < 5 => {
                            tracing::debug!(container = %id, attempt, "PID not available yet, retrying");
                            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                        }
                        Ok(None) => {
                            tracing::warn!(container = %id, "Container PID unavailable after 5 attempts");
                        }
                        Err(e) => {
                            tracing::warn!(container = %id, attempt, error = %e, "Failed to get PID");
                            if attempt < 5 {
                                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                            }
                        }
                    }
                }

                // Verify the container is still running before attempting
                // overlay attach. If the init process crashed during start
                // (bad image, missing libs, failed mount), the PID above is
                // now dead and every `ip link set ... netns {pid}` will
                // return a cryptic RTNETLINK error. Surface the real cause
                // from the container's log tail instead of the cascade.
                if container_pid.is_some() {
                    let alive = match self.runtime.container_state(&id).await {
                        Ok(
                            ContainerState::Running
                            | ContainerState::Pending
                            | ContainerState::Initializing,
                        ) => true,
                        Ok(state) => {
                            tracing::warn!(
                                container = %id,
                                ?state,
                                "container exited before overlay attach could run"
                            );
                            false
                        }
                        Err(e) => {
                            // State query failed — don't block the attach on
                            // it. The overlay manager's own cleanup-on-error
                            // path now handles the dead-PID case cleanly.
                            tracing::warn!(
                                container = %id,
                                error = %e,
                                "container state query failed before overlay attach, proceeding"
                            );
                            true
                        }
                    };
                    if !alive {
                        let log_tail = self.runtime.container_logs(&id, 40).await.ok().map_or_else(
                            || "  <log read failed>".to_string(),
                            |entries| {
                                if entries.is_empty() {
                                    "  <no log output>".to_string()
                                } else {
                                    entries
                                        .into_iter()
                                        .map(|e| format!("  {}", e.message))
                                        .collect::<Vec<_>>()
                                        .join("\n")
                                }
                            },
                        );
                        return Err(AgentError::StartFailed {
                            id: id.to_string(),
                            reason: format!("container exited during startup:\n{log_tail}"),
                        });
                    }
                }

                // Attach to overlay network if manager is available.
                //
                // Linux uses the container PID to enter the netns and attach a
                // veth. Windows has no PID-addressable netns — the HCN namespace
                // GUID (obtained from `get_container_namespace_id`) is used
                // instead, and the endpoint's IP has already been populated by
                // `EndpointAttachment::create_overlay` during container creation.
                // We simply register that IP with the slice allocator so host
                // accounting stays in sync.
                let overlay_ip = if let Some(overlay) = &self.overlay_manager {
                    let overlay_guard = overlay.read().await;
                    #[cfg(target_os = "windows")]
                    let attach_result: Option<std::net::IpAddr> = {
                        let _ = container_pid; // unused on Windows
                        match self.runtime.get_container_namespace_id(&id).await {
                            Ok(Some(ns_id)) => {
                                let ip_override =
                                    self.runtime.get_container_ip(&id).await.ok().flatten();
                                let dns_server = overlay_guard.dns_server_addr().map(|sa| sa.ip());
                                let dns_domain =
                                    overlay_guard.dns_domain().map(ToString::to_string);
                                match overlay_guard
                                    .attach_container_hcn(
                                        ns_id,
                                        &self.service_name,
                                        ip_override,
                                        true,
                                        dns_server,
                                        dns_domain,
                                    )
                                    .await
                                {
                                    Ok(ip) => Some(ip),
                                    Err(e) => {
                                        tracing::warn!(
                                            container = %id,
                                            error = %e,
                                            "HCN overlay attach failed"
                                        );
                                        None
                                    }
                                }
                            }
                            Ok(None) => {
                                tracing::debug!(
                                    container = %id,
                                    "skipping HCN overlay attach - no namespace id available"
                                );
                                None
                            }
                            Err(e) => {
                                tracing::warn!(
                                    container = %id,
                                    error = %e,
                                    "failed to fetch HCN namespace id"
                                );
                                None
                            }
                        }
                    };
                    #[cfg(not(target_os = "windows"))]
                    let attach_result: Option<std::net::IpAddr> = {
                        if let Some(pid) = container_pid {
                            match overlay_guard
                                .attach_container(pid, &self.service_name, true)
                                .await
                            {
                                Ok(ip) => Some(ip),
                                Err(e) => {
                                    tracing::warn!(
                                        container = %id,
                                        error = %e,
                                        "failed to attach container to overlay network"
                                    );
                                    None
                                }
                            }
                        } else {
                            // No PID available (e.g. WASM runtime) - skip overlay attachment
                            tracing::debug!(
                                container = %id,
                                "skipping overlay attachment - no PID available"
                            );
                            None
                        }
                    };

                    if let Some(ip) = attach_result {
                        tracing::info!(
                            container = %id,
                            overlay_ip = %ip,
                            "attached container to overlay network"
                        );

                        // Register DNS for service discovery
                        if let Some(dns) = &self.dns_server {
                            // Register service-level hostname: {service}.service.local
                            let service_hostname = format!("{}.service.local", self.service_name);

                            // Register replica-specific hostname: {replica}.{service}.service.local
                            let replica_hostname =
                                format!("{}.{}.service.local", id.replica, self.service_name);

                            match dns.add_record(&service_hostname, ip).await {
                                Ok(()) => tracing::debug!(
                                    hostname = %service_hostname,
                                    ip = %ip,
                                    "registered DNS for service"
                                ),
                                Err(e) => tracing::warn!(
                                    hostname = %service_hostname,
                                    error = %e,
                                    "failed to register DNS for service"
                                ),
                            }

                            // Also register replica-specific entry
                            if let Err(e) = dns.add_record(&replica_hostname, ip).await {
                                tracing::warn!(
                                    hostname = %replica_hostname,
                                    error = %e,
                                    "failed to register replica DNS"
                                );
                            } else {
                                tracing::debug!(
                                    hostname = %replica_hostname,
                                    ip = %ip,
                                    "registered DNS for replica"
                                );
                            }
                        }

                        Some(ip)
                    } else {
                        None
                    }
                } else {
                    None
                };

                // If overlay failed, try the container runtime's own IP as fallback
                let effective_ip = if overlay_ip.is_none() {
                    match self.runtime.get_container_ip(&id).await {
                        Ok(Some(ip)) => {
                            tracing::info!(
                                container = %id,
                                ip = %ip,
                                "using runtime container IP for proxy (overlay unavailable)"
                            );
                            Some(ip)
                        }
                        Ok(None) => {
                            tracing::warn!(
                                container = %id,
                                "no container IP available from runtime, proxy routing will be unavailable"
                            );
                            None
                        }
                        Err(e) => {
                            tracing::warn!(
                                container = %id,
                                error = %e,
                                "failed to get container IP from runtime"
                            );
                            None
                        }
                    }
                } else {
                    overlay_ip
                };

                tracing::info!(
                    container = %id,
                    service = %self.service_name,
                    overlay_ip = ?overlay_ip,
                    effective_ip = ?effective_ip,
                    "Container IP resolution complete"
                );

                // Query port override from the runtime.
                // On macOS sandbox, each container is assigned a unique port since
                // all processes share the host network (no network namespaces).
                // The runtime passes the port to the process via the PORT env var.
                let port_override = match self.runtime.get_container_port_override(&id).await {
                    Ok(Some(port)) => {
                        tracing::info!(
                            container = %id,
                            port = port,
                            "runtime assigned dynamic port override for this container"
                        );
                        Some(port)
                    }
                    Ok(None) => None,
                    Err(e) => {
                        tracing::warn!(
                            container = %id,
                            error = %e,
                            "failed to query port override from runtime, using spec port"
                        );
                        None
                    }
                };

                // Start health monitoring and store handle (no lock needed during start)
                let health_monitor_handle = {
                    let mut check = self.spec.health.check.clone();

                    // Resolve Tcp { port: 0 } ("use first endpoint") to the actual
                    // port the container is listening on. With mac-sandbox, each
                    // replica gets a unique assigned port via port_override.
                    if let HealthCheck::Tcp { ref mut port } = check {
                        if *port == 0 {
                            *port = port_override.unwrap_or_else(|| {
                                self.spec
                                    .endpoints
                                    .iter()
                                    .find(|ep| {
                                        matches!(
                                            ep.protocol,
                                            Protocol::Http | Protocol::Https | Protocol::Websocket
                                        )
                                    })
                                    .map_or(8080, zlayer_spec::EndpointSpec::target_port)
                            });
                        }
                    }

                    let start_grace = self
                        .spec
                        .health
                        .start_grace
                        .unwrap_or(Duration::from_secs(5));
                    let check_timeout = self.spec.health.timeout.unwrap_or(Duration::from_secs(5));
                    let interval = self.spec.health.interval.unwrap_or(Duration::from_secs(10));
                    let retries = self.spec.health.retries;

                    let checker = HealthChecker::new(check, effective_ip);
                    let mut monitor = HealthMonitor::new(id.clone(), checker, interval, retries)
                        .with_start_grace(start_grace)
                        .with_check_timeout(check_timeout);

                    // Create health callback to update proxy backend health if proxy is configured
                    // and we have an overlay IP for this container
                    if let (Some(proxy), Some(ip)) = (&self.proxy_manager, effective_ip) {
                        let proxy = Arc::clone(proxy);
                        let service_name = self.service_name.clone();
                        // Get the container's target port, using the runtime override if present.
                        // On macOS sandbox, port_override gives each replica a unique port
                        // so the proxy can distinguish backends sharing 127.0.0.1.
                        let port = port_override.unwrap_or_else(|| {
                            self.spec
                                .endpoints
                                .iter()
                                .find(|ep| {
                                    matches!(
                                        ep.protocol,
                                        Protocol::Http | Protocol::Https | Protocol::Websocket
                                    )
                                })
                                .map_or(8080, zlayer_spec::EndpointSpec::target_port)
                        });

                        let backend_addr = SocketAddr::new(ip, port);

                        // Register backend with load balancer so proxy can route to it.
                        // This must happen before the health callback is created, because
                        // update_backend_health only updates *existing* backends.
                        proxy.add_backend(&self.service_name, backend_addr).await;

                        let health_states_opt = self.health_states.clone();
                        let svc_name_for_states = self.service_name.clone();

                        let health_callback: HealthCallback =
                            Arc::new(move |container_id: ContainerId, is_healthy: bool| {
                                let proxy = Arc::clone(&proxy);
                                let service_name = service_name.clone();
                                tracing::info!(
                                    container = %container_id,
                                    service = %service_name,
                                    backend = %backend_addr,
                                    healthy = is_healthy,
                                    "health status changed, updating proxy backend"
                                );
                                // Spawn a task to update the proxy (callback is sync, proxy update is async)
                                tokio::spawn(async move {
                                    proxy
                                        .update_backend_health(
                                            &service_name,
                                            backend_addr,
                                            is_healthy,
                                        )
                                        .await;
                                });
                                // Bridge health state back to ServiceManager's health_states map
                                if let Some(ref health_states) = health_states_opt {
                                    let states = Arc::clone(health_states);
                                    let svc = svc_name_for_states.clone();
                                    tokio::spawn(async move {
                                        let state = if is_healthy {
                                            HealthState::Healthy
                                        } else {
                                            HealthState::Unhealthy {
                                                failures: 0,
                                                reason: "health check failed".into(),
                                            }
                                        };
                                        states.write().await.insert(svc, state);
                                    });
                                }
                            });

                        monitor = monitor.with_callback(health_callback);
                    }

                    monitor.start()
                };

                // Update state (short write lock)
                {
                    let mut containers = self.containers.write().await;
                    containers.insert(
                        id.clone(),
                        Container {
                            id: id.clone(),
                            state: ContainerState::Running,
                            pid: None,
                            task: None,
                            overlay_ip: effective_ip,
                            health_monitor: Some(health_monitor_handle),
                            port_override,
                        },
                    );
                } // Lock released here
            }
        }

        // Phase 3: Scale down - remove containers (short write lock per removal)
        if replicas < current_replicas {
            for i in replicas..current_replicas {
                let id = ContainerId {
                    service: self.service_name.clone(),
                    replica: i + 1,
                };

                // Remove from state first and get the container to abort health monitor (short write lock)
                let removed_container = {
                    let mut containers = self.containers.write().await;
                    containers.remove(&id)
                }; // Lock released here

                // Then perform cleanup (no lock held - I/O operations)
                if let Some(container) = removed_container {
                    // Abort the health monitor task if it exists
                    if let Some(handle) = container.health_monitor {
                        handle.abort();
                    }

                    // Remove DNS records for this container
                    if let Some(dns) = &self.dns_server {
                        // Remove replica-specific DNS entry
                        let replica_hostname =
                            format!("{}.{}.service.local", id.replica, self.service_name);
                        if let Err(e) = dns.remove_record(&replica_hostname).await {
                            tracing::warn!(
                                hostname = %replica_hostname,
                                error = %e,
                                "failed to remove replica DNS record"
                            );
                        } else {
                            tracing::debug!(
                                hostname = %replica_hostname,
                                "removed replica DNS record"
                            );
                        }

                        // Note: We don't remove the service-level hostname here because
                        // other replicas may still be using it. The service-level record
                        // should be cleaned up when the entire service is removed.
                    }

                    // Stop container
                    self.runtime
                        .stop_container(&id, Duration::from_secs(30))
                        .await?;

                    // Sync volumes to S3 before removal (no-op if not configured)
                    if let Err(e) = self.runtime.sync_container_volumes(&id).await {
                        tracing::warn!(
                            container = %id,
                            error = %e,
                            "failed to sync volumes before removal"
                        );
                    }

                    // Remove container
                    self.runtime.remove_container(&id).await?;
                }
            }
        }

        Ok(())
    }

    /// Get current number of replicas
    pub async fn replica_count(&self) -> usize {
        self.containers.read().await.len()
    }

    /// Get all container IDs
    pub async fn container_ids(&self) -> Vec<ContainerId> {
        self.containers.read().await.keys().cloned().collect()
    }

    /// Get read access to the containers map
    ///
    /// This allows callers to access container overlay IPs and other metadata
    /// without copying the entire map.
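    ///
    /// # Example
    ///
    /// A minimal sketch of reading one replica's overlay IP without cloning the
    /// map (the `ContainerId` values are illustrative):
    ///
    /// ```ignore
    /// let containers = instance.containers().read().await;
    /// let ip = containers
    ///     .get(&ContainerId { service: "api".into(), replica: 1 })
    ///     .and_then(|c| c.overlay_ip);
    /// ```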
    pub fn containers(
        &self,
    ) -> &tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>> {
        &self.containers
    }

    /// Check if this service instance has an overlay manager configured
    pub fn has_overlay_manager(&self) -> bool {
        self.overlay_manager.is_some()
    }

    /// Check if this service instance has a proxy manager configured
    pub fn has_proxy_manager(&self) -> bool {
        self.proxy_manager.is_some()
    }

    /// Check if this service instance has a DNS server configured
    pub fn has_dns_server(&self) -> bool {
        self.dns_server.is_some()
    }
}

/// Service manager for multiple services
pub struct ServiceManager {
    runtime: Arc<dyn Runtime + Send + Sync>,
    services: tokio::sync::RwLock<std::collections::HashMap<String, ServiceInstance>>,
    scale_semaphore: Arc<Semaphore>,
    /// Overlay network manager for container networking
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Stream registry for L4 proxy route registration (TCP/UDP)
    stream_registry: Option<Arc<StreamRegistry>>,
    /// Proxy manager for health-aware load balancing (hyper-based proxy)
    proxy_manager: Option<Arc<ProxyManager>>,
    /// DNS server for service discovery
    dns_server: Option<Arc<DnsServer>>,
    /// Deployment name (used for generating hostnames)
    deployment_name: Option<String>,
    /// Health states for dependency condition checking
    health_states: Arc<RwLock<HashMap<String, HealthState>>>,
    /// Job executor for run-to-completion workloads
    job_executor: Option<Arc<JobExecutor>>,
    /// Cron scheduler for time-based job triggers
    cron_scheduler: Option<Arc<CronScheduler>>,
    /// Container supervisor for crash/panic policy enforcement
    container_supervisor: Option<Arc<ContainerSupervisor>>,
}

// ---------------------------------------------------------------------------
// ServiceManagerBuilder
// ---------------------------------------------------------------------------

/// Builder for constructing a [`ServiceManager`] with optional subsystems.
///
/// Prefer using `ServiceManager::builder(runtime)` to start building.
///
/// # Example
///
/// ```ignore
/// let manager = ServiceManager::builder(runtime)
///     .overlay_manager(om)
///     .proxy_manager(proxy)
///     .deployment_name("prod")
///     .build();
/// ```
pub struct ServiceManagerBuilder {
    runtime: Arc<dyn Runtime + Send + Sync>,
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    proxy_manager: Option<Arc<ProxyManager>>,
    stream_registry: Option<Arc<StreamRegistry>>,
    dns_server: Option<Arc<DnsServer>>,
    deployment_name: Option<String>,
    job_executor: Option<Arc<JobExecutor>>,
    cron_scheduler: Option<Arc<CronScheduler>>,
    container_supervisor: Option<Arc<ContainerSupervisor>>,
}

impl ServiceManagerBuilder {
    /// Create a new builder with the required runtime.
    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
        Self {
            runtime,
            overlay_manager: None,
            proxy_manager: None,
            stream_registry: None,
            dns_server: None,
            deployment_name: None,
            job_executor: None,
            cron_scheduler: None,
            container_supervisor: None,
        }
    }

    /// Set the overlay network manager for container networking.
    #[must_use]
    pub fn overlay_manager(mut self, om: Arc<RwLock<OverlayManager>>) -> Self {
        self.overlay_manager = Some(om);
        self
    }

    /// Set the proxy manager for health-aware load balancing.
    #[must_use]
    pub fn proxy_manager(mut self, pm: Arc<ProxyManager>) -> Self {
        self.proxy_manager = Some(pm);
        self
    }

    /// Set the stream registry for TCP/UDP L4 proxy route registration.
    #[must_use]
    pub fn stream_registry(mut self, sr: Arc<StreamRegistry>) -> Self {
        self.stream_registry = Some(sr);
        self
    }

    /// Set the DNS server for service discovery.
    #[must_use]
    pub fn dns_server(mut self, dns: Arc<DnsServer>) -> Self {
        self.dns_server = Some(dns);
        self
    }

    /// Set the deployment name (used for hostname generation).
    #[must_use]
    pub fn deployment_name(mut self, name: impl Into<String>) -> Self {
        self.deployment_name = Some(name.into());
        self
    }

    /// Set the job executor for run-to-completion workloads.
    #[must_use]
    pub fn job_executor(mut self, je: Arc<JobExecutor>) -> Self {
        self.job_executor = Some(je);
        self
    }

    /// Set the cron scheduler for time-based job triggers.
    #[must_use]
    pub fn cron_scheduler(mut self, cs: Arc<CronScheduler>) -> Self {
        self.cron_scheduler = Some(cs);
        self
    }

    /// Set the container supervisor for crash/panic policy enforcement.
    #[must_use]
    pub fn container_supervisor(mut self, cs: Arc<ContainerSupervisor>) -> Self {
        self.container_supervisor = Some(cs);
        self
    }

    /// Consume the builder and produce a fully-wired [`ServiceManager`].
    ///
    /// Logs warnings for missing recommended subsystems (proxy,
    /// `stream_registry`, `container_supervisor`, `deployment_name`).
    pub fn build(self) -> ServiceManager {
        if self.proxy_manager.is_none() {
            tracing::warn!("ServiceManager built without proxy_manager");
        }
        if self.stream_registry.is_none() {
            tracing::warn!("ServiceManager built without stream_registry");
        }
        if self.container_supervisor.is_none() {
            tracing::warn!("ServiceManager built without container_supervisor");
        }
        if self.deployment_name.is_none() {
            tracing::warn!("ServiceManager built without deployment_name");
        }

        ServiceManager {
            runtime: self.runtime,
            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            scale_semaphore: Arc::new(Semaphore::new(10)),
            overlay_manager: self.overlay_manager,
            stream_registry: self.stream_registry,
            proxy_manager: self.proxy_manager,
            dns_server: self.dns_server,
            deployment_name: self.deployment_name,
            health_states: Arc::new(RwLock::new(HashMap::new())),
            job_executor: self.job_executor,
            cron_scheduler: self.cron_scheduler,
            container_supervisor: self.container_supervisor,
        }
    }
}

impl ServiceManager {
    /// Create a [`ServiceManagerBuilder`] for constructing a `ServiceManager`.
    ///
    /// This is the preferred way to construct a `ServiceManager` since v0.2.0.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let manager = ServiceManager::builder(runtime)
    ///     .overlay_manager(om)
    ///     .proxy_manager(proxy)
    ///     .build();
    /// ```
    pub fn builder(runtime: Arc<dyn Runtime + Send + Sync>) -> ServiceManagerBuilder {
        ServiceManagerBuilder::new(runtime)
    }

    /// Create a new service manager
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
        Self {
            runtime,
            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            scale_semaphore: Arc::new(Semaphore::new(10)), // Max 10 concurrent scaling operations
            overlay_manager: None,
            stream_registry: None,
            proxy_manager: None,
            dns_server: None,
            deployment_name: None,
            health_states: Arc::new(RwLock::new(HashMap::new())),
            job_executor: None,
            cron_scheduler: None,
            container_supervisor: None,
        }
    }

    /// Create a service manager with overlay network support
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn with_overlay(
        runtime: Arc<dyn Runtime + Send + Sync>,
        overlay_manager: Arc<RwLock<OverlayManager>>,
    ) -> Self {
        Self {
            runtime,
            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            scale_semaphore: Arc::new(Semaphore::new(10)),
            overlay_manager: Some(overlay_manager),
            stream_registry: None,
            proxy_manager: None,
            dns_server: None,
            deployment_name: None,
            health_states: Arc::new(RwLock::new(HashMap::new())),
            job_executor: None,
            cron_scheduler: None,
            container_supervisor: None,
        }
    }

    /// Create a fully-configured service manager with overlay and proxy support
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn with_full_config(
        runtime: Arc<dyn Runtime + Send + Sync>,
        overlay_manager: Arc<RwLock<OverlayManager>>,
        deployment_name: String,
    ) -> Self {
        Self {
            runtime,
            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            scale_semaphore: Arc::new(Semaphore::new(10)),
            overlay_manager: Some(overlay_manager),
            stream_registry: None,
            proxy_manager: None,
            dns_server: None,
            deployment_name: Some(deployment_name),
            health_states: Arc::new(RwLock::new(HashMap::new())),
            job_executor: None,
            cron_scheduler: None,
            container_supervisor: None,
        }
    }

    /// Get the health states map for external monitoring
    pub fn health_states(&self) -> Arc<RwLock<HashMap<String, HealthState>>> {
        Arc::clone(&self.health_states)
    }

    /// Update health state for a service
    pub async fn update_health_state(&self, service_name: &str, state: HealthState) {
        let mut states = self.health_states.write().await;
        states.insert(service_name.to_string(), state);
    }

    /// Set the deployment name (used for generating hostnames)
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_deployment_name(&mut self, name: String) {
        self.deployment_name = Some(name);
    }

    /// Set the stream registry for L4 proxy integration (TCP/UDP)
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
        self.stream_registry = Some(registry);
    }

    /// Builder pattern: add stream registry for L4 proxy integration
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
        self.stream_registry = Some(registry);
        self
    }

    /// Get the stream registry (if configured)
    pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
        self.stream_registry.as_ref()
    }

    /// Set the overlay manager for container networking
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_overlay_manager(&mut self, manager: Arc<RwLock<OverlayManager>>) {
        self.overlay_manager = Some(manager);
    }

    /// Set the proxy manager for health-aware load balancing
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_proxy_manager(&mut self, proxy: Arc<ProxyManager>) {
        self.proxy_manager = Some(proxy);
    }

    /// Builder pattern: add proxy manager for health-aware load balancing
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_proxy_manager(mut self, proxy: Arc<ProxyManager>) -> Self {
        self.proxy_manager = Some(proxy);
        self
    }

    /// Get the proxy manager (if configured)
    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
        self.proxy_manager.as_ref()
    }

    /// Set the DNS server for service discovery
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_dns_server(&mut self, dns: Arc<DnsServer>) {
        self.dns_server = Some(dns);
    }

    /// Builder pattern: add DNS server for service discovery
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_dns_server(mut self, dns: Arc<DnsServer>) -> Self {
        self.dns_server = Some(dns);
        self
    }

    /// Get the DNS server (if configured)
    pub fn dns_server(&self) -> Option<&Arc<DnsServer>> {
        self.dns_server.as_ref()
    }

    /// Set the job executor for run-to-completion workloads
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_job_executor(&mut self, executor: Arc<JobExecutor>) {
        self.job_executor = Some(executor);
    }

    /// Set the cron scheduler for time-based job triggers
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_cron_scheduler(&mut self, scheduler: Arc<CronScheduler>) {
        self.cron_scheduler = Some(scheduler);
    }

    /// Builder pattern: add job executor
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_job_executor(mut self, executor: Arc<JobExecutor>) -> Self {
        self.job_executor = Some(executor);
        self
    }

    /// Builder pattern: add cron scheduler
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_cron_scheduler(mut self, scheduler: Arc<CronScheduler>) -> Self {
        self.cron_scheduler = Some(scheduler);
        self
    }

    /// Get the job executor (if configured)
    pub fn job_executor(&self) -> Option<&Arc<JobExecutor>> {
        self.job_executor.as_ref()
    }

    /// Get the cron scheduler (if configured)
    pub fn cron_scheduler(&self) -> Option<&Arc<CronScheduler>> {
        self.cron_scheduler.as_ref()
    }

    /// Set the container supervisor for crash/panic policy enforcement
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    pub fn set_container_supervisor(&mut self, supervisor: Arc<ContainerSupervisor>) {
        self.container_supervisor = Some(supervisor);
    }

    /// Builder pattern: add container supervisor
    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
    #[must_use]
    pub fn with_container_supervisor(mut self, supervisor: Arc<ContainerSupervisor>) -> Self {
        self.container_supervisor = Some(supervisor);
        self
    }

    /// Get the container supervisor (if configured)
    pub fn container_supervisor(&self) -> Option<&Arc<ContainerSupervisor>> {
        self.container_supervisor.as_ref()
    }

    /// Start the container supervisor background task
    ///
    /// This spawns a background task that monitors containers for crashes
    /// and enforces the `on_panic` error policy.
    ///
    /// # Errors
    /// Returns an error if no container supervisor is configured.
    ///
    /// # Returns
    /// A `JoinHandle` for the supervisor task.
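    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the manager was built with a
    /// `container_supervisor`:
    ///
    /// ```ignore
    /// let handle = manager.start_container_supervisor()?;
    /// // ... later, on shutdown:
    /// manager.shutdown_container_supervisor();
    /// handle.await.ok();
    /// ```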
    pub fn start_container_supervisor(&self) -> Result<tokio::task::JoinHandle<()>> {
        let supervisor = self.container_supervisor.as_ref().ok_or_else(|| {
            AgentError::Configuration("Container supervisor not configured".to_string())
        })?;

        let supervisor = Arc::clone(supervisor);
        Ok(tokio::spawn(async move {
            supervisor.run_loop().await;
        }))
    }

    /// Shutdown the container supervisor
    pub fn shutdown_container_supervisor(&self) {
        if let Some(supervisor) = &self.container_supervisor {
            supervisor.shutdown();
        }
    }

    /// Get the supervised state of a container
    pub async fn get_container_supervised_state(
        &self,
        container_id: &ContainerId,
    ) -> Option<SupervisedState> {
        if let Some(supervisor) = &self.container_supervisor {
            supervisor.get_state(container_id).await
        } else {
            None
        }
    }

    /// Get supervisor events receiver
    ///
    /// Note: This can only be called once; the receiver is moved to the caller.
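    ///
    /// # Example
    ///
    /// A minimal sketch of draining events in a background task; real handling
    /// of each `SupervisorEvent` is application-specific:
    ///
    /// ```ignore
    /// if let Some(mut events) = manager.take_supervisor_events().await {
    ///     tokio::spawn(async move {
    ///         while let Some(event) = events.recv().await {
    ///             tracing::info!(?event, "supervisor event");
    ///         }
    ///     });
    /// }
    /// ```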
    pub async fn take_supervisor_events(
        &self,
    ) -> Option<tokio::sync::mpsc::Receiver<SupervisorEvent>> {
        if let Some(supervisor) = &self.container_supervisor {
            supervisor.take_event_receiver().await
        } else {
            None
        }
    }

    // ==================== Dependency Orchestration ====================

    /// Deploy multiple services respecting their dependency order
    ///
    /// This method:
    /// 1. Builds a dependency graph from the services
    /// 2. Validates no cycles exist
    /// 3. Computes topological order (services with no deps first)
    /// 4. For each service in order, waits for dependencies then starts the service
    ///
    /// # Arguments
    /// * `services` - Map of service name to service specification
    ///
    /// # Errors
    /// - Returns `AgentError::InvalidSpec` if there are cyclic dependencies
    /// - Returns `AgentError::DependencyTimeout` if a dependency times out with `on_timeout: fail`
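    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `db_spec` and `api_spec` are valid `ServiceSpec`s
    /// and `api_spec.depends` references `"db"`:
    ///
    /// ```ignore
    /// let mut services = HashMap::new();
    /// services.insert("db".to_string(), db_spec);
    /// services.insert("api".to_string(), api_spec);
    /// // "db" starts first; "api" starts once its dependency conditions hold.
    /// manager.deploy_with_dependencies(services).await?;
    /// ```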
1134    pub async fn deploy_with_dependencies(
1135        &self,
1136        services: HashMap<String, ServiceSpec>,
1137    ) -> Result<()> {
1138        if services.is_empty() {
1139            return Ok(());
1140        }
1141
1142        // Build dependency graph
1143        let graph = DependencyGraph::build(&services)?;
1144
1145        tracing::info!(
1146            service_count = services.len(),
1147            "Starting deployment with dependency ordering"
1148        );
1149
1150        // Get startup order
1151        let order = graph.startup_order();
1152        tracing::debug!(order = ?order, "Computed startup order");
1153
1154        // Start services in dependency order
1155        for service_name in order {
1156            let service_spec = services
1157                .get(service_name)
1158                .ok_or_else(|| AgentError::Internal(format!("Service {service_name} not found")))?;
1159
1160            // Wait for dependencies first
1161            if !service_spec.depends.is_empty() {
1162                tracing::info!(
1163                    service = %service_name,
1164                    dependency_count = service_spec.depends.len(),
1165                    "Waiting for dependencies"
1166                );
1167                self.wait_for_dependencies(service_name, &service_spec.depends)
1168                    .await?;
1169            }
1170
1171            // Register and start service
1172            tracing::info!(service = %service_name, "Starting service");
1173            self.upsert_service(service_name.clone(), service_spec.clone())
1174                .await?;
1175
1176            // Get the desired replica count from scale config
1177            let replicas = match &service_spec.scale {
1178                zlayer_spec::ScaleSpec::Fixed { replicas } => *replicas,
1179                zlayer_spec::ScaleSpec::Adaptive { min, .. } => *min, // Start with min replicas
1180                zlayer_spec::ScaleSpec::Manual => 1, // Default to 1 for manual scaling
1181            };
1182            self.scale_service(service_name, replicas).await?;
1183
1184            // Mark service as started in health states (Unknown until health check runs)
1185            self.update_health_state(service_name, HealthState::Unknown)
1186                .await;
1187
1188            tracing::info!(
1189                service = %service_name,
1190                replicas = replicas,
1191                "Service started"
1192            );
1193        }
1194
1195        tracing::info!(service_count = services.len(), "Deployment complete");
1196
1197        Ok(())
1198    }
1199
1200    /// Wait for all dependencies of a service to be satisfied
1201    ///
1202    /// # Arguments
1203    /// * `service` - Name of the service waiting for dependencies
1204    /// * `deps` - Slice of dependency specifications
1205    ///
1206    /// # Errors
1207    /// Returns `AgentError::DependencyTimeout` if any dependency with `on_timeout`: fail times out
1208    async fn wait_for_dependencies(&self, service: &str, deps: &[DependsSpec]) -> Result<()> {
1209        let condition_checker = DependencyConditionChecker::new(
1210            Arc::clone(&self.runtime),
1211            Arc::clone(&self.health_states),
1212            None,
1213        );
1214
1215        let waiter = DependencyWaiter::new(condition_checker);
1216        let results = waiter.wait_for_all(deps).await?;
1217
1218        // Check results for failures
1219        for result in results {
1220            match result {
1221                WaitResult::TimedOutFail {
1222                    service: dep_service,
1223                    condition,
1224                    timeout,
1225                } => {
1226                    return Err(AgentError::DependencyTimeout {
1227                        service: service.to_string(),
1228                        dependency: dep_service,
1229                        condition: format!("{condition:?}"),
1230                        timeout,
1231                    });
1232                }
1233                WaitResult::TimedOutWarn {
1234                    service: dep_service,
1235                    condition,
1236                } => {
1237                    tracing::warn!(
1238                        service = %service,
1239                        dependency = %dep_service,
1240                        condition = ?condition,
1241                        "Dependency timed out but continuing"
1242                    );
1243                }
1244                WaitResult::TimedOutContinue | WaitResult::Satisfied => {
1245                    // Continue silently
1246                }
1247            }
1248        }
1249
1250        Ok(())
1251    }
1252
1253    /// Check if all dependencies for a service are currently satisfied
1254    ///
1255    /// This is a one-shot check (no waiting). Useful for pre-flight validation.
1256    ///
1257    /// # Errors
1258    /// Returns an error if a dependency check fails unexpectedly.
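    ///
    /// # Examples
    ///
    /// A minimal sketch (not compiled as a doctest); `manager` is assumed to be a
    /// configured `ServiceManager`, and the dependency fields mirror `DependsSpec`
    /// as used elsewhere in this crate:
    ///
    /// ```ignore
    /// let deps = vec![DependsSpec {
    ///     service: "db".to_string(),
    ///     condition: zlayer_spec::DependencyCondition::Healthy,
    ///     timeout: Some(Duration::from_secs(60)),
    ///     on_timeout: zlayer_spec::TimeoutAction::Fail,
    /// }];
    /// if !manager.check_dependencies(&deps).await? {
    ///     tracing::warn!("dependencies not yet satisfied, deferring start");
    /// }
    /// ```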
1259    pub async fn check_dependencies(&self, deps: &[DependsSpec]) -> Result<bool> {
1260        let condition_checker = DependencyConditionChecker::new(
1261            Arc::clone(&self.runtime),
1262            Arc::clone(&self.health_states),
1263            None,
1264        );
1265
1266        for dep in deps {
1267            if !condition_checker.check(dep).await? {
1268                return Ok(false);
1269            }
1270        }
1271
1272        Ok(true)
1273    }
1274
1275    /// Add or update a workload (service, job, or cron)
1276    ///
1277    /// This method handles different resource types appropriately:
1278    /// - **Service**: Traditional long-running containers with scaling and health checks
1279    /// - **Job**: Run-to-completion workloads triggered on-demand (stores spec for later)
1280    /// - **Cron**: Scheduled run-to-completion workloads (registers with cron scheduler)
1281    ///
1282    /// # Errors
1283    /// Returns an error if service creation, scaling, or cron registration fails.
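    ///
    /// # Examples
    ///
    /// A minimal sketch (not compiled as a doctest); the specs are assumed to be
    /// parsed from a deployment manifest and `manager` to be fully configured:
    ///
    /// ```ignore
    /// // Long-running service: an instance is created (or updated) immediately.
    /// manager.upsert_service("api".to_string(), service_spec).await?;
    /// // Job: the spec is stored and only runs when triggered.
    /// manager.upsert_service("backup".to_string(), job_spec).await?;
    /// // Cron: registered with the scheduler; fails if no scheduler is configured.
    /// manager.upsert_service("cleanup".to_string(), cron_spec).await?;
    /// ```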
1284    pub async fn upsert_service(&self, name: String, spec: ServiceSpec) -> Result<()> {
1285        match spec.rtype {
1286            ResourceType::Service => {
1287                // Long-running service: create/update instance
1288                let mut services = self.services.write().await;
1289
1290                if let Some(instance) = services.get_mut(&name) {
1291                    // Update existing service
1292                    instance.spec = spec;
1293                    // Update DNS server if configured
1294                    if let Some(dns) = &self.dns_server {
1295                        instance.set_dns_server(Arc::clone(dns));
1296                    }
1297                } else {
1298                    // Create new service with proxy manager for health-aware load balancing
1299                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
1300                    let mut instance = if let Some(proxy) = &self.proxy_manager {
1301                        ServiceInstance::with_proxy(
1302                            name.clone(),
1303                            spec,
1304                            self.runtime.clone(),
1305                            overlay,
1306                            Arc::clone(proxy),
1307                        )
1308                    } else {
1309                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
1310                    };
1311                    // Set DNS server if configured
1312                    if let Some(dns) = &self.dns_server {
1313                        instance.set_dns_server(Arc::clone(dns));
1314                    }
1315                    // Wire shared health states so callbacks bridge back to ServiceManager
1316                    instance.set_health_states(Arc::clone(&self.health_states));
1317                    // Register HTTP routes via proxy manager
1318                    if let Some(proxy) = &self.proxy_manager {
1319                        proxy.add_service(&name, &instance.spec).await;
1320                    }
1321                    // Register TCP/UDP endpoints in stream registry
1322                    if let Some(stream_registry) = &self.stream_registry {
1323                        for endpoint in &instance.spec.endpoints {
1324                            let svc = StreamService::new(
1325                                name.clone(),
1326                                Vec::new(), // No backends yet; added on scale-up
1327                            );
1328                            match endpoint.protocol {
1329                                Protocol::Tcp => {
1330                                    stream_registry.register_tcp(endpoint.port, svc);
1331                                    tracing::debug!(
1332                                        service = %name,
1333                                        port = endpoint.port,
1334                                        "Registered TCP stream route"
1335                                    );
1336                                }
1337                                Protocol::Udp => {
1338                                    stream_registry.register_udp(endpoint.port, svc);
1339                                    tracing::debug!(
1340                                        service = %name,
1341                                        port = endpoint.port,
1342                                        "Registered UDP stream route"
1343                                    );
1344                                }
1345                                _ => {} // HTTP routes handled by proxy manager
1346                            }
1347                        }
1348                    }
1349                    services.insert(name, instance);
1350                }
1351            }
1352            ResourceType::Job => {
1353                // Job: Just store the spec for later triggering
1354                // Jobs don't start containers immediately; they're triggered on-demand
1355                if let Some(executor) = &self.job_executor {
1356                    executor.register_job(&name, spec).await;
1357                    tracing::info!(job = %name, "Registered job spec");
1358                } else {
1359                    tracing::warn!(
1360                        job = %name,
1361                        "Job executor not configured, storing as service for reference"
1362                    );
1363                    // Fallback: store as service instance for reference
1364                    let mut services = self.services.write().await;
1365                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
1366                    let mut instance = if let Some(proxy) = &self.proxy_manager {
1367                        ServiceInstance::with_proxy(
1368                            name.clone(),
1369                            spec,
1370                            self.runtime.clone(),
1371                            overlay,
1372                            Arc::clone(proxy),
1373                        )
1374                    } else {
1375                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
1376                    };
1377                    // Set DNS server if configured
1378                    if let Some(dns) = &self.dns_server {
1379                        instance.set_dns_server(Arc::clone(dns));
1380                    }
1381                    services.insert(name, instance);
1382                }
1383            }
1384            ResourceType::Cron => {
1385                // Cron: Register with the cron scheduler
1386                if let Some(scheduler) = &self.cron_scheduler {
1387                    scheduler.register(&name, &spec).await?;
1388                    tracing::info!(cron = %name, "Registered cron job with scheduler");
1389                } else {
1390                    return Err(AgentError::Configuration(format!(
1391                        "Cron scheduler not configured for cron job '{name}'"
1392                    )));
1393                }
1394            }
1395        }
1396
1397        Ok(())
1398    }
1399
1400    /// Update backend addresses via `ProxyManager` after scaling
1401    async fn update_proxy_backends(&self, service_name: &str, addrs: Vec<SocketAddr>) {
1402        if let Some(proxy) = &self.proxy_manager {
1403            proxy.update_backends(service_name, addrs).await;
1404        }
1405    }
1406
1407    /// Update backend addresses in the `StreamRegistry` for TCP/UDP endpoints after scaling
1408    ///
1409    /// For containers with a port override (macOS sandbox), the addresses already
1410    /// carry the runtime-assigned port. In that case, the container listens on the
1411    /// override port for all traffic, so we use the address port directly. For
1412    /// containers without a port override (Linux, VMs), we reconstruct addresses
1413    /// using the endpoint's declared port, since each container has its own IP
1414    /// and can bind any port independently.
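    ///
    /// As a hypothetical example, with an HTTP endpoint targeting port 8080 and a
    /// TCP endpoint targeting port 5432:
    /// - no overrides: `[10.0.0.2:8080, 10.0.0.3:8080]` is rewritten to
    ///   `[10.0.0.2:5432, 10.0.0.3:5432]` for the TCP route
    /// - overrides (macOS sandbox): `[127.0.0.1:55001, 127.0.0.1:55002]` is used as-is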
1415    fn update_stream_backends(&self, spec: &ServiceSpec, addrs: &[SocketAddr]) {
1416        let Some(stream_registry) = &self.stream_registry else {
1417            return;
1418        };
1419
1420        // Detect per-container port overrides: if any address uses a port other
1421        // than the primary spec endpoint's port, the addresses already carry
1422        // runtime-assigned ports and must be used as-is.
1423        let primary_spec_port = spec
1424            .endpoints
1425            .iter()
1426            .find(|ep| {
1427                matches!(
1428                    ep.protocol,
1429                    Protocol::Http | Protocol::Https | Protocol::Websocket
1430                )
1431            })
1432            .map_or(8080, zlayer_spec::EndpointSpec::target_port);
1433
1434        let has_port_overrides = addrs.iter().any(|addr| addr.port() != primary_spec_port);
1435
1436        for endpoint in &spec.endpoints {
1437            match endpoint.protocol {
1438                Protocol::Tcp => {
1439                    let tcp_backends: Vec<SocketAddr> = if has_port_overrides {
1440                        // Port overrides active (macOS sandbox): the container listens
1441                        // on its assigned port for all traffic. Use addresses as-is.
1442                        addrs.to_vec()
1443                    } else {
1444                        // Normal case: each container has its own IP, construct
1445                        // addresses using the TCP endpoint's container target port.
1446                        addrs
1447                            .iter()
1448                            .map(|addr| SocketAddr::new(addr.ip(), endpoint.target_port()))
1449                            .collect()
1450                    };
1451
1452                    stream_registry.update_tcp_backends(endpoint.port, tcp_backends);
1453
1454                    tracing::debug!(
1455                        endpoint = %endpoint.name,
1456                        port = endpoint.port,
1457                        backend_count = addrs.len(),
1458                        "Updated TCP stream backends"
1459                    );
1460                }
1461                Protocol::Udp => {
1462                    let udp_backends: Vec<SocketAddr> = if has_port_overrides {
1463                        addrs.to_vec()
1464                    } else {
1465                        addrs
1466                            .iter()
1467                            .map(|addr| SocketAddr::new(addr.ip(), endpoint.target_port()))
1468                            .collect()
1469                    };
1470
1471                    stream_registry.update_udp_backends(endpoint.port, udp_backends);
1472
1473                    tracing::debug!(
1474                        endpoint = %endpoint.name,
1475                        port = endpoint.port,
1476                        backend_count = addrs.len(),
1477                        "Updated UDP stream backends"
1478                    );
1479                }
1480                _ => {} // HTTP endpoints handled by update_proxy_backends
1481            }
1482        }
1483    }
1484
1485    /// Scale a service to desired replica count
1486    ///
1487    /// # Errors
1488    /// Returns an error if the service is not found or scaling fails.
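    ///
    /// # Examples
    ///
    /// A minimal sketch (not compiled as a doctest); the service is assumed to have
    /// been registered with [`Self::upsert_service`] first:
    ///
    /// ```ignore
    /// manager.upsert_service("api".to_string(), spec).await?;
    /// // Scale up; proxy and TCP/UDP stream backends are refreshed afterwards.
    /// manager.scale_service("api", 3).await?;
    /// assert_eq!(manager.service_replica_count("api").await?, 3);
    /// ```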
1489    #[allow(clippy::cast_possible_truncation)]
1490    pub async fn scale_service(&self, name: &str, replicas: u32) -> Result<()> {
1491        let _permit = self.scale_semaphore.acquire().await;
1492
1493        let services = self.services.read().await;
1494        let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
1495            container: name.to_string(),
1496            reason: "service not found".to_string(),
1497        })?;
1498
1499        // Get current replica count before scaling
1500        let current_replicas = instance.replica_count().await as u32;
1501
1502        // Perform the scaling operation
1503        instance.scale_to(replicas).await?;
1504
1505        // After scaling, refresh the proxy backends with the containers' current
1506        // addresses. collect_backend_addrs reads each container's overlay IP and,
1507        // where present, its runtime-assigned port override.
1510        let addrs = self.collect_backend_addrs(instance, replicas).await;
1511
1512        // Update HTTP backends via ProxyManager
1513        if self.proxy_manager.is_some() && !addrs.is_empty() {
1514            self.update_proxy_backends(name, addrs.clone()).await;
1515        }
1516
1517        // Update TCP/UDP backends in StreamRegistry
1518        if self.stream_registry.is_some() {
1519            self.update_stream_backends(&instance.spec, &addrs);
1520        }
1521
1522        // Register new containers with supervisor for crash monitoring
1523        if let Some(supervisor) = &self.container_supervisor {
1524            // For scale-up, register new containers
1525            if replicas > current_replicas {
1526                for i in current_replicas..replicas {
1527                    let container_id = ContainerId {
1528                        service: name.to_string(),
1529                        replica: i + 1,
1530                    };
1531                    supervisor.supervise(&container_id, &instance.spec).await;
1532                }
1533            }
1534            // For scale-down, unregister removed containers
1535            if replicas < current_replicas {
1536                for i in replicas..current_replicas {
1537                    let container_id = ContainerId {
1538                        service: name.to_string(),
1539                        replica: i + 1,
1540                    };
1541                    supervisor.unsupervise(&container_id).await;
1542                }
1543            }
1544        }
1545
1546        Ok(())
1547    }
1548
1549    /// Collect backend addresses for a service's containers
1550    ///
1551    /// This queries the service instance's containers for their overlay network
1552    /// IP addresses and constructs backend addresses using those IPs with the
1553    /// service's endpoint port.
1554    ///
1555    /// If a container has a `port_override` (e.g., macOS sandbox where all
1556    /// containers share the host network), that port is used instead of the
1557    /// spec-declared endpoint port. This allows multiple replicas on the same
1558    /// IP (`127.0.0.1`) to be distinguished by port.
1559    async fn collect_backend_addrs(
1560        &self,
1561        instance: &ServiceInstance,
1562        _replicas: u32, // No longer needed - we iterate containers directly
1563    ) -> Vec<SocketAddr> {
1564        let mut addrs = Vec::new();
1565
1566        // Get the primary container target port (first HTTP endpoint) as the default
1567        let spec_port = instance
1568            .spec
1569            .endpoints
1570            .iter()
1571            .find(|ep| {
1572                matches!(
1573                    ep.protocol,
1574                    Protocol::Http | Protocol::Https | Protocol::Websocket
1575                )
1576            })
1577            .map_or(8080, zlayer_spec::EndpointSpec::target_port);
1578
1579        // Collect backend addresses from containers with overlay IPs
1580        let containers = instance.containers().read().await;
1581
1582        for container in containers.values() {
1583            if let Some(ip) = container.overlay_ip {
1584                // Use the runtime-assigned port override if present (macOS sandbox),
1585                // otherwise fall back to the spec-declared endpoint port.
1586                let port = container.port_override.unwrap_or(spec_port);
1587                addrs.push(SocketAddr::new(ip, port));
1588            }
1589        }
1590
1591        // If no overlay IPs are available, this may be the Docker runtime or failed
1592        // network attachments. Log a warning, but do not fall back to localhost in production.
1593        if addrs.is_empty() && !containers.is_empty() {
1594            tracing::warn!(
1595                service = %instance.service_name,
1596                container_count = containers.len(),
1597                "no overlay IPs available for backends - containers may not be reachable via proxy"
1598            );
1599        }
1600
1601        addrs
1602    }
1603
1604    /// Get service replica count
1605    ///
1606    /// # Errors
1607    /// Returns an error if the service is not found.
1608    pub async fn service_replica_count(&self, name: &str) -> Result<usize> {
1609        let services = self.services.read().await;
1610        let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
1611            container: name.to_string(),
1612            reason: "service not found".to_string(),
1613        })?;
1614
1615        Ok(instance.replica_count().await)
1616    }
1617
1618    /// Remove a workload (service, job, or cron)
1619    ///
1620    /// This method handles cleanup for different resource types:
1621    /// - **Service**: Unregisters proxy routes, supervisor, and removes from service map
1622    /// - **Job**: Unregisters from job executor
1623    /// - **Cron**: Unregisters from cron scheduler
1624    ///
1625    /// # Errors
1626    /// Returns an error if the service cannot be removed or scale-down fails.
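    ///
    /// # Examples
    ///
    /// A minimal sketch (not compiled as a doctest):
    ///
    /// ```ignore
    /// manager.remove_service("api").await?;
    /// assert!(!manager.list_services().await.contains(&"api".to_string()));
    /// ```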
1627    pub async fn remove_service(&self, name: &str) -> Result<()> {
1628        // Try to unregister from cron scheduler first
1629        if let Some(scheduler) = &self.cron_scheduler {
1630            scheduler.unregister(name).await;
1631        }
1632
1633        // Try to unregister from job executor
1634        if let Some(executor) = &self.job_executor {
1635            executor.unregister_job(name).await;
1636        }
1637
1638        // Unregister stream routes (TCP/UDP) from the stream registry
1639        if let Some(stream_registry) = &self.stream_registry {
1640            // Need to get the service spec to know which ports to unregister
1641            let services = self.services.read().await;
1642            if let Some(instance) = services.get(name) {
1643                for endpoint in &instance.spec.endpoints {
1644                    match endpoint.protocol {
1645                        Protocol::Tcp => {
1646                            let _ = stream_registry.unregister_tcp(endpoint.port);
1647                            tracing::debug!(
1648                                service = %name,
1649                                port = endpoint.port,
1650                                "Unregistered TCP stream route"
1651                            );
1652                        }
1653                        Protocol::Udp => {
1654                            let _ = stream_registry.unregister_udp(endpoint.port);
1655                            tracing::debug!(
1656                                service = %name,
1657                                port = endpoint.port,
1658                                "Unregistered UDP stream route"
1659                            );
1660                        }
1661                        _ => {} // HTTP routes are not tracked in the stream registry
1662                    }
1663                }
1664            }
1665            drop(services); // Release read lock
1666        }
1667
1668        // Unregister containers from the supervisor
1669        if let Some(supervisor) = &self.container_supervisor {
1670            let containers = self.get_service_containers(name).await;
1671            for container_id in containers {
1672                supervisor.unsupervise(&container_id).await;
1673            }
1674            tracing::debug!(service = %name, "Unregistered containers from supervisor");
1675        }
1676
1677        // Clean up DNS records for the service
1678        if let Some(dns) = &self.dns_server {
1679            // Remove the service-level DNS entry
1680            let service_hostname = format!("{name}.service.local");
1681            if let Err(e) = dns.remove_record(&service_hostname).await {
1682                tracing::warn!(
1683                    hostname = %service_hostname,
1684                    error = %e,
1685                    "failed to remove service DNS record"
1686                );
1687            } else {
1688                tracing::debug!(
1689                    hostname = %service_hostname,
1690                    "removed service DNS record"
1691                );
1692            }
1693
1694            // Also remove any remaining replica-specific DNS entries
1695            let services = self.services.read().await;
1696            if let Some(instance) = services.get(name) {
1697                let containers = instance.containers().read().await;
1698                for id in containers.keys() {
1699                    let replica_hostname = format!("{}.{}.service.local", id.replica, name);
1700                    if let Err(e) = dns.remove_record(&replica_hostname).await {
1701                        tracing::warn!(
1702                            hostname = %replica_hostname,
1703                            error = %e,
1704                            "failed to remove replica DNS record during service removal"
1705                        );
1706                    }
1707                }
1708            }
1709            drop(services); // Release read lock before write lock
1710        }
1711
1712        // Remove from services map (may or may not exist depending on rtype)
1713        let mut services = self.services.write().await;
1714        if services.remove(name).is_some() {
1715            tracing::debug!(service = %name, "Removed service from manager");
1716        }
1717
1718        Ok(())
1719    }
1720
1721    /// Introspect service infrastructure wiring.
1722    /// Returns (`has_overlay`, `has_proxy`, `has_dns`), or None if service not found.
1723    pub async fn service_infrastructure(&self, name: &str) -> Option<(bool, bool, bool)> {
1724        let services = self.services.read().await;
1725        services.get(name).map(|i| {
1726            (
1727                i.has_overlay_manager(),
1728                i.has_proxy_manager(),
1729                i.has_dns_server(),
1730            )
1731        })
1732    }
1733
1734    /// List all services
1735    pub async fn list_services(&self) -> Vec<String> {
1736        self.services.read().await.keys().cloned().collect()
1737    }
1738
1739    /// Get logs for a service, aggregated from all container replicas.
1740    ///
1741    /// # Arguments
1742    /// * `service_name` - Name of the service to fetch logs for
1743    /// * `tail` - Number of lines to return per container (0 = all)
1744    /// * `instance` - Optional specific instance (container ID suffix like "1", "2")
1745    ///
1746    /// # Errors
1747    /// Returns an error if the service or instance is not found.
1748    ///
1749    /// # Returns
1750    /// Structured log entries from all (or specific) container replicas. Each
1751    /// entry has its `service` and `deployment` fields populated when available.
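    ///
    /// # Examples
    ///
    /// A minimal sketch (not compiled as a doctest); the tail count and instance
    /// selector are illustrative:
    ///
    /// ```ignore
    /// // Last 100 lines from every replica.
    /// let all = manager.get_service_logs("api", 100, None).await?;
    /// // Last 100 lines from replica 2 only.
    /// let replica_two = manager.get_service_logs("api", 100, Some("2")).await?;
    /// ```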
1752    pub async fn get_service_logs(
1753        &self,
1754        service_name: &str,
1755        tail: usize,
1756        instance: Option<&str>,
1757    ) -> Result<Vec<LogEntry>> {
1758        let container_ids = self.get_service_containers(service_name).await;
1759
1760        if container_ids.is_empty() {
1761            return Err(AgentError::NotFound {
1762                container: service_name.to_string(),
1763                reason: "no containers found for service".to_string(),
1764            });
1765        }
1766
1767        // If a specific instance is requested, filter to just that one
1768        let target_ids: Vec<&ContainerId> = if let Some(inst) = instance {
1769            if let Ok(replica_num) = inst.parse::<u32>() {
1770                container_ids
1771                    .iter()
1772                    .filter(|id| id.replica == replica_num)
1773                    .collect()
1774            } else {
1775                // Try matching by full container ID string suffix
1776                container_ids
1777                    .iter()
1778                    .filter(|id| id.to_string().contains(inst))
1779                    .collect()
1780            }
1781        } else {
1782            container_ids.iter().collect()
1783        };
1784
1785        if target_ids.is_empty() {
1786            return Err(AgentError::NotFound {
1787                container: format!("{}/{}", service_name, instance.unwrap_or("?")),
1788                reason: "instance not found".to_string(),
1789            });
1790        }
1791
1792        let mut all_entries: Vec<LogEntry> = Vec::new();
1793
1794        for id in &target_ids {
1795            match self.runtime.container_logs(id, tail).await {
1796                Ok(mut entries) => {
1797                    // Populate service and deployment metadata on each entry
1798                    for entry in &mut entries {
1799                        if entry.service.is_none() {
1800                            entry.service = Some(service_name.to_string());
1801                        }
1802                        if entry.deployment.is_none() {
1803                            entry.deployment.clone_from(&self.deployment_name);
1804                        }
1805                    }
1806                    all_entries.extend(entries);
1807                }
1808                Err(e) => {
1809                    tracing::warn!(
1810                        service = service_name,
1811                        container = %id,
1812                        error = %e,
1813                        "Failed to read container logs"
1814                    );
1815                }
1816            }
1817        }
1818
1819        Ok(all_entries)
1820    }
1821
1822    /// Get all container IDs for a specific service
1823    ///
1824    /// Returns an empty vector if the service doesn't exist.
1825    ///
1826    /// # Arguments
1827    /// * `service_name` - Name of the service to query
1828    ///
1829    /// # Returns
1830    /// Vector of `ContainerIds` for all replicas of the service
1831    pub async fn get_service_containers(&self, service_name: &str) -> Vec<ContainerId> {
1832        let services = self.services.read().await;
1833        if let Some(instance) = services.get(service_name) {
1834            instance.container_ids().await
1835        } else {
1836            Vec::new()
1837        }
1838    }
1839
1840    /// Execute a command inside a running container for a service
1841    ///
1842    /// Picks a specific replica if provided, otherwise uses the first available container.
1843    ///
1844    /// # Arguments
1845    /// * `service_name` - Name of the service
1846    /// * `replica` - Optional replica number to target
1847    /// * `cmd` - Command and arguments to execute
1848    ///
1849    /// # Errors
1850    /// Returns an error if the service or replica is not found, or if exec fails.
1851    ///
1852    /// # Panics
1853    /// Panics if no replica is specified and the container list is unexpectedly empty
1854    /// after the emptiness check (should not happen in practice).
1855    ///
1856    /// # Returns
1857    /// Tuple of (`exit_code`, stdout, stderr)
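    ///
    /// # Examples
    ///
    /// A minimal sketch (not compiled as a doctest); the command is illustrative:
    ///
    /// ```ignore
    /// let cmd = vec!["sh".to_string(), "-c".to_string(), "echo ready".to_string()];
    /// let (exit_code, stdout, stderr) = manager.exec_in_container("api", Some(1), &cmd).await?;
    /// assert_eq!(exit_code, 0);
    /// ```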
1858    pub async fn exec_in_container(
1859        &self,
1860        service_name: &str,
1861        replica: Option<u32>,
1862        cmd: &[String],
1863    ) -> Result<(i32, String, String)> {
1864        let container_ids = self.get_service_containers(service_name).await;
1865
1866        if container_ids.is_empty() {
1867            return Err(AgentError::NotFound {
1868                container: service_name.to_string(),
1869                reason: "no containers found for service".to_string(),
1870            });
1871        }
1872
1873        // Pick the target container
1874        let target = if let Some(rep) = replica {
1875            container_ids
1876                .into_iter()
1877                .find(|cid| cid.replica == rep)
1878                .ok_or_else(|| AgentError::NotFound {
1879                    container: format!("{service_name}-rep-{rep}"),
1880                    reason: format!("replica {rep} not found for service"),
1881                })?
1882        } else {
1883            // Use the first container (lowest replica number)
1884            container_ids.into_iter().next().unwrap()
1885        };
1886
1887        self.runtime.exec(&target, cmd).await
1888    }
1889
1890    // ==================== Job Management ====================
1891
1892    /// Trigger a job execution
1893    ///
1894    /// # Arguments
1895    /// * `name` - Name of the registered job
1896    /// * `trigger` - How the job was triggered (endpoint, cli, etc.)
1897    ///
1898    /// # Returns
1899    /// The execution ID for tracking the job
1900    ///
1901    /// # Errors
1902    /// - Returns error if job executor is not configured
1903    /// - Returns error if the job is not registered
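    ///
    /// # Examples
    ///
    /// A minimal sketch (not compiled as a doctest); the job spec must already be
    /// registered via [`Self::upsert_service`]:
    ///
    /// ```ignore
    /// let exec_id = manager.trigger_job("backup", JobTrigger::Cli).await?;
    /// if let Some(execution) = manager.get_job_execution(&exec_id).await {
    ///     println!("triggered job: {}", execution.job_name);
    /// }
    /// ```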
1904    pub async fn trigger_job(&self, name: &str, trigger: JobTrigger) -> Result<JobExecutionId> {
1905        let executor = self
1906            .job_executor
1907            .as_ref()
1908            .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
1909
1910        let spec = executor
1911            .get_job_spec(name)
1912            .await
1913            .ok_or_else(|| AgentError::NotFound {
1914                container: name.to_string(),
1915                reason: "job not registered".to_string(),
1916            })?;
1917
1918        executor.trigger(name, &spec, trigger).await
1919    }
1920
1921    /// Get the status of a job execution
1922    ///
1923    /// # Arguments
1924    /// * `id` - The execution ID returned from `trigger_job`
1925    ///
1926    /// # Returns
1927    /// The job execution details, or None if not found
1928    pub async fn get_job_execution(&self, id: &JobExecutionId) -> Option<JobExecution> {
1929        if let Some(executor) = &self.job_executor {
1930            executor.get_execution(id).await
1931        } else {
1932            None
1933        }
1934    }
1935
1936    /// List all executions for a specific job
1937    ///
1938    /// # Arguments
1939    /// * `name` - Name of the job
1940    ///
1941    /// # Returns
1942    /// Vector of job executions for the specified job
1943    pub async fn list_job_executions(&self, name: &str) -> Vec<JobExecution> {
1944        if let Some(executor) = &self.job_executor {
1945            executor.list_executions(name).await
1946        } else {
1947            Vec::new()
1948        }
1949    }
1950
1951    /// Cancel a running job execution
1952    ///
1953    /// # Arguments
1954    /// * `id` - The execution ID to cancel
1955    ///
1956    /// # Errors
1957    /// Returns error if job executor is not configured or if cancellation fails
1958    pub async fn cancel_job(&self, id: &JobExecutionId) -> Result<()> {
1959        let executor = self
1960            .job_executor
1961            .as_ref()
1962            .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
1963
1964        executor.cancel(id).await
1965    }
1966
1967    // ==================== Cron Management ====================
1968
1969    /// Manually trigger a cron job (outside of its schedule)
1970    ///
1971    /// # Arguments
1972    /// * `name` - Name of the cron job
1973    ///
1974    /// # Returns
1975    /// The execution ID for tracking the triggered job
1976    ///
1977    /// # Errors
1978    /// Returns error if cron scheduler is not configured or job not found
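    ///
    /// # Examples
    ///
    /// A minimal sketch (not compiled as a doctest); the cron job must already be
    /// registered with the scheduler:
    ///
    /// ```ignore
    /// let exec_id = manager.trigger_cron("cleanup").await?;
    /// // When the manager shares the scheduler's job executor, the run can be tracked:
    /// let execution = manager.get_job_execution(&exec_id).await;
    /// ```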
1979    pub async fn trigger_cron(&self, name: &str) -> Result<JobExecutionId> {
1980        let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
1981            AgentError::Configuration("Cron scheduler not configured".to_string())
1982        })?;
1983
1984        scheduler.trigger_now(name).await
1985    }
1986
1987    /// Enable or disable a cron job
1988    ///
1989    /// # Arguments
1990    /// * `name` - Name of the cron job
1991    /// * `enabled` - Whether to enable or disable the job
1992    pub async fn set_cron_enabled(&self, name: &str, enabled: bool) {
1993        if let Some(scheduler) = &self.cron_scheduler {
1994            scheduler.set_enabled(name, enabled).await;
1995        }
1996    }
1997
1998    /// List all registered cron jobs
1999    pub async fn list_cron_jobs(&self) -> Vec<crate::cron_scheduler::CronJobInfo> {
2000        if let Some(scheduler) = &self.cron_scheduler {
2001            scheduler.list_jobs().await
2002        } else {
2003            Vec::new()
2004        }
2005    }
2006
2007    /// Start the cron scheduler background task
2008    ///
2009    /// This spawns a background task that checks for due cron jobs every second.
2010    /// Returns a `JoinHandle` that can be used to wait for the scheduler to stop.
2011    ///
2012    /// # Errors
2013    /// Returns error if cron scheduler is not configured
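    ///
    /// # Examples
    ///
    /// A minimal sketch (not compiled as a doctest):
    ///
    /// ```ignore
    /// let handle = manager.start_cron_scheduler()?;
    /// // ... later, during shutdown:
    /// manager.shutdown_cron();
    /// let _ = handle.await;
    /// ```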
2014    pub fn start_cron_scheduler(&self) -> Result<tokio::task::JoinHandle<()>> {
2015        let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2016            AgentError::Configuration("Cron scheduler not configured".to_string())
2017        })?;
2018
2019        let scheduler: Arc<CronScheduler> = Arc::clone(scheduler);
2020        Ok(tokio::spawn(async move {
2021            scheduler.run_loop().await;
2022        }))
2023    }
2024
2025    /// Shutdown the cron scheduler
2026    pub fn shutdown_cron(&self) {
2027        if let Some(scheduler) = &self.cron_scheduler {
2028            scheduler.shutdown();
2029        }
2030    }
2031}
2032
2033#[cfg(test)]
2034#[allow(deprecated)]
2035mod tests {
2036    use super::*;
2037    use crate::runtime::MockRuntime;
2038
2039    #[tokio::test]
2040    async fn test_service_manager() {
2041        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2042        let manager = ServiceManager::new(runtime);
2043
2044        // Add service
2045        let spec = mock_spec();
2046        manager
2047            .upsert_service("test".to_string(), spec)
2048            .await
2049            .unwrap();
2050
2051        // Scale up
2052        manager.scale_service("test", 3).await.unwrap();
2053
2054        // Check count
2055        let count = manager.service_replica_count("test").await.unwrap();
2056        assert_eq!(count, 3);
2057
2058        // List services
2059        let services = manager.list_services().await;
2060        assert_eq!(services, vec!["test".to_string()]);
2061    }
2062
2063    #[tokio::test]
2064    async fn test_service_manager_basic_lifecycle() {
2065        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2066        let manager = ServiceManager::new(runtime);
2067
2068        // Add service with HTTP endpoint
2069        let spec = mock_spec();
2070        manager
2071            .upsert_service("api".to_string(), spec)
2072            .await
2073            .unwrap();
2074
2075        // Scale up
2076        manager.scale_service("api", 2).await.unwrap();
2077
2078        // Check count
2079        let count = manager.service_replica_count("api").await.unwrap();
2080        assert_eq!(count, 2);
2081
2082        // Remove service
2083        manager.remove_service("api").await.unwrap();
2084
2085        // Verify service is gone
2086        let services = manager.list_services().await;
2087        assert!(!services.contains(&"api".to_string()));
2088    }
2089
2090    #[tokio::test]
2091    async fn test_service_manager_with_full_config() {
2092        use tokio::sync::RwLock;
2093
2094        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2095
2096        // Create a mock overlay manager (skip actual network setup)
2097        let overlay_manager = Arc::new(RwLock::new(
2098            OverlayManager::new("test-deployment".to_string())
2099                .await
2100                .unwrap(),
2101        ));
2102
2103        let manager =
2104            ServiceManager::with_full_config(runtime, overlay_manager, "prod".to_string());
2105
2106        // Add service
2107        let spec = mock_spec();
2108        manager
2109            .upsert_service("web".to_string(), spec)
2110            .await
2111            .unwrap();
2112
2113        // Verify service is registered
2114        let services = manager.list_services().await;
2115        assert!(services.contains(&"web".to_string()));
2116    }
2117
2118    fn mock_spec() -> ServiceSpec {
2119        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2120            r"
2121version: v1
2122deployment: test
2123services:
2124  test:
2125    rtype: service
2126    image:
2127      name: test:latest
2128    endpoints:
2129      - name: http
2130        protocol: http
2131        port: 8080
2132    scale:
2133      mode: fixed
2134      replicas: 1
2135",
2136        )
2137        .unwrap()
2138        .services
2139        .remove("test")
2140        .unwrap()
2141    }
2142
2143    /// Helper to create a `ServiceSpec` with dependencies
2144    fn mock_spec_with_deps(deps: Vec<DependsSpec>) -> ServiceSpec {
2145        let mut spec = mock_spec();
2146        spec.depends = deps;
2147        spec
2148    }
2149
2150    /// Helper to create a `DependsSpec`
2151    fn dep(
2152        service: &str,
2153        condition: zlayer_spec::DependencyCondition,
2154        timeout_ms: u64,
2155        on_timeout: zlayer_spec::TimeoutAction,
2156    ) -> DependsSpec {
2157        DependsSpec {
2158            service: service.to_string(),
2159            condition,
2160            timeout: Some(Duration::from_millis(timeout_ms)),
2161            on_timeout,
2162        }
2163    }
2164
2165    #[tokio::test]
2166    async fn test_deploy_with_dependencies_no_deps() {
2167        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2168        let manager = ServiceManager::new(runtime);
2169
2170        // Services with no dependencies
2171        let mut services = HashMap::new();
2172        services.insert("a".to_string(), mock_spec());
2173        services.insert("b".to_string(), mock_spec());
2174
2175        // Should deploy both without issue
2176        manager.deploy_with_dependencies(services).await.unwrap();
2177
2178        // Both services should be registered
2179        let service_list = manager.list_services().await;
2180        assert_eq!(service_list.len(), 2);
2181    }
2182
2183    #[tokio::test]
2184    async fn test_deploy_with_dependencies_linear() {
2185        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2186        let manager = ServiceManager::new(runtime);
2187
2188        // A -> B -> C (A depends on B, B depends on C)
2189        // All use "started" condition which is satisfied when container is running
2190        let mut services = HashMap::new();
2191        services.insert("c".to_string(), mock_spec());
2192        services.insert(
2193            "b".to_string(),
2194            mock_spec_with_deps(vec![dep(
2195                "c",
2196                zlayer_spec::DependencyCondition::Started,
2197                5000,
2198                zlayer_spec::TimeoutAction::Fail,
2199            )]),
2200        );
2201        services.insert(
2202            "a".to_string(),
2203            mock_spec_with_deps(vec![dep(
2204                "b",
2205                zlayer_spec::DependencyCondition::Started,
2206                5000,
2207                zlayer_spec::TimeoutAction::Fail,
2208            )]),
2209        );
2210
2211        // Should deploy in order: c, b, a
2212        manager.deploy_with_dependencies(services).await.unwrap();
2213
2214        // All services should be registered
2215        let service_list = manager.list_services().await;
2216        assert_eq!(service_list.len(), 3);
2217    }
2218
2219    #[tokio::test]
2220    async fn test_deploy_with_dependencies_cycle_detection() {
2221        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2222        let manager = ServiceManager::new(runtime);
2223
2224        // A -> B -> A (cycle)
2225        let mut services = HashMap::new();
2226        services.insert(
2227            "a".to_string(),
2228            mock_spec_with_deps(vec![dep(
2229                "b",
2230                zlayer_spec::DependencyCondition::Started,
2231                5000,
2232                zlayer_spec::TimeoutAction::Fail,
2233            )]),
2234        );
2235        services.insert(
2236            "b".to_string(),
2237            mock_spec_with_deps(vec![dep(
2238                "a",
2239                zlayer_spec::DependencyCondition::Started,
2240                5000,
2241                zlayer_spec::TimeoutAction::Fail,
2242            )]),
2243        );
2244
2245        // Should fail with cycle detection
2246        let result = manager.deploy_with_dependencies(services).await;
2247        assert!(result.is_err());
2248        let err = result.unwrap_err().to_string();
2249        assert!(err.contains("Cyclic dependency"));
2250    }
2251
2252    #[tokio::test]
2253    async fn test_deploy_with_dependencies_timeout_continue() {
2254        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2255        let manager = ServiceManager::new(runtime);
2256
2257        // A depends on B (healthy), but B never becomes healthy
2258        // Using continue action, so it should proceed anyway
2259        let mut services = HashMap::new();
2260        services.insert("b".to_string(), mock_spec());
2261        services.insert(
2262            "a".to_string(),
2263            mock_spec_with_deps(vec![dep(
2264                "b",
2265                zlayer_spec::DependencyCondition::Healthy, // B won't pass healthy check
2266                100,                                       // Short timeout
2267                zlayer_spec::TimeoutAction::Continue,      // But continue anyway
2268            )]),
2269        );
2270
2271        // Should deploy both despite timeout
2272        manager.deploy_with_dependencies(services).await.unwrap();
2273
2274        let service_list = manager.list_services().await;
2275        assert_eq!(service_list.len(), 2);
2276    }
2277
2278    #[tokio::test]
2279    async fn test_deploy_with_dependencies_timeout_warn() {
2280        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2281        let manager = ServiceManager::new(runtime);
2282
2283        // A depends on B (healthy), but B never becomes healthy
2284        // Using warn action, so it should proceed with a warning
2285        let mut services = HashMap::new();
2286        services.insert("b".to_string(), mock_spec());
2287        services.insert(
2288            "a".to_string(),
2289            mock_spec_with_deps(vec![dep(
2290                "b",
2291                zlayer_spec::DependencyCondition::Healthy,
2292                100,
2293                zlayer_spec::TimeoutAction::Warn,
2294            )]),
2295        );
2296
2297        // Should deploy both despite timeout (with warning)
2298        manager.deploy_with_dependencies(services).await.unwrap();
2299
2300        let service_list = manager.list_services().await;
2301        assert_eq!(service_list.len(), 2);
2302    }
2303
2304    #[tokio::test]
2305    async fn test_deploy_with_dependencies_timeout_fail() {
2306        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2307        let manager = ServiceManager::new(runtime);
2308
2309        // A depends on B (healthy), but B never becomes healthy
2310        // Using fail action, so deployment should fail
2311        let mut services = HashMap::new();
2312        services.insert("b".to_string(), mock_spec());
2313        services.insert(
2314            "a".to_string(),
2315            mock_spec_with_deps(vec![dep(
2316                "b",
2317                zlayer_spec::DependencyCondition::Healthy,
2318                100,
2319                zlayer_spec::TimeoutAction::Fail,
2320            )]),
2321        );
2322
2323        // Should fail after B is started but doesn't become healthy
2324        let result = manager.deploy_with_dependencies(services).await;
2325        assert!(result.is_err());
2326
2327        // B should be started (it has no deps), but A should fail
2328        let err = result.unwrap_err().to_string();
2329        assert!(err.contains("Dependency timeout"));
2330    }
2331
2332    #[tokio::test]
2333    async fn test_check_dependencies_all_satisfied() {
2334        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2335        let manager = ServiceManager::new(runtime);
2336
2337        // Mark a service as healthy
2338        manager
2339            .update_health_state("db", HealthState::Healthy)
2340            .await;
2341
2342        let deps = vec![DependsSpec {
2343            service: "db".to_string(),
2344            condition: zlayer_spec::DependencyCondition::Healthy,
2345            timeout: Some(Duration::from_secs(60)),
2346            on_timeout: zlayer_spec::TimeoutAction::Fail,
2347        }];
2348
2349        let satisfied = manager.check_dependencies(&deps).await.unwrap();
2350        assert!(satisfied);
2351    }
2352
2353    #[tokio::test]
2354    async fn test_check_dependencies_not_satisfied() {
2355        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2356        let manager = ServiceManager::new(runtime);
2357
2358        // Service not healthy (no state set = Unknown)
2359        let deps = vec![DependsSpec {
2360            service: "db".to_string(),
2361            condition: zlayer_spec::DependencyCondition::Healthy,
2362            timeout: Some(Duration::from_secs(60)),
2363            on_timeout: zlayer_spec::TimeoutAction::Fail,
2364        }];
2365
2366        let satisfied = manager.check_dependencies(&deps).await.unwrap();
2367        assert!(!satisfied);
2368    }
2369
2370    #[tokio::test]
2371    async fn test_health_state_tracking() {
2372        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2373        let manager = ServiceManager::new(runtime);
2374
2375        // Update health states
2376        manager
2377            .update_health_state("db", HealthState::Healthy)
2378            .await;
2379        manager
2380            .update_health_state("cache", HealthState::Unknown)
2381            .await;
2382
2383        // Verify states
2384        let states = manager.health_states();
2385        let states_read = states.read().await;
2386
2387        assert!(matches!(states_read.get("db"), Some(HealthState::Healthy)));
2388        assert!(matches!(
2389            states_read.get("cache"),
2390            Some(HealthState::Unknown)
2391        ));
2392    }
2393
2394    // ==================== Job/Cron Integration Tests ====================
2395
2396    fn mock_job_spec() -> ServiceSpec {
2397        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2398            r"
2399version: v1
2400deployment: test
2401services:
2402  backup:
2403    rtype: job
2404    image:
2405      name: backup:latest
2406",
2407        )
2408        .unwrap()
2409        .services
2410        .remove("backup")
2411        .unwrap()
2412    }
2413
2414    fn mock_cron_spec() -> ServiceSpec {
2415        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2416            r#"
2417version: v1
2418deployment: test
2419services:
2420  cleanup:
2421    rtype: cron
2422    schedule: "0 0 * * * * *"
2423    image:
2424      name: cleanup:latest
2425"#,
2426        )
2427        .unwrap()
2428        .services
2429        .remove("cleanup")
2430        .unwrap()
2431    }
2432
2433    #[tokio::test]
2434    async fn test_service_manager_with_job_executor() {
2435        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2436        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2437
2438        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
2439
2440        // Register job
2441        let job_spec = mock_job_spec();
2442        manager
2443            .upsert_service("backup".to_string(), job_spec)
2444            .await
2445            .unwrap();
2446
2447        // Trigger job
2448        let exec_id = manager
2449            .trigger_job("backup", JobTrigger::Cli)
2450            .await
2451            .unwrap();
2452
2453        // Give job time to start
2454        tokio::time::sleep(Duration::from_millis(50)).await;
2455
2456        // Check execution exists
2457        let execution = manager.get_job_execution(&exec_id).await;
2458        assert!(execution.is_some());
2459        assert_eq!(execution.unwrap().job_name, "backup");
2460    }
2461
2462    #[tokio::test]
2463    async fn test_service_manager_with_cron_scheduler() {
2464        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2465        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2466        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
2467
2468        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
2469
2470        // Register cron job
2471        let cron_spec = mock_cron_spec();
2472        manager
2473            .upsert_service("cleanup".to_string(), cron_spec)
2474            .await
2475            .unwrap();
2476
2477        // List cron jobs
2478        let cron_jobs = manager.list_cron_jobs().await;
2479        assert_eq!(cron_jobs.len(), 1);
2480        assert_eq!(cron_jobs[0].name, "cleanup");
2481        assert!(cron_jobs[0].enabled);
2482    }
2483
2484    #[tokio::test]
2485    async fn test_service_manager_trigger_cron() {
2486        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2487        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2488        let cron_scheduler = Arc::new(CronScheduler::new(job_executor.clone()));
2489
2490        let manager = ServiceManager::new(runtime)
2491            .with_job_executor(job_executor)
2492            .with_cron_scheduler(cron_scheduler);
2493
2494        // Register cron job
2495        let cron_spec = mock_cron_spec();
2496        manager
2497            .upsert_service("cleanup".to_string(), cron_spec)
2498            .await
2499            .unwrap();
2500
2501        // Manually trigger the cron job
2502        let exec_id = manager.trigger_cron("cleanup").await.unwrap();
2503        assert!(!exec_id.0.is_empty());
2504    }
2505
2506    #[tokio::test]
2507    async fn test_service_manager_enable_disable_cron() {
2508        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2509        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2510        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
2511
2512        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
2513
2514        // Register cron job
2515        let cron_spec = mock_cron_spec();
2516        manager
2517            .upsert_service("cleanup".to_string(), cron_spec)
2518            .await
2519            .unwrap();
2520
2521        // Initially enabled
2522        let cron_jobs = manager.list_cron_jobs().await;
2523        assert!(cron_jobs[0].enabled);
2524
2525        // Disable
2526        manager.set_cron_enabled("cleanup", false).await;
2527        let cron_jobs = manager.list_cron_jobs().await;
2528        assert!(!cron_jobs[0].enabled);
2529
2530        // Re-enable
2531        manager.set_cron_enabled("cleanup", true).await;
2532        let cron_jobs = manager.list_cron_jobs().await;
2533        assert!(cron_jobs[0].enabled);
2534    }
2535
2536    #[tokio::test]
2537    async fn test_service_manager_remove_cleans_up_job() {
2538        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2539        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2540
2541        let manager = ServiceManager::new(runtime).with_job_executor(job_executor.clone());
2542
2543        // Register job
2544        let job_spec = mock_job_spec();
2545        manager
2546            .upsert_service("backup".to_string(), job_spec)
2547            .await
2548            .unwrap();
2549
2550        // Verify job is registered
2551        let spec = job_executor.get_job_spec("backup").await;
2552        assert!(spec.is_some());
2553
2554        // Remove job
2555        manager.remove_service("backup").await.unwrap();
2556
2557        // Verify job is unregistered
2558        let spec = job_executor.get_job_spec("backup").await;
2559        assert!(spec.is_none());
2560    }
2561
2562    #[tokio::test]
2563    async fn test_service_manager_remove_cleans_up_cron() {
2564        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2565        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2566        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
2567
2568        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler.clone());
2569
2570        // Register cron job
2571        let cron_spec = mock_cron_spec();
2572        manager
2573            .upsert_service("cleanup".to_string(), cron_spec)
2574            .await
2575            .unwrap();
2576
2577        // Verify cron job is registered
2578        assert_eq!(cron_scheduler.job_count().await, 1);
2579
2580        // Remove cron job
2581        manager.remove_service("cleanup").await.unwrap();
2582
2583        // Verify cron job is unregistered
2584        assert_eq!(cron_scheduler.job_count().await, 0);
2585    }
2586
2587    #[tokio::test]
2588    async fn test_service_manager_job_without_executor() {
2589        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2590        let manager = ServiceManager::new(runtime);
2591
2592        // Try to trigger job without executor configured
2593        let result = manager.trigger_job("nonexistent", JobTrigger::Cli).await;
2594        assert!(result.is_err());
2595        assert!(result.unwrap_err().to_string().contains("not configured"));
2596    }
2597
2598    #[tokio::test]
2599    async fn test_service_manager_cron_without_scheduler() {
2600        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2601        let manager = ServiceManager::new(runtime);
2602
2603        // Try to register cron job without scheduler configured
2604        let cron_spec = mock_cron_spec();
2605        let result = manager
2606            .upsert_service("cleanup".to_string(), cron_spec)
2607            .await;
2608        assert!(result.is_err());
2609        assert!(result.unwrap_err().to_string().contains("not configured"));
2610    }
2611
2612    #[tokio::test]
2613    async fn test_service_manager_list_job_executions() {
2614        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2615        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2616
2617        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
2618
2619        // Register job
2620        let job_spec = mock_job_spec();
2621        manager
2622            .upsert_service("backup".to_string(), job_spec)
2623            .await
2624            .unwrap();
2625
2626        // Trigger job twice
2627        manager
2628            .trigger_job("backup", JobTrigger::Cli)
2629            .await
2630            .unwrap();
2631        manager
2632            .trigger_job("backup", JobTrigger::Scheduler)
2633            .await
2634            .unwrap();
2635
2636        // Give jobs time to start
2637        tokio::time::sleep(Duration::from_millis(50)).await;
2638
2639        // List executions
2640        let executions = manager.list_job_executions("backup").await;
2641        assert_eq!(executions.len(), 2);
2642    }
2643
2644    // ==================== Container Supervisor Integration Tests ====================
2645
    #[tokio::test]
    async fn test_service_manager_with_supervisor() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());

        // Add service
        let spec = mock_spec();
        manager
            .upsert_service("api".to_string(), spec)
            .await
            .unwrap();

        // Scale up - containers should be registered with supervisor
        manager.scale_service("api", 2).await.unwrap();

        // Verify containers are supervised
        assert_eq!(supervisor.supervised_count().await, 2);

        // Scale down - containers should be unregistered
        manager.scale_service("api", 1).await.unwrap();
        assert_eq!(supervisor.supervised_count().await, 1);

        // Remove service - remaining containers should be unregistered
        manager.remove_service("api").await.unwrap();
        assert_eq!(supervisor.supervised_count().await, 0);
    }

    #[tokio::test]
    async fn test_service_manager_supervisor_state() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor);

        // Add and scale service
        let spec = mock_spec();
        manager
            .upsert_service("web".to_string(), spec)
            .await
            .unwrap();
        manager.scale_service("web", 1).await.unwrap();

        // Check supervised state
        let container_id = ContainerId {
            service: "web".to_string(),
            replica: 1,
        };
        let state = manager.get_container_supervised_state(&container_id).await;
        assert_eq!(state, Some(SupervisedState::Running));
    }

    #[tokio::test]
    async fn test_service_manager_start_supervisor() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());

        // Start the supervisor
        let handle = manager.start_container_supervisor().unwrap();

        // Give it time to start
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(supervisor.is_running());

        // Shutdown
        manager.shutdown_container_supervisor();

        // Wait for it to stop
        tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .unwrap()
            .unwrap();

        assert!(!supervisor.is_running());
    }

    #[tokio::test]
    async fn test_service_manager_supervisor_not_configured() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        // Try to start supervisor without configuring it
        let result = manager.start_container_supervisor();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not configured"));
    }

    // ==================== Stream Registry Integration Tests ====================

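    // The helpers below build ServiceSpec fixtures by parsing a full DeploymentSpec YAML
    // document and pulling out the single service under test, so the fixtures stay in the
    // same shape a deployment file would use. The endpoint `protocol` field is what drives
    // stream-registry behaviour in the tests that follow: tcp endpoints should show up via
    // tcp_count()/tcp_ports(), udp endpoints via udp_count()/udp_ports().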
    fn mock_tcp_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  database:
    rtype: service
    image:
      name: postgres:latest
    endpoints:
      - name: postgresql
        protocol: tcp
        port: 5432
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("database")
        .unwrap()
    }

    fn mock_udp_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  dns:
    rtype: service
    image:
      name: dns:latest
    endpoints:
      - name: dns
        protocol: udp
        port: 53
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("dns")
        .unwrap()
    }

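    // The mixed fixture adds an http endpoint alongside tcp and udp ones. Per the
    // assertions in test_service_manager_with_stream_registry_mixed, only the tcp and
    // udp endpoints are registered as stream routes; the http endpoint is presumably
    // routed through the HTTP proxy path instead.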
    fn mock_mixed_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  mixed:
    rtype: service
    image:
      name: mixed:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
      - name: grpc
        protocol: tcp
        port: 9000
      - name: metrics
        protocol: udp
        port: 8125
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("mixed")
        .unwrap()
    }
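
    // Hypothetical consolidation of the three fixtures above (a sketch, not part of the
    // original suite): build a single-endpoint ServiceSpec from a protocol/port pair.
    // It assumes the same DeploymentSpec YAML shape used by mock_tcp_spec/mock_udp_spec.
    #[allow(dead_code)]
    fn mock_single_endpoint_spec(service: &str, protocol: &str, port: u16) -> ServiceSpec {
        let yaml = format!(
            r"
version: v1
deployment: test
services:
  {service}:
    rtype: service
    image:
      name: {service}:latest
    endpoints:
      - name: {service}
        protocol: {protocol}
        port: {port}
    scale:
      mode: fixed
      replicas: 1
"
        );
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(&yaml)
            .unwrap()
            .services
            .remove(service)
            .unwrap()
    }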

    #[tokio::test]
    async fn test_service_manager_with_stream_registry_tcp() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let stream_registry = Arc::new(StreamRegistry::new());

        let mut manager = ServiceManager::new(runtime);
        manager.set_stream_registry(stream_registry.clone());
        manager.set_deployment_name("test".to_string());

        // Add TCP-only service
        let spec = mock_tcp_spec();
        manager
            .upsert_service("database".to_string(), spec)
            .await
            .unwrap();

        // Verify TCP route was registered
        assert_eq!(stream_registry.tcp_count(), 1);
        assert!(stream_registry.tcp_ports().contains(&5432));

        // Remove service and verify cleanup
        manager.remove_service("database").await.unwrap();
        assert_eq!(stream_registry.tcp_count(), 0);
    }

    #[tokio::test]
    async fn test_service_manager_with_stream_registry_udp() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let stream_registry = Arc::new(StreamRegistry::new());

        let mut manager = ServiceManager::new(runtime);
        manager.set_stream_registry(stream_registry.clone());
        manager.set_deployment_name("test".to_string());

        // Add UDP-only service
        let spec = mock_udp_spec();
        manager
            .upsert_service("dns".to_string(), spec)
            .await
            .unwrap();

        // Verify UDP route was registered
        assert_eq!(stream_registry.udp_count(), 1);
        assert!(stream_registry.udp_ports().contains(&53));

        // Remove service and verify cleanup
        manager.remove_service("dns").await.unwrap();
        assert_eq!(stream_registry.udp_count(), 0);
    }

    #[tokio::test]
    async fn test_service_manager_with_stream_registry_mixed() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let stream_registry = Arc::new(StreamRegistry::new());

        let mut manager = ServiceManager::new(runtime);
        manager.set_stream_registry(stream_registry.clone());
        manager.set_deployment_name("test".to_string());

        // Add mixed service (HTTP + TCP + UDP)
        let spec = mock_mixed_spec();
        manager
            .upsert_service("mixed".to_string(), spec)
            .await
            .unwrap();

        // Verify stream routes were registered
        assert_eq!(stream_registry.tcp_count(), 1); // TCP: 9000
        assert_eq!(stream_registry.udp_count(), 1); // UDP: 8125

        assert!(stream_registry.tcp_ports().contains(&9000));
        assert!(stream_registry.udp_ports().contains(&8125));

        // Remove service and verify stream cleanup
        manager.remove_service("mixed").await.unwrap();
        assert_eq!(stream_registry.tcp_count(), 0);
        assert_eq!(stream_registry.udp_count(), 0);
    }

    #[tokio::test]
    async fn test_service_manager_stream_registry_builder() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let stream_registry = Arc::new(StreamRegistry::new());

        // Test builder pattern
        let manager = ServiceManager::new(runtime).with_stream_registry(stream_registry.clone());

        // Verify stream registry is accessible
        assert!(manager.stream_registry().is_some());
    }

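    // Hypothetical companion test (sketch): drives the consolidated fixture helper above
    // through the same register/cleanup expectations as the dedicated TCP and UDP tests.
    // The "cache" service name and port 6379 are illustrative only.
    #[tokio::test]
    async fn test_stream_registry_with_consolidated_fixture() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let stream_registry = Arc::new(StreamRegistry::new());

        let mut manager = ServiceManager::new(runtime);
        manager.set_stream_registry(stream_registry.clone());
        manager.set_deployment_name("test".to_string());

        let spec = mock_single_endpoint_spec("cache", "tcp", 6379);
        manager
            .upsert_service("cache".to_string(), spec)
            .await
            .unwrap();

        assert_eq!(stream_registry.tcp_count(), 1);
        assert!(stream_registry.tcp_ports().contains(&6379));

        manager.remove_service("cache").await.unwrap();
        assert_eq!(stream_registry.tcp_count(), 0);
    }
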
    #[tokio::test]
    async fn test_tcp_service_without_stream_registry() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());

        // Manager without stream registry
        let mut manager = ServiceManager::new(runtime);
        manager.set_deployment_name("test".to_string());

        // Add TCP service - should log warning but not fail
        let spec = mock_tcp_spec();
        manager
            .upsert_service("database".to_string(), spec)
            .await
            .unwrap();

        // No stream registry to check, but service should be tracked
        let services = manager.list_services().await;
        assert!(services.contains(&"database".to_string()));
    }
}