Skip to main content

zlayer_agent/
service.rs

1//! Service-level container lifecycle management
2
3use crate::container_supervisor::{ContainerSupervisor, SupervisedState, SupervisorEvent};
4use crate::cron_scheduler::CronScheduler;
5use crate::dependency::{
6    DependencyConditionChecker, DependencyGraph, DependencyWaiter, WaitResult,
7};
8use crate::error::{AgentError, Result};
9use crate::health::{HealthCallback, HealthChecker, HealthMonitor, HealthState};
10use crate::init::InitOrchestrator;
11use crate::job::{JobExecution, JobExecutionId, JobExecutor, JobTrigger};
12use crate::overlay_manager::OverlayManager;
13use crate::proxy_manager::ProxyManager;
14use crate::runtime::{Container, ContainerId, ContainerState, Runtime};
15use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{RwLock, Semaphore};
20use zlayer_observability::logs::LogEntry;
21use zlayer_overlay::DnsServer;
22use zlayer_proxy::{StreamRegistry, StreamService};
23use zlayer_spec::{DependsSpec, HealthCheck, Protocol, ResourceType, ServiceSpec};
24
25/// Service instance manages a single service's containers
26pub struct ServiceInstance {
27    pub service_name: String,
28    pub spec: ServiceSpec,
29    runtime: Arc<dyn Runtime + Send + Sync>,
30    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
31    /// Overlay network manager for container networking (optional, not needed for Docker runtime)
32    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
33    /// Proxy manager for updating backend health (optional)
34    proxy_manager: Option<Arc<ProxyManager>>,
35    /// DNS server for service discovery (optional)
36    dns_server: Option<Arc<DnsServer>>,
37    /// Shared health states map so callbacks can update ServiceManager-level health
38    health_states: Option<Arc<RwLock<HashMap<String, HealthState>>>>,
39}
40
41impl ServiceInstance {
42    /// Create a new service instance
43    pub fn new(
44        service_name: String,
45        spec: ServiceSpec,
46        runtime: Arc<dyn Runtime + Send + Sync>,
47        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
48    ) -> Self {
49        Self {
50            service_name,
51            spec,
52            runtime,
53            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
54            overlay_manager,
55            proxy_manager: None,
56            dns_server: None,
57            health_states: None,
58        }
59    }
60
61    /// Create a new service instance with proxy manager for health-aware load balancing
62    pub fn with_proxy(
63        service_name: String,
64        spec: ServiceSpec,
65        runtime: Arc<dyn Runtime + Send + Sync>,
66        overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
67        proxy_manager: Arc<ProxyManager>,
68    ) -> Self {
69        Self {
70            service_name,
71            spec,
72            runtime,
73            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
74            overlay_manager,
75            proxy_manager: Some(proxy_manager),
76            dns_server: None,
77            health_states: None,
78        }
79    }
80
81    /// Builder method to add DNS server for service discovery
82    #[must_use]
83    pub fn with_dns(mut self, dns_server: Arc<DnsServer>) -> Self {
84        self.dns_server = Some(dns_server);
85        self
86    }
87
88    /// Set the DNS server for service discovery
89    pub fn set_dns_server(&mut self, dns_server: Arc<DnsServer>) {
90        self.dns_server = Some(dns_server);
91    }
92
93    /// Set the proxy manager for health-aware load balancing
94    pub fn set_proxy_manager(&mut self, proxy_manager: Arc<ProxyManager>) {
95        self.proxy_manager = Some(proxy_manager);
96    }
97
98    /// Set the shared health states map so health callbacks can bridge state back to `ServiceManager`
99    pub fn set_health_states(&mut self, states: Arc<RwLock<HashMap<String, HealthState>>>) {
100        self.health_states = Some(states);
101    }
102
103    /// Scale to the desired number of replicas
104    ///
105    /// This method uses short-lived locks to avoid blocking concurrent operations.
106    /// I/O operations (pull, create, start, stop, remove) are performed without
107    /// holding the containers lock to allow other operations to proceed.
108    ///
109    /// # Errors
110    /// Returns an error if image pull, container creation, or container lifecycle operations fail.
111    #[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
112    pub async fn scale_to(&self, replicas: u32) -> Result<()> {
113        // Phase 1: Determine current state (short read lock)
114        let current_replicas = { self.containers.read().await.len() as u32 }; // Lock released here
115
116        // Phase 2: Scale up - create new containers (no lock held during I/O)
117        if replicas > current_replicas {
118            // Pull image ONCE before creating any replicas (cached layers are reused)
119            let image_str = self.spec.image.name.to_string();
120            self.runtime
121                .pull_image_with_policy(&image_str, self.spec.image.pull_policy, None)
122                .await
123                .map_err(|e| AgentError::PullFailed {
124                    image: self.spec.image.name.to_string(),
125                    reason: e.to_string(),
126                })?;
127
128            for i in current_replicas..replicas {
129                let id = ContainerId {
130                    service: self.service_name.clone(),
131                    replica: i + 1,
132                };
133
134                // Create container (no lock needed - I/O operation)
135                //
136                // RouteToPeer must propagate unchanged: the scheduler uses it
137                // to re-place the workload on a capable peer, and wrapping it
138                // in `CreateFailed` would hide the signal and mark the service
139                // dead instead of rescheduling it. All other errors are
140                // normalised to `CreateFailed` for upstream handling.
141                self.runtime
142                    .create_container(&id, &self.spec)
143                    .await
144                    .map_err(|e| match e {
145                        AgentError::RouteToPeer { .. } => e,
146                        other => AgentError::CreateFailed {
147                            id: id.to_string(),
148                            reason: other.to_string(),
149                        },
150                    })?;
151
152                // Run init actions with error policy enforcement (no lock needed)
153                let init_orchestrator = InitOrchestrator::with_error_policy(
154                    id.clone(),
155                    self.spec.init.clone(),
156                    self.spec.errors.clone(),
157                );
158                init_orchestrator.run().await?;
159
160                // Start container (no lock needed - I/O operation)
161                self.runtime
162                    .start_container(&id)
163                    .await
164                    .map_err(|e| AgentError::StartFailed {
165                        id: id.to_string(),
166                        reason: e.to_string(),
167                    })?;
168
169                // Get container PID with retries (may not be immediately available)
170                let mut container_pid = None;
171                for attempt in 1..=5u32 {
172                    match self.runtime.get_container_pid(&id).await {
173                        Ok(Some(pid)) => {
174                            container_pid = Some(pid);
175                            break;
176                        }
177                        Ok(None) if attempt < 5 => {
178                            tracing::debug!(container = %id, attempt, "PID not available yet, retrying");
179                            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
180                        }
181                        Ok(None) => {
182                            tracing::warn!(container = %id, "Container PID unavailable after 5 attempts");
183                        }
184                        Err(e) => {
185                            tracing::warn!(container = %id, attempt, error = %e, "Failed to get PID");
186                            if attempt < 5 {
187                                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
188                            }
189                        }
190                    }
191                }
192
193                // Verify the container is still running before attempting
194                // overlay attach. If the init process crashed during start
195                // (bad image, missing libs, failed mount), the PID above is
196                // now dead and every `ip link set ... netns {pid}` will
197                // return a cryptic RTNETLINK error. Surface the real cause
198                // from the container's log tail instead of the cascade.
199                if container_pid.is_some() {
200                    let alive = match self.runtime.container_state(&id).await {
201                        Ok(
202                            ContainerState::Running
203                            | ContainerState::Pending
204                            | ContainerState::Initializing,
205                        ) => true,
206                        Ok(state) => {
207                            tracing::warn!(
208                                container = %id,
209                                ?state,
210                                "container exited before overlay attach could run"
211                            );
212                            false
213                        }
214                        Err(e) => {
215                            // State query failed — don't block the attach on
216                            // it. The overlay manager's own cleanup-on-error
217                            // path now handles the dead-PID case cleanly.
218                            tracing::warn!(
219                                container = %id,
220                                error = %e,
221                                "container state query failed before overlay attach, proceeding"
222                            );
223                            true
224                        }
225                    };
226                    if !alive {
227                        let log_tail = self.runtime.container_logs(&id, 40).await.ok().map_or_else(
228                            || "  <log read failed>".to_string(),
229                            |entries| {
230                                if entries.is_empty() {
231                                    "  <no log output>".to_string()
232                                } else {
233                                    entries
234                                        .into_iter()
235                                        .map(|e| format!("  {}", e.message))
236                                        .collect::<Vec<_>>()
237                                        .join("\n")
238                                }
239                            },
240                        );
241                        return Err(AgentError::StartFailed {
242                            id: id.to_string(),
243                            reason: format!("container exited during startup:\n{log_tail}"),
244                        });
245                    }
246                }
247
248                // Attach to overlay network if manager is available.
249                //
250                // Linux uses the container PID to enter the netns and attach a
251                // veth. Windows has no PID-addressable netns — the HCN namespace
252                // GUID (obtained from `get_container_namespace_id`) is used
253                // instead, and the endpoint's IP has already been populated by
254                // `EndpointAttachment::create_overlay` during container creation.
255                // We simply register that IP with the slice allocator so host
256                // accounting stays in sync.
257                let overlay_ip = if let Some(overlay) = &self.overlay_manager {
258                    let overlay_guard = overlay.read().await;
259                    #[cfg(target_os = "windows")]
260                    let attach_result: Option<std::net::IpAddr> = {
261                        let _ = container_pid; // unused on Windows
262                        match self.runtime.get_container_namespace_id(&id).await {
263                            Ok(Some(ns_id)) => {
264                                let ip_override =
265                                    self.runtime.get_container_ip(&id).await.ok().flatten();
266                                let dns_server = overlay_guard.dns_server_addr().map(|sa| sa.ip());
267                                let dns_domain =
268                                    overlay_guard.dns_domain().map(ToString::to_string);
269                                match overlay_guard
270                                    .attach_container_hcn(
271                                        ns_id,
272                                        &self.service_name,
273                                        ip_override,
274                                        true,
275                                        dns_server,
276                                        dns_domain,
277                                    )
278                                    .await
279                                {
280                                    Ok(ip) => Some(ip),
281                                    Err(e) => {
282                                        tracing::warn!(
283                                            container = %id,
284                                            error = %e,
285                                            "HCN overlay attach failed"
286                                        );
287                                        None
288                                    }
289                                }
290                            }
291                            Ok(None) => {
292                                tracing::debug!(
293                                    container = %id,
294                                    "skipping HCN overlay attach - no namespace id available"
295                                );
296                                None
297                            }
298                            Err(e) => {
299                                tracing::warn!(
300                                    container = %id,
301                                    error = %e,
302                                    "failed to fetch HCN namespace id"
303                                );
304                                None
305                            }
306                        }
307                    };
308                    #[cfg(not(target_os = "windows"))]
309                    let attach_result: Option<std::net::IpAddr> = {
310                        if let Some(pid) = container_pid {
311                            match overlay_guard
312                                .attach_container(pid, &self.service_name, true)
313                                .await
314                            {
315                                Ok(ip) => Some(ip),
316                                Err(e) => {
317                                    tracing::warn!(
318                                        container = %id,
319                                        error = %e,
320                                        "failed to attach container to overlay network"
321                                    );
322                                    None
323                                }
324                            }
325                        } else {
326                            // No PID available (e.g. WASM runtime) - skip overlay attachment
327                            tracing::debug!(
328                                container = %id,
329                                "skipping overlay attachment - no PID available"
330                            );
331                            None
332                        }
333                    };
334
335                    if let Some(ip) = attach_result {
336                        tracing::info!(
337                            container = %id,
338                            overlay_ip = %ip,
339                            "attached container to overlay network"
340                        );
341
342                        // Register DNS for service discovery
343                        if let Some(dns) = &self.dns_server {
344                            // Register service-level hostname: {service}.service.local
345                            let service_hostname = format!("{}.service.local", self.service_name);
346
347                            // Register replica-specific hostname: {replica}.{service}.service.local
348                            let replica_hostname =
349                                format!("{}.{}.service.local", id.replica, self.service_name);
350
351                            match dns.add_record(&service_hostname, ip).await {
352                                Ok(()) => tracing::debug!(
353                                    hostname = %service_hostname,
354                                    ip = %ip,
355                                    "registered DNS for service"
356                                ),
357                                Err(e) => tracing::warn!(
358                                    hostname = %service_hostname,
359                                    error = %e,
360                                    "failed to register DNS for service"
361                                ),
362                            }
363
364                            // Also register replica-specific entry
365                            if let Err(e) = dns.add_record(&replica_hostname, ip).await {
366                                tracing::warn!(
367                                    hostname = %replica_hostname,
368                                    error = %e,
369                                    "failed to register replica DNS"
370                                );
371                            } else {
372                                tracing::debug!(
373                                    hostname = %replica_hostname,
374                                    ip = %ip,
375                                    "registered DNS for replica"
376                                );
377                            }
378                        }
379
380                        Some(ip)
381                    } else {
382                        None
383                    }
384                } else {
385                    None
386                };
387
388                // If overlay failed, try the container runtime's own IP as fallback
389                let effective_ip = if overlay_ip.is_none() {
390                    match self.runtime.get_container_ip(&id).await {
391                        Ok(Some(ip)) => {
392                            tracing::info!(
393                                container = %id,
394                                ip = %ip,
395                                "using runtime container IP for proxy (overlay unavailable)"
396                            );
397                            Some(ip)
398                        }
399                        Ok(None) => {
400                            tracing::warn!(
401                                container = %id,
402                                "no container IP available from runtime, proxy routing will be unavailable"
403                            );
404                            None
405                        }
406                        Err(e) => {
407                            tracing::warn!(
408                                container = %id,
409                                error = %e,
410                                "failed to get container IP from runtime"
411                            );
412                            None
413                        }
414                    }
415                } else {
416                    overlay_ip
417                };
418
419                tracing::info!(
420                    container = %id,
421                    service = %self.service_name,
422                    overlay_ip = ?overlay_ip,
423                    effective_ip = ?effective_ip,
424                    "Container IP resolution complete"
425                );
426
427                // Query port override from the runtime.
428                // On macOS sandbox, each container is assigned a unique port since
429                // all processes share the host network (no network namespaces).
430                // The runtime passes the port to the process via the PORT env var.
431                let port_override = match self.runtime.get_container_port_override(&id).await {
432                    Ok(Some(port)) => {
433                        tracing::info!(
434                            container = %id,
435                            port = port,
436                            "runtime assigned dynamic port override for this container"
437                        );
438                        Some(port)
439                    }
440                    Ok(None) => None,
441                    Err(e) => {
442                        tracing::warn!(
443                            container = %id,
444                            error = %e,
445                            "failed to query port override from runtime, using spec port"
446                        );
447                        None
448                    }
449                };
450
451                // Start health monitoring and store handle (no lock needed during start)
452                let health_monitor_handle = {
453                    let mut check = self.spec.health.check.clone();
454
455                    // Resolve Tcp { port: 0 } ("use first endpoint") to the actual
456                    // port the container is listening on. With mac-sandbox, each
457                    // replica gets a unique assigned port via port_override.
458                    if let HealthCheck::Tcp { ref mut port } = check {
459                        if *port == 0 {
460                            *port = port_override.unwrap_or_else(|| {
461                                self.spec
462                                    .endpoints
463                                    .iter()
464                                    .find(|ep| {
465                                        matches!(
466                                            ep.protocol,
467                                            Protocol::Http | Protocol::Https | Protocol::Websocket
468                                        )
469                                    })
470                                    .map_or(8080, zlayer_spec::EndpointSpec::target_port)
471                            });
472                        }
473                    }
474
475                    let start_grace = self
476                        .spec
477                        .health
478                        .start_grace
479                        .unwrap_or(Duration::from_secs(5));
480                    let check_timeout = self.spec.health.timeout.unwrap_or(Duration::from_secs(5));
481                    let interval = self.spec.health.interval.unwrap_or(Duration::from_secs(10));
482                    let retries = self.spec.health.retries;
483
484                    let checker = HealthChecker::new(check, effective_ip);
485                    let mut monitor = HealthMonitor::new(id.clone(), checker, interval, retries)
486                        .with_start_grace(start_grace)
487                        .with_check_timeout(check_timeout);
488
489                    // Create health callback to update proxy backend health if proxy is configured
490                    // and we have an overlay IP for this container
491                    if let (Some(proxy), Some(ip)) = (&self.proxy_manager, effective_ip) {
492                        let proxy = Arc::clone(proxy);
493                        let service_name = self.service_name.clone();
494                        // Get the container's target port, using the runtime override if present.
495                        // On macOS sandbox, port_override gives each replica a unique port
496                        // so the proxy can distinguish backends sharing 127.0.0.1.
497                        let port = port_override.unwrap_or_else(|| {
498                            self.spec
499                                .endpoints
500                                .iter()
501                                .find(|ep| {
502                                    matches!(
503                                        ep.protocol,
504                                        Protocol::Http | Protocol::Https | Protocol::Websocket
505                                    )
506                                })
507                                .map_or(8080, zlayer_spec::EndpointSpec::target_port)
508                        });
509
510                        let backend_addr = SocketAddr::new(ip, port);
511
512                        // Register backend with load balancer so proxy can route to it.
513                        // This must happen before the health callback is created, because
514                        // update_backend_health only updates *existing* backends.
515                        proxy.add_backend(&self.service_name, backend_addr).await;
516
517                        let health_states_opt = self.health_states.clone();
518                        let svc_name_for_states = self.service_name.clone();
519
520                        let health_callback: HealthCallback =
521                            Arc::new(move |container_id: ContainerId, is_healthy: bool| {
522                                let proxy = Arc::clone(&proxy);
523                                let service_name = service_name.clone();
524                                tracing::info!(
525                                    container = %container_id,
526                                    service = %service_name,
527                                    backend = %backend_addr,
528                                    healthy = is_healthy,
529                                    "health status changed, updating proxy backend"
530                                );
531                                // Spawn a task to update the proxy (callback is sync, proxy update is async)
532                                tokio::spawn(async move {
533                                    proxy
534                                        .update_backend_health(
535                                            &service_name,
536                                            backend_addr,
537                                            is_healthy,
538                                        )
539                                        .await;
540                                });
541                                // Bridge health state back to ServiceManager's health_states map
542                                if let Some(ref health_states) = health_states_opt {
543                                    let states = Arc::clone(health_states);
544                                    let svc = svc_name_for_states.clone();
545                                    tokio::spawn(async move {
546                                        let state = if is_healthy {
547                                            HealthState::Healthy
548                                        } else {
549                                            HealthState::Unhealthy {
550                                                failures: 0,
551                                                reason: "health check failed".into(),
552                                            }
553                                        };
554                                        states.write().await.insert(svc, state);
555                                    });
556                                }
557                            });
558
559                        monitor = monitor.with_callback(health_callback);
560                    }
561
562                    monitor.start()
563                };
564
565                // Update state (short write lock)
566                {
567                    let mut containers = self.containers.write().await;
568                    containers.insert(
569                        id.clone(),
570                        Container {
571                            id: id.clone(),
572                            state: ContainerState::Running,
573                            pid: None,
574                            task: None,
575                            overlay_ip: effective_ip,
576                            health_monitor: Some(health_monitor_handle),
577                            port_override,
578                        },
579                    );
580                } // Lock released here
581            }
582        }
583
584        // Phase 3: Scale down - remove containers (short write lock per removal)
585        if replicas < current_replicas {
586            for i in replicas..current_replicas {
587                let id = ContainerId {
588                    service: self.service_name.clone(),
589                    replica: i + 1,
590                };
591
592                // Remove from state first and get the container to abort health monitor (short write lock)
593                let removed_container = {
594                    let mut containers = self.containers.write().await;
595                    containers.remove(&id)
596                }; // Lock released here
597
598                // Then perform cleanup (no lock held - I/O operations)
599                if let Some(container) = removed_container {
600                    // Abort the health monitor task if it exists
601                    if let Some(handle) = container.health_monitor {
602                        handle.abort();
603                    }
604
605                    // Remove DNS records for this container
606                    if let Some(dns) = &self.dns_server {
607                        // Remove replica-specific DNS entry
608                        let replica_hostname =
609                            format!("{}.{}.service.local", id.replica, self.service_name);
610                        if let Err(e) = dns.remove_record(&replica_hostname).await {
611                            tracing::warn!(
612                                hostname = %replica_hostname,
613                                error = %e,
614                                "failed to remove replica DNS record"
615                            );
616                        } else {
617                            tracing::debug!(
618                                hostname = %replica_hostname,
619                                "removed replica DNS record"
620                            );
621                        }
622
623                        // Note: We don't remove the service-level hostname here because
624                        // other replicas may still be using it. The service-level record
625                        // should be cleaned up when the entire service is removed.
626                    }
627
628                    // Stop container
629                    self.runtime
630                        .stop_container(&id, Duration::from_secs(30))
631                        .await?;
632
633                    // Sync volumes to S3 before removal (no-op if not configured)
634                    if let Err(e) = self.runtime.sync_container_volumes(&id).await {
635                        tracing::warn!(
636                            container = %id,
637                            error = %e,
638                            "failed to sync volumes before removal"
639                        );
640                    }
641
642                    // Remove container
643                    self.runtime.remove_container(&id).await?;
644                }
645            }
646        }
647
648        Ok(())
649    }
650
651    /// Get current number of replicas
652    pub async fn replica_count(&self) -> usize {
653        self.containers.read().await.len()
654    }
655
656    /// Get all container IDs
657    pub async fn container_ids(&self) -> Vec<ContainerId> {
658        self.containers.read().await.keys().cloned().collect()
659    }
660
661    /// Get read access to the containers map
662    ///
663    /// This allows callers to access container overlay IPs and other metadata
664    /// without copying the entire map.
665    pub fn containers(
666        &self,
667    ) -> &tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>> {
668        &self.containers
669    }
670
671    /// Check if this service instance has an overlay manager configured
672    pub fn has_overlay_manager(&self) -> bool {
673        self.overlay_manager.is_some()
674    }
675
676    /// Check if this service instance has a proxy manager configured
677    pub fn has_proxy_manager(&self) -> bool {
678        self.proxy_manager.is_some()
679    }
680
681    /// Check if this service instance has a DNS server configured
682    pub fn has_dns_server(&self) -> bool {
683        self.dns_server.is_some()
684    }
685}
686
687/// Service manager for multiple services
688pub struct ServiceManager {
689    runtime: Arc<dyn Runtime + Send + Sync>,
690    services: tokio::sync::RwLock<std::collections::HashMap<String, ServiceInstance>>,
691    scale_semaphore: Arc<Semaphore>,
692    /// Overlay network manager for container networking
693    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
694    /// Stream registry for L4 proxy route registration (TCP/UDP)
695    stream_registry: Option<Arc<StreamRegistry>>,
696    /// Proxy manager for health-aware load balancing (hyper-based proxy)
697    proxy_manager: Option<Arc<ProxyManager>>,
698    /// DNS server for service discovery
699    dns_server: Option<Arc<DnsServer>>,
700    /// Deployment name (used for generating hostnames)
701    deployment_name: Option<String>,
702    /// Health states for dependency condition checking
703    health_states: Arc<RwLock<HashMap<String, HealthState>>>,
704    /// Job executor for run-to-completion workloads
705    job_executor: Option<Arc<JobExecutor>>,
706    /// Cron scheduler for time-based job triggers
707    cron_scheduler: Option<Arc<CronScheduler>>,
708    /// Container supervisor for crash/panic policy enforcement
709    container_supervisor: Option<Arc<ContainerSupervisor>>,
710}
711
712// ---------------------------------------------------------------------------
713// ServiceManagerBuilder
714// ---------------------------------------------------------------------------
715
716/// Builder for constructing a [`ServiceManager`] with optional subsystems.
717///
718/// Prefer using `ServiceManager::builder(runtime)` to start building.
719///
720/// # Example
721///
722/// ```ignore
723/// let manager = ServiceManager::builder(runtime)
724///     .overlay_manager(om)
725///     .proxy_manager(proxy)
726///     .deployment_name("prod")
727///     .build();
728/// ```
729pub struct ServiceManagerBuilder {
730    runtime: Arc<dyn Runtime + Send + Sync>,
731    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
732    proxy_manager: Option<Arc<ProxyManager>>,
733    stream_registry: Option<Arc<StreamRegistry>>,
734    dns_server: Option<Arc<DnsServer>>,
735    deployment_name: Option<String>,
736    job_executor: Option<Arc<JobExecutor>>,
737    cron_scheduler: Option<Arc<CronScheduler>>,
738    container_supervisor: Option<Arc<ContainerSupervisor>>,
739}
740
741impl ServiceManagerBuilder {
742    /// Create a new builder with the required runtime.
743    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
744        Self {
745            runtime,
746            overlay_manager: None,
747            proxy_manager: None,
748            stream_registry: None,
749            dns_server: None,
750            deployment_name: None,
751            job_executor: None,
752            cron_scheduler: None,
753            container_supervisor: None,
754        }
755    }
756
757    /// Set the overlay network manager for container networking.
758    #[must_use]
759    pub fn overlay_manager(mut self, om: Arc<RwLock<OverlayManager>>) -> Self {
760        self.overlay_manager = Some(om);
761        self
762    }
763
764    /// Set the proxy manager for health-aware load balancing.
765    #[must_use]
766    pub fn proxy_manager(mut self, pm: Arc<ProxyManager>) -> Self {
767        self.proxy_manager = Some(pm);
768        self
769    }
770
771    /// Set the stream registry for TCP/UDP L4 proxy route registration.
772    #[must_use]
773    pub fn stream_registry(mut self, sr: Arc<StreamRegistry>) -> Self {
774        self.stream_registry = Some(sr);
775        self
776    }
777
778    /// Set the DNS server for service discovery.
779    #[must_use]
780    pub fn dns_server(mut self, dns: Arc<DnsServer>) -> Self {
781        self.dns_server = Some(dns);
782        self
783    }
784
785    /// Set the deployment name (used for hostname generation).
786    #[must_use]
787    pub fn deployment_name(mut self, name: impl Into<String>) -> Self {
788        self.deployment_name = Some(name.into());
789        self
790    }
791
792    /// Set the job executor for run-to-completion workloads.
793    #[must_use]
794    pub fn job_executor(mut self, je: Arc<JobExecutor>) -> Self {
795        self.job_executor = Some(je);
796        self
797    }
798
799    /// Set the cron scheduler for time-based job triggers.
800    #[must_use]
801    pub fn cron_scheduler(mut self, cs: Arc<CronScheduler>) -> Self {
802        self.cron_scheduler = Some(cs);
803        self
804    }
805
806    /// Set the container supervisor for crash/panic policy enforcement.
807    #[must_use]
808    pub fn container_supervisor(mut self, cs: Arc<ContainerSupervisor>) -> Self {
809        self.container_supervisor = Some(cs);
810        self
811    }
812
813    /// Consume the builder and produce a fully-wired [`ServiceManager`].
814    ///
815    /// Logs warnings for missing recommended subsystems (proxy,
816    /// `stream_registry`, `container_supervisor`, `deployment_name`).
817    pub fn build(self) -> ServiceManager {
818        if self.proxy_manager.is_none() {
819            tracing::warn!("ServiceManager built without proxy_manager");
820        }
821        if self.stream_registry.is_none() {
822            tracing::warn!("ServiceManager built without stream_registry");
823        }
824        if self.container_supervisor.is_none() {
825            tracing::warn!("ServiceManager built without container_supervisor");
826        }
827        if self.deployment_name.is_none() {
828            tracing::warn!("ServiceManager built without deployment_name");
829        }
830
831        ServiceManager {
832            runtime: self.runtime,
833            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
834            scale_semaphore: Arc::new(Semaphore::new(10)),
835            overlay_manager: self.overlay_manager,
836            stream_registry: self.stream_registry,
837            proxy_manager: self.proxy_manager,
838            dns_server: self.dns_server,
839            deployment_name: self.deployment_name,
840            health_states: Arc::new(RwLock::new(HashMap::new())),
841            job_executor: self.job_executor,
842            cron_scheduler: self.cron_scheduler,
843            container_supervisor: self.container_supervisor,
844        }
845    }
846}
847
848impl ServiceManager {
849    /// Create a [`ServiceManagerBuilder`] for constructing a `ServiceManager`.
850    ///
851    /// This is the preferred way to construct a `ServiceManager` since v0.2.0.
852    ///
853    /// # Example
854    ///
855    /// ```ignore
856    /// let manager = ServiceManager::builder(runtime)
857    ///     .overlay_manager(om)
858    ///     .proxy_manager(proxy)
859    ///     .build();
860    /// ```
861    pub fn builder(runtime: Arc<dyn Runtime + Send + Sync>) -> ServiceManagerBuilder {
862        ServiceManagerBuilder::new(runtime)
863    }
864
865    /// Create a new service manager
866    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
867    pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
868        Self {
869            runtime,
870            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
871            scale_semaphore: Arc::new(Semaphore::new(10)), // Max 10 concurrent scaling operations
872            overlay_manager: None,
873            stream_registry: None,
874            proxy_manager: None,
875            dns_server: None,
876            deployment_name: None,
877            health_states: Arc::new(RwLock::new(HashMap::new())),
878            job_executor: None,
879            cron_scheduler: None,
880            container_supervisor: None,
881        }
882    }
883
884    /// Create a service manager with overlay network support
885    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
886    pub fn with_overlay(
887        runtime: Arc<dyn Runtime + Send + Sync>,
888        overlay_manager: Arc<RwLock<OverlayManager>>,
889    ) -> Self {
890        Self {
891            runtime,
892            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
893            scale_semaphore: Arc::new(Semaphore::new(10)),
894            overlay_manager: Some(overlay_manager),
895            stream_registry: None,
896            proxy_manager: None,
897            dns_server: None,
898            deployment_name: None,
899            health_states: Arc::new(RwLock::new(HashMap::new())),
900            job_executor: None,
901            cron_scheduler: None,
902            container_supervisor: None,
903        }
904    }
905
906    /// Create a fully-configured service manager with overlay and proxy support
907    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
908    pub fn with_full_config(
909        runtime: Arc<dyn Runtime + Send + Sync>,
910        overlay_manager: Arc<RwLock<OverlayManager>>,
911        deployment_name: String,
912    ) -> Self {
913        Self {
914            runtime,
915            services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
916            scale_semaphore: Arc::new(Semaphore::new(10)),
917            overlay_manager: Some(overlay_manager),
918            stream_registry: None,
919            proxy_manager: None,
920            dns_server: None,
921            deployment_name: Some(deployment_name),
922            health_states: Arc::new(RwLock::new(HashMap::new())),
923            job_executor: None,
924            cron_scheduler: None,
925            container_supervisor: None,
926        }
927    }
928
929    /// Get the health states map for external monitoring
930    pub fn health_states(&self) -> Arc<RwLock<HashMap<String, HealthState>>> {
931        Arc::clone(&self.health_states)
932    }
933
934    /// Update health state for a service
935    pub async fn update_health_state(&self, service_name: &str, state: HealthState) {
936        let mut states = self.health_states.write().await;
937        states.insert(service_name.to_string(), state);
938    }
939
940    /// Set the deployment name (used for generating hostnames)
941    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
942    pub fn set_deployment_name(&mut self, name: String) {
943        self.deployment_name = Some(name);
944    }
945
946    /// Set the stream registry for L4 proxy integration (TCP/UDP)
947    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
948    pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
949        self.stream_registry = Some(registry);
950    }
951
952    /// Builder pattern: add stream registry for L4 proxy integration
953    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
954    #[must_use]
955    pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
956        self.stream_registry = Some(registry);
957        self
958    }
959
960    /// Get the stream registry (if configured)
961    pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
962        self.stream_registry.as_ref()
963    }
964
965    /// Set the overlay manager for container networking
966    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
967    pub fn set_overlay_manager(&mut self, manager: Arc<RwLock<OverlayManager>>) {
968        self.overlay_manager = Some(manager);
969    }
970
971    /// Set the proxy manager for health-aware load balancing
972    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
973    pub fn set_proxy_manager(&mut self, proxy: Arc<ProxyManager>) {
974        self.proxy_manager = Some(proxy);
975    }
976
977    /// Builder pattern: add proxy manager for health-aware load balancing
978    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
979    #[must_use]
980    pub fn with_proxy_manager(mut self, proxy: Arc<ProxyManager>) -> Self {
981        self.proxy_manager = Some(proxy);
982        self
983    }
984
985    /// Get the proxy manager (if configured)
986    pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
987        self.proxy_manager.as_ref()
988    }
989
990    /// Set the DNS server for service discovery
991    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
992    pub fn set_dns_server(&mut self, dns: Arc<DnsServer>) {
993        self.dns_server = Some(dns);
994    }
995
996    /// Builder pattern: add DNS server for service discovery
997    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
998    #[must_use]
999    pub fn with_dns_server(mut self, dns: Arc<DnsServer>) -> Self {
1000        self.dns_server = Some(dns);
1001        self
1002    }
1003
1004    /// Get the DNS server (if configured)
1005    pub fn dns_server(&self) -> Option<&Arc<DnsServer>> {
1006        self.dns_server.as_ref()
1007    }
1008
1009    /// Set the job executor for run-to-completion workloads
1010    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1011    pub fn set_job_executor(&mut self, executor: Arc<JobExecutor>) {
1012        self.job_executor = Some(executor);
1013    }
1014
1015    /// Set the cron scheduler for time-based job triggers
1016    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1017    pub fn set_cron_scheduler(&mut self, scheduler: Arc<CronScheduler>) {
1018        self.cron_scheduler = Some(scheduler);
1019    }
1020
1021    /// Builder pattern: add job executor
1022    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1023    #[must_use]
1024    pub fn with_job_executor(mut self, executor: Arc<JobExecutor>) -> Self {
1025        self.job_executor = Some(executor);
1026        self
1027    }
1028
1029    /// Builder pattern: add cron scheduler
1030    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1031    #[must_use]
1032    pub fn with_cron_scheduler(mut self, scheduler: Arc<CronScheduler>) -> Self {
1033        self.cron_scheduler = Some(scheduler);
1034        self
1035    }
1036
1037    /// Get the job executor (if configured)
1038    pub fn job_executor(&self) -> Option<&Arc<JobExecutor>> {
1039        self.job_executor.as_ref()
1040    }
1041
1042    /// Get the cron scheduler (if configured)
1043    pub fn cron_scheduler(&self) -> Option<&Arc<CronScheduler>> {
1044        self.cron_scheduler.as_ref()
1045    }
1046
1047    /// Set the container supervisor for crash/panic policy enforcement
1048    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1049    pub fn set_container_supervisor(&mut self, supervisor: Arc<ContainerSupervisor>) {
1050        self.container_supervisor = Some(supervisor);
1051    }
1052
1053    /// Builder pattern: add container supervisor
1054    #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1055    #[must_use]
1056    pub fn with_container_supervisor(mut self, supervisor: Arc<ContainerSupervisor>) -> Self {
1057        self.container_supervisor = Some(supervisor);
1058        self
1059    }
1060
1061    /// Get the container supervisor (if configured)
1062    pub fn container_supervisor(&self) -> Option<&Arc<ContainerSupervisor>> {
1063        self.container_supervisor.as_ref()
1064    }
1065
1066    /// Start the container supervisor background task
1067    ///
1068    /// This spawns a background task that monitors containers for crashes
1069    /// and enforces the `on_panic` error policy.
1070    ///
1071    /// # Errors
1072    /// Returns an error if no container supervisor is configured.
1073    ///
1074    /// # Returns
1075    /// A `JoinHandle` for the supervisor task.
1076    pub fn start_container_supervisor(&self) -> Result<tokio::task::JoinHandle<()>> {
1077        let supervisor = self.container_supervisor.as_ref().ok_or_else(|| {
1078            AgentError::Configuration("Container supervisor not configured".to_string())
1079        })?;
1080
1081        let supervisor = Arc::clone(supervisor);
1082        Ok(tokio::spawn(async move {
1083            supervisor.run_loop().await;
1084        }))
1085    }
1086
1087    /// Shutdown the container supervisor
1088    pub fn shutdown_container_supervisor(&self) {
1089        if let Some(supervisor) = &self.container_supervisor {
1090            supervisor.shutdown();
1091        }
1092    }
1093
1094    /// Get the supervised state of a container
1095    pub async fn get_container_supervised_state(
1096        &self,
1097        container_id: &ContainerId,
1098    ) -> Option<SupervisedState> {
1099        if let Some(supervisor) = &self.container_supervisor {
1100            supervisor.get_state(container_id).await
1101        } else {
1102            None
1103        }
1104    }
1105
1106    /// Get supervisor events receiver
1107    ///
1108    /// Note: This can only be called once; the receiver is moved to the caller.
1109    pub async fn take_supervisor_events(
1110        &self,
1111    ) -> Option<tokio::sync::mpsc::Receiver<SupervisorEvent>> {
1112        if let Some(supervisor) = &self.container_supervisor {
1113            supervisor.take_event_receiver().await
1114        } else {
1115            None
1116        }
1117    }
1118
1119    // ==================== Dependency Orchestration ====================
1120
1121    /// Deploy multiple services respecting their dependency order
1122    ///
1123    /// This method:
1124    /// 1. Builds a dependency graph from the services
1125    /// 2. Validates no cycles exist
1126    /// 3. Computes topological order (services with no deps first)
1127    /// 4. For each service in order, waits for dependencies then starts the service
1128    ///
1129    /// # Arguments
1130    /// * `services` - Map of service name to service specification
1131    ///
1132    /// # Errors
1133    /// - Returns `AgentError::InvalidSpec` if there are cyclic dependencies
1134    /// - Returns `AgentError::DependencyTimeout` if a dependency times out with `on_timeout`: fail
1135    pub async fn deploy_with_dependencies(
1136        &self,
1137        services: HashMap<String, ServiceSpec>,
1138    ) -> Result<()> {
1139        if services.is_empty() {
1140            return Ok(());
1141        }
1142
1143        // Build dependency graph
1144        let graph = DependencyGraph::build(&services)?;
1145
1146        tracing::info!(
1147            service_count = services.len(),
1148            "Starting deployment with dependency ordering"
1149        );
1150
1151        // Get startup order
1152        let order = graph.startup_order();
1153        tracing::debug!(order = ?order, "Computed startup order");
1154
1155        // Start services in dependency order
1156        for service_name in order {
1157            let service_spec = services
1158                .get(service_name)
1159                .ok_or_else(|| AgentError::Internal(format!("Service {service_name} not found")))?;
1160
1161            // Wait for dependencies first
1162            if !service_spec.depends.is_empty() {
1163                tracing::info!(
1164                    service = %service_name,
1165                    dependency_count = service_spec.depends.len(),
1166                    "Waiting for dependencies"
1167                );
1168                self.wait_for_dependencies(service_name, &service_spec.depends)
1169                    .await?;
1170            }
1171
1172            // Register and start service
1173            tracing::info!(service = %service_name, "Starting service");
1174            self.upsert_service(service_name.clone(), service_spec.clone())
1175                .await?;
1176
1177            // Get the desired replica count from scale config
1178            let replicas = match &service_spec.scale {
1179                zlayer_spec::ScaleSpec::Fixed { replicas } => *replicas,
1180                zlayer_spec::ScaleSpec::Adaptive { min, .. } => *min, // Start with min replicas
1181                zlayer_spec::ScaleSpec::Manual => 1, // Default to 1 for manual scaling
1182            };
1183            self.scale_service(service_name, replicas).await?;
1184
1185            // Mark service as started in health states (Unknown until health check runs)
1186            self.update_health_state(service_name, HealthState::Unknown)
1187                .await;
1188
1189            tracing::info!(
1190                service = %service_name,
1191                replicas = replicas,
1192                "Service started"
1193            );
1194        }
1195
1196        tracing::info!(service_count = services.len(), "Deployment complete");
1197
1198        Ok(())
1199    }
1200
1201    /// Wait for all dependencies of a service to be satisfied
1202    ///
1203    /// # Arguments
1204    /// * `service` - Name of the service waiting for dependencies
1205    /// * `deps` - Slice of dependency specifications
1206    ///
1207    /// # Errors
1208    /// Returns `AgentError::DependencyTimeout` if any dependency with `on_timeout`: fail times out
1209    async fn wait_for_dependencies(&self, service: &str, deps: &[DependsSpec]) -> Result<()> {
1210        let condition_checker = DependencyConditionChecker::new(
1211            Arc::clone(&self.runtime),
1212            Arc::clone(&self.health_states),
1213            None,
1214        );
1215
1216        let waiter = DependencyWaiter::new(condition_checker);
1217        let results = waiter.wait_for_all(deps).await?;
1218
1219        // Check results for failures
1220        for result in results {
1221            match result {
1222                WaitResult::TimedOutFail {
1223                    service: dep_service,
1224                    condition,
1225                    timeout,
1226                } => {
1227                    return Err(AgentError::DependencyTimeout {
1228                        service: service.to_string(),
1229                        dependency: dep_service,
1230                        condition: format!("{condition:?}"),
1231                        timeout,
1232                    });
1233                }
1234                WaitResult::TimedOutWarn {
1235                    service: dep_service,
1236                    condition,
1237                } => {
1238                    tracing::warn!(
1239                        service = %service,
1240                        dependency = %dep_service,
1241                        condition = ?condition,
1242                        "Dependency timed out but continuing"
1243                    );
1244                }
1245                WaitResult::TimedOutContinue | WaitResult::Satisfied => {
1246                    // Continue silently
1247                }
1248            }
1249        }
1250
1251        Ok(())
1252    }
1253
1254    /// Check if all dependencies for a service are currently satisfied
1255    ///
1256    /// This is a one-shot check (no waiting). Useful for pre-flight validation.
1257    ///
1258    /// # Errors
1259    /// Returns an error if a dependency check fails unexpectedly.
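    ///
    /// # Example
    ///
    /// A minimal illustrative sketch (not compiled as a doctest); assumes a
    /// configured `ServiceManager` bound to `manager` and an async context:
    ///
    /// ```ignore
    /// use std::time::Duration;
    /// use zlayer_spec::{DependencyCondition, DependsSpec, TimeoutAction};
    ///
    /// let deps = vec![DependsSpec {
    ///     service: "db".to_string(),
    ///     condition: DependencyCondition::Healthy,
    ///     timeout: Some(Duration::from_secs(60)),
    ///     on_timeout: TimeoutAction::Fail,
    /// }];
    ///
    /// if !manager.check_dependencies(&deps).await? {
    ///     tracing::warn!("dependencies are not yet satisfied");
    /// }
    /// ```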
1260    pub async fn check_dependencies(&self, deps: &[DependsSpec]) -> Result<bool> {
1261        let condition_checker = DependencyConditionChecker::new(
1262            Arc::clone(&self.runtime),
1263            Arc::clone(&self.health_states),
1264            None,
1265        );
1266
1267        for dep in deps {
1268            if !condition_checker.check(dep).await? {
1269                return Ok(false);
1270            }
1271        }
1272
1273        Ok(true)
1274    }
1275
1276    /// Add or update a workload (service, job, or cron)
1277    ///
1278    /// This method handles different resource types appropriately:
1279    /// - **Service**: Traditional long-running containers with scaling and health checks
1280    /// - **Job**: Run-to-completion workloads triggered on-demand (stores spec for later)
1281    /// - **Cron**: Scheduled run-to-completion workloads (registers with cron scheduler)
1282    ///
1283    /// # Errors
1284    /// Returns an error if service creation, scaling, or cron registration fails.
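    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doctest); assumes a configured
    /// `ServiceManager` bound to `manager` and a parsed `ServiceSpec` bound to
    /// `spec` (for example, taken from a deserialized `zlayer_spec::DeploymentSpec`):
    ///
    /// ```ignore
    /// // Register (or update) the workload, then bring up its replicas.
    /// manager.upsert_service("api".to_string(), spec).await?;
    /// manager.scale_service("api", 2).await?;
    /// assert_eq!(manager.service_replica_count("api").await?, 2);
    /// ```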
1285    pub async fn upsert_service(&self, name: String, spec: ServiceSpec) -> Result<()> {
1286        match spec.rtype {
1287            ResourceType::Service => {
1288                // Long-running service: create/update instance
1289                let mut services = self.services.write().await;
1290
1291                if let Some(instance) = services.get_mut(&name) {
1292                    // Update existing service
1293                    instance.spec = spec;
1294                    // Update DNS server if configured
1295                    if let Some(dns) = &self.dns_server {
1296                        instance.set_dns_server(Arc::clone(dns));
1297                    }
1298                } else {
1299                    // Create new service with proxy manager for health-aware load balancing
1300                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
1301                    let mut instance = if let Some(proxy) = &self.proxy_manager {
1302                        ServiceInstance::with_proxy(
1303                            name.clone(),
1304                            spec,
1305                            self.runtime.clone(),
1306                            overlay,
1307                            Arc::clone(proxy),
1308                        )
1309                    } else {
1310                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
1311                    };
1312                    // Set DNS server if configured
1313                    if let Some(dns) = &self.dns_server {
1314                        instance.set_dns_server(Arc::clone(dns));
1315                    }
1316                    // Wire shared health states so callbacks bridge back to ServiceManager
1317                    instance.set_health_states(Arc::clone(&self.health_states));
1318                    // Register HTTP routes via proxy manager
1319                    if let Some(proxy) = &self.proxy_manager {
1320                        proxy.add_service(&name, &instance.spec).await;
1321                    }
1322                    // Register TCP/UDP endpoints in stream registry
1323                    if let Some(stream_registry) = &self.stream_registry {
1324                        for endpoint in &instance.spec.endpoints {
1325                            let svc = StreamService::new(
1326                                name.clone(),
1327                                Vec::new(), // No backends yet; added on scale-up
1328                            );
1329                            match endpoint.protocol {
1330                                Protocol::Tcp => {
1331                                    stream_registry.register_tcp(endpoint.port, svc);
1332                                    tracing::debug!(
1333                                        service = %name,
1334                                        port = endpoint.port,
1335                                        "Registered TCP stream route"
1336                                    );
1337                                }
1338                                Protocol::Udp => {
1339                                    stream_registry.register_udp(endpoint.port, svc);
1340                                    tracing::debug!(
1341                                        service = %name,
1342                                        port = endpoint.port,
1343                                        "Registered UDP stream route"
1344                                    );
1345                                }
1346                                _ => {} // HTTP routes handled by proxy manager
1347                            }
1348                        }
1349                    }
1350                    services.insert(name, instance);
1351                }
1352            }
1353            ResourceType::Job => {
1354                // Job: Just store the spec for later triggering
1355                // Jobs don't start containers immediately; they're triggered on-demand
1356                if let Some(executor) = &self.job_executor {
1357                    executor.register_job(&name, spec).await;
1358                    tracing::info!(job = %name, "Registered job spec");
1359                } else {
1360                    tracing::warn!(
1361                        job = %name,
1362                        "Job executor not configured, storing as service for reference"
1363                    );
1364                    // Fallback: store as service instance for reference
1365                    let mut services = self.services.write().await;
1366                    let overlay = self.overlay_manager.as_ref().map(Arc::clone);
1367                    let mut instance = if let Some(proxy) = &self.proxy_manager {
1368                        ServiceInstance::with_proxy(
1369                            name.clone(),
1370                            spec,
1371                            self.runtime.clone(),
1372                            overlay,
1373                            Arc::clone(proxy),
1374                        )
1375                    } else {
1376                        ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
1377                    };
1378                    // Set DNS server if configured
1379                    if let Some(dns) = &self.dns_server {
1380                        instance.set_dns_server(Arc::clone(dns));
1381                    }
1382                    services.insert(name, instance);
1383                }
1384            }
1385            ResourceType::Cron => {
1386                // Cron: Register with the cron scheduler
1387                if let Some(scheduler) = &self.cron_scheduler {
1388                    scheduler.register(&name, &spec).await?;
1389                    tracing::info!(cron = %name, "Registered cron job with scheduler");
1390                } else {
1391                    return Err(AgentError::Configuration(format!(
1392                        "Cron scheduler not configured for cron job '{name}'"
1393                    )));
1394                }
1395            }
1396        }
1397
1398        Ok(())
1399    }
1400
1401    /// Update backend addresses via `ProxyManager` after scaling
1402    async fn update_proxy_backends(&self, service_name: &str, addrs: Vec<SocketAddr>) {
1403        if let Some(proxy) = &self.proxy_manager {
1404            proxy.update_backends(service_name, addrs).await;
1405        }
1406    }
1407
1408    /// Update backend addresses in the `StreamRegistry` for TCP/UDP endpoints after scaling
1409    ///
1410    /// For containers with a port override (macOS sandbox), the addresses already
1411    /// carry the runtime-assigned port. In that case, the container listens on the
1412    /// override port for all traffic, so we use the address port directly. For
1413    /// containers without a port override (Linux, VMs), we reconstruct addresses
1414    /// using the endpoint's declared port, since each container has its own IP
1415    /// and can bind any port independently.
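    ///
    /// As an illustrative sketch (assuming the primary HTTP endpoint targets the
    /// default port 8080 and a TCP endpoint targets port 5432): backends
    /// `[10.0.0.2:8080, 10.0.0.3:8080]` carry no overrides and are rewritten to
    /// `[10.0.0.2:5432, 10.0.0.3:5432]`, while backends such as
    /// `[127.0.0.1:32801, 127.0.0.1:32802]` carry port overrides and are passed
    /// to the registry unchanged.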
1416    fn update_stream_backends(&self, spec: &ServiceSpec, addrs: &[SocketAddr]) {
1417        let Some(stream_registry) = &self.stream_registry else {
1418            return;
1419        };
1420
1421        // Determine if any addresses have a port override by checking whether
1422        // all addresses use the same port as the primary spec endpoint. If not,
1423        // they carry per-container port overrides and should be used as-is.
1424        let primary_spec_port = spec
1425            .endpoints
1426            .iter()
1427            .find(|ep| {
1428                matches!(
1429                    ep.protocol,
1430                    Protocol::Http | Protocol::Https | Protocol::Websocket
1431                )
1432            })
1433            .map_or(8080, zlayer_spec::EndpointSpec::target_port);
1434
1435        let has_port_overrides = addrs.iter().any(|addr| addr.port() != primary_spec_port);
1436
1437        for endpoint in &spec.endpoints {
1438            match endpoint.protocol {
1439                Protocol::Tcp => {
1440                    let tcp_backends: Vec<SocketAddr> = if has_port_overrides {
1441                        // Port overrides active (macOS sandbox): the container listens
1442                        // on its assigned port for all traffic. Use addresses as-is.
1443                        addrs.to_vec()
1444                    } else {
1445                        // Normal case: each container has its own IP, construct
1446                        // addresses using the TCP endpoint's container target port.
1447                        addrs
1448                            .iter()
1449                            .map(|addr| SocketAddr::new(addr.ip(), endpoint.target_port()))
1450                            .collect()
1451                    };
1452
1453                    stream_registry.update_tcp_backends(endpoint.port, tcp_backends);
1454
1455                    tracing::debug!(
1456                        endpoint = %endpoint.name,
1457                        port = endpoint.port,
1458                        backend_count = addrs.len(),
1459                        "Updated TCP stream backends"
1460                    );
1461                }
1462                Protocol::Udp => {
1463                    let udp_backends: Vec<SocketAddr> = if has_port_overrides {
1464                        addrs.to_vec()
1465                    } else {
1466                        addrs
1467                            .iter()
1468                            .map(|addr| SocketAddr::new(addr.ip(), endpoint.target_port()))
1469                            .collect()
1470                    };
1471
1472                    stream_registry.update_udp_backends(endpoint.port, udp_backends);
1473
1474                    tracing::debug!(
1475                        endpoint = %endpoint.name,
1476                        port = endpoint.port,
1477                        backend_count = addrs.len(),
1478                        "Updated UDP stream backends"
1479                    );
1480                }
1481                _ => {} // HTTP endpoints handled by update_proxy_backends
1482            }
1483        }
1484    }
1485
1486    /// Scale a service to desired replica count
1487    ///
1488    /// # Errors
1489    /// Returns an error if the service is not found or scaling fails.
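    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doctest); assumes `manager` is a
    /// configured `ServiceManager` with a service named `"api"` already
    /// registered via `upsert_service`:
    ///
    /// ```ignore
    /// // Scale up, then back down; proxy/stream backends and supervisor
    /// // registrations are updated as part of each call.
    /// manager.scale_service("api", 3).await?;
    /// manager.scale_service("api", 1).await?;
    /// ```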
1490    #[allow(clippy::cast_possible_truncation)]
1491    pub async fn scale_service(&self, name: &str, replicas: u32) -> Result<()> {
1492        let _permit = self.scale_semaphore.acquire().await;
1493
1494        let services = self.services.read().await;
1495        let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
1496            container: name.to_string(),
1497            reason: "service not found".to_string(),
1498        })?;
1499
1500        // Get current replica count before scaling
1501        let current_replicas = instance.replica_count().await as u32;
1502
1503        // Perform the scaling operation
1504        instance.scale_to(replicas).await?;
1505
1506        // After scaling, update proxy backends with new container addresses
1507        // collect_backend_addrs queries each container's overlay IP (honoring any
1508        // runtime-assigned port override, e.g. in the macOS sandbox) to build the
1509        // backend list; containers without an overlay IP are skipped and a
1510        // warning is logged.
1511        let addrs = self.collect_backend_addrs(instance, replicas).await;
1512
1513        // Update HTTP backends via ProxyManager
1514        if self.proxy_manager.is_some() && !addrs.is_empty() {
1515            self.update_proxy_backends(name, addrs.clone()).await;
1516        }
1517
1518        // Update TCP/UDP backends in StreamRegistry
1519        if self.stream_registry.is_some() {
1520            self.update_stream_backends(&instance.spec, &addrs);
1521        }
1522
1523        // Register new containers with supervisor for crash monitoring
1524        if let Some(supervisor) = &self.container_supervisor {
1525            // For scale-up, register new containers
1526            if replicas > current_replicas {
1527                for i in current_replicas..replicas {
1528                    let container_id = ContainerId {
1529                        service: name.to_string(),
1530                        replica: i + 1,
1531                    };
1532                    supervisor.supervise(&container_id, &instance.spec).await;
1533                }
1534            }
1535            // For scale-down, unregister removed containers
1536            if replicas < current_replicas {
1537                for i in replicas..current_replicas {
1538                    let container_id = ContainerId {
1539                        service: name.to_string(),
1540                        replica: i + 1,
1541                    };
1542                    supervisor.unsupervise(&container_id).await;
1543                }
1544            }
1545        }
1546
1547        Ok(())
1548    }
1549
1550    /// Collect backend addresses for a service's containers
1551    ///
1552    /// This queries the service instance's containers for their overlay network
1553    /// IP addresses and constructs backend addresses using those IPs with the
1554    /// service's endpoint port.
1555    ///
1556    /// If a container has a `port_override` (e.g., macOS sandbox where all
1557    /// containers share the host network), that port is used instead of the
1558    /// spec-declared endpoint port. This allows multiple replicas on the same
1559    /// IP (`127.0.0.1`) to be distinguished by port.
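    ///
    /// As an illustrative sketch (assuming the primary HTTP endpoint targets the
    /// default port 8080): a replica with overlay IP `10.1.0.5` and no port
    /// override contributes `10.1.0.5:8080`, while a sandboxed replica on
    /// `127.0.0.1` with `port_override = Some(32801)` contributes
    /// `127.0.0.1:32801`.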
1560    async fn collect_backend_addrs(
1561        &self,
1562        instance: &ServiceInstance,
1563        _replicas: u32, // No longer needed - we iterate containers directly
1564    ) -> Vec<SocketAddr> {
1565        let mut addrs = Vec::new();
1566
1567        // Get the primary container target port (first HTTP/HTTPS/WebSocket endpoint) as the default
1568        let spec_port = instance
1569            .spec
1570            .endpoints
1571            .iter()
1572            .find(|ep| {
1573                matches!(
1574                    ep.protocol,
1575                    Protocol::Http | Protocol::Https | Protocol::Websocket
1576                )
1577            })
1578            .map_or(8080, zlayer_spec::EndpointSpec::target_port);
1579
1580        // Collect backend addresses from containers with overlay IPs
1581        let containers = instance.containers().read().await;
1582
1583        for container in containers.values() {
1584            if let Some(ip) = container.overlay_ip {
1585                // Use the runtime-assigned port override if present (macOS sandbox),
1586                // otherwise fall back to the spec-declared endpoint port.
1587                let port = container.port_override.unwrap_or(spec_port);
1588                addrs.push(SocketAddr::new(ip, port));
1589            }
1590        }
1591
1592        // If no overlay IPs are available, this may be the Docker runtime or failed
1593        // attachments. Log a warning, but do not fall back to localhost in production.
1594        if addrs.is_empty() && !containers.is_empty() {
1595            tracing::warn!(
1596                service = %instance.service_name,
1597                container_count = containers.len(),
1598                "no overlay IPs available for backends - containers may not be reachable via proxy"
1599            );
1600        }
1601
1602        addrs
1603    }
1604
1605    /// Get service replica count
1606    ///
1607    /// # Errors
1608    /// Returns an error if the service is not found.
1609    pub async fn service_replica_count(&self, name: &str) -> Result<usize> {
1610        let services = self.services.read().await;
1611        let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
1612            container: name.to_string(),
1613            reason: "service not found".to_string(),
1614        })?;
1615
1616        Ok(instance.replica_count().await)
1617    }
1618
1619    /// Remove a workload (service, job, or cron)
1620    ///
1621    /// This method handles cleanup for different resource types:
1622    /// - **Service**: Unregisters stream routes, supervised containers, and DNS records, then removes the instance from the service map
1623    /// - **Job**: Unregisters from job executor
1624    /// - **Cron**: Unregisters from cron scheduler
1625    ///
1626    /// # Errors
1627    /// Returns an error if the service cannot be removed or scale-down fails.
1628    pub async fn remove_service(&self, name: &str) -> Result<()> {
1629        // Try to unregister from cron scheduler first
1630        if let Some(scheduler) = &self.cron_scheduler {
1631            scheduler.unregister(name).await;
1632        }
1633
1634        // Try to unregister from job executor
1635        if let Some(executor) = &self.job_executor {
1636            executor.unregister_job(name).await;
1637        }
1638
1639        // Unregister stream routes (TCP/UDP) from the stream registry
1640        if let Some(stream_registry) = &self.stream_registry {
1641            // Need to get the service spec to know which ports to unregister
1642            let services = self.services.read().await;
1643            if let Some(instance) = services.get(name) {
1644                for endpoint in &instance.spec.endpoints {
1645                    match endpoint.protocol {
1646                        Protocol::Tcp => {
1647                            let _ = stream_registry.unregister_tcp(endpoint.port);
1648                            tracing::debug!(
1649                                service = %name,
1650                                port = endpoint.port,
1651                                "Unregistered TCP stream route"
1652                            );
1653                        }
1654                        Protocol::Udp => {
1655                            let _ = stream_registry.unregister_udp(endpoint.port);
1656                            tracing::debug!(
1657                                service = %name,
1658                                port = endpoint.port,
1659                                "Unregistered UDP stream route"
1660                            );
1661                        }
1662                        _ => {} // HTTP routes are not registered in the stream registry
1663                    }
1664                }
1665            }
1666            drop(services); // Release read lock
1667        }
1668
1669        // Unregister containers from the supervisor
1670        if let Some(supervisor) = &self.container_supervisor {
1671            let containers = self.get_service_containers(name).await;
1672            for container_id in containers {
1673                supervisor.unsupervise(&container_id).await;
1674            }
1675            tracing::debug!(service = %name, "Unregistered containers from supervisor");
1676        }
1677
1678        // Clean up DNS records for the service
1679        if let Some(dns) = &self.dns_server {
1680            // Remove the service-level DNS entry
1681            let service_hostname = format!("{name}.service.local");
1682            if let Err(e) = dns.remove_record(&service_hostname).await {
1683                tracing::warn!(
1684                    hostname = %service_hostname,
1685                    error = %e,
1686                    "failed to remove service DNS record"
1687                );
1688            } else {
1689                tracing::debug!(
1690                    hostname = %service_hostname,
1691                    "removed service DNS record"
1692                );
1693            }
1694
1695            // Also remove any remaining replica-specific DNS entries
1696            let services = self.services.read().await;
1697            if let Some(instance) = services.get(name) {
1698                let containers = instance.containers().read().await;
1699                for (id, _) in containers.iter() {
1700                    let replica_hostname = format!("{}.{}.service.local", id.replica, name);
1701                    if let Err(e) = dns.remove_record(&replica_hostname).await {
1702                        tracing::warn!(
1703                            hostname = %replica_hostname,
1704                            error = %e,
1705                            "failed to remove replica DNS record during service removal"
1706                        );
1707                    }
1708                }
1709            }
1710            drop(services); // Release read lock before write lock
1711        }
1712
1713        // Remove from services map (may or may not exist depending on rtype)
1714        let mut services = self.services.write().await;
1715        if services.remove(name).is_some() {
1716            tracing::debug!(service = %name, "Removed service from manager");
1717        }
1718
1719        Ok(())
1720    }
1721
1722    /// Introspect service infrastructure wiring.
1723    /// Returns (`has_overlay`, `has_proxy`, `has_dns`), or `None` if the service is not found.
1724    pub async fn service_infrastructure(&self, name: &str) -> Option<(bool, bool, bool)> {
1725        let services = self.services.read().await;
1726        services.get(name).map(|i| {
1727            (
1728                i.has_overlay_manager(),
1729                i.has_proxy_manager(),
1730                i.has_dns_server(),
1731            )
1732        })
1733    }
1734
1735    /// List all services
1736    pub async fn list_services(&self) -> Vec<String> {
1737        self.services.read().await.keys().cloned().collect()
1738    }
1739
1740    /// Get logs for a service, aggregated from all container replicas.
1741    ///
1742    /// # Arguments
1743    /// * `service_name` - Name of the service to fetch logs for
1744    /// * `tail` - Number of lines to return per container (0 = all)
1745    /// * `instance` - Optional specific instance (replica number like "1" or "2", or a container ID substring)
1746    ///
1747    /// # Errors
1748    /// Returns an error if the service or instance is not found.
1749    ///
1750    /// # Returns
1751    /// Structured log entries from all (or specific) container replicas. Each
1752    /// entry has its `service` and `deployment` fields populated when available.
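    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doctest); assumes `manager` is a
    /// configured `ServiceManager` with a running service named `"api"`:
    ///
    /// ```ignore
    /// // Last 100 lines per container, restricted to replica "1".
    /// let entries = manager.get_service_logs("api", 100, Some("1")).await?;
    /// tracing::info!(count = entries.len(), "fetched log entries for replica 1");
    /// ```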
1753    pub async fn get_service_logs(
1754        &self,
1755        service_name: &str,
1756        tail: usize,
1757        instance: Option<&str>,
1758    ) -> Result<Vec<LogEntry>> {
1759        let container_ids = self.get_service_containers(service_name).await;
1760
1761        if container_ids.is_empty() {
1762            return Err(AgentError::NotFound {
1763                container: service_name.to_string(),
1764                reason: "no containers found for service".to_string(),
1765            });
1766        }
1767
1768        // If a specific instance is requested, filter to just that one
1769        let target_ids: Vec<&ContainerId> = if let Some(inst) = instance {
1770            if let Ok(replica_num) = inst.parse::<u32>() {
1771                container_ids
1772                    .iter()
1773                    .filter(|id| id.replica == replica_num)
1774                    .collect()
1775            } else {
1776                // Fall back to substring matching against the full container ID string
1777                container_ids
1778                    .iter()
1779                    .filter(|id| id.to_string().contains(inst))
1780                    .collect()
1781            }
1782        } else {
1783            container_ids.iter().collect()
1784        };
1785
1786        if target_ids.is_empty() {
1787            return Err(AgentError::NotFound {
1788                container: format!("{}/{}", service_name, instance.unwrap_or("?")),
1789                reason: "instance not found".to_string(),
1790            });
1791        }
1792
1793        let mut all_entries: Vec<LogEntry> = Vec::new();
1794
1795        for id in &target_ids {
1796            match self.runtime.container_logs(id, tail).await {
1797                Ok(mut entries) => {
1798                    // Populate service and deployment metadata on each entry
1799                    for entry in &mut entries {
1800                        if entry.service.is_none() {
1801                            entry.service = Some(service_name.to_string());
1802                        }
1803                        if entry.deployment.is_none() {
1804                            entry.deployment.clone_from(&self.deployment_name);
1805                        }
1806                    }
1807                    all_entries.extend(entries);
1808                }
1809                Err(e) => {
1810                    tracing::warn!(
1811                        service = service_name,
1812                        container = %id,
1813                        error = %e,
1814                        "Failed to read container logs"
1815                    );
1816                }
1817            }
1818        }
1819
1820        Ok(all_entries)
1821    }
1822
1823    /// Get all container IDs for a specific service
1824    ///
1825    /// Returns an empty vector if the service doesn't exist.
1826    ///
1827    /// # Arguments
1828    /// * `service_name` - Name of the service to query
1829    ///
1830    /// # Returns
1831    /// Vector of `ContainerId`s for all replicas of the service
1832    pub async fn get_service_containers(&self, service_name: &str) -> Vec<ContainerId> {
1833        let services = self.services.read().await;
1834        if let Some(instance) = services.get(service_name) {
1835            instance.container_ids().await
1836        } else {
1837            Vec::new()
1838        }
1839    }
1840
1841    /// Execute a command inside a running container for a service
1842    ///
1843    /// Picks a specific replica if provided, otherwise uses the first available container.
1844    ///
1845    /// # Arguments
1846    /// * `service_name` - Name of the service
1847    /// * `replica` - Optional replica number to target
1848    /// * `cmd` - Command and arguments to execute
1849    ///
1850    /// # Errors
1851    /// Returns an error if the service or replica is not found, or if exec fails.
1852    ///
1853    /// # Panics
1854    /// Panics if no replica is specified and the container list is unexpectedly empty
1855    /// after the emptiness check (should not happen in practice).
1856    ///
1857    /// # Returns
1858    /// Tuple of (`exit_code`, stdout, stderr)
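    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doctest); assumes `manager` is a
    /// configured `ServiceManager` with a running service named `"api"`:
    ///
    /// ```ignore
    /// let cmd = vec!["uname".to_string(), "-a".to_string()];
    /// let (exit_code, stdout, _stderr) = manager
    ///     .exec_in_container("api", Some(1), &cmd)
    ///     .await?;
    /// tracing::info!(exit_code, %stdout, "exec finished");
    /// ```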
1859    pub async fn exec_in_container(
1860        &self,
1861        service_name: &str,
1862        replica: Option<u32>,
1863        cmd: &[String],
1864    ) -> Result<(i32, String, String)> {
1865        let container_ids = self.get_service_containers(service_name).await;
1866
1867        if container_ids.is_empty() {
1868            return Err(AgentError::NotFound {
1869                container: service_name.to_string(),
1870                reason: "no containers found for service".to_string(),
1871            });
1872        }
1873
1874        // Pick the target container
1875        let target = if let Some(rep) = replica {
1876            container_ids
1877                .into_iter()
1878                .find(|cid| cid.replica == rep)
1879                .ok_or_else(|| AgentError::NotFound {
1880                    container: format!("{service_name}-rep-{rep}"),
1881                    reason: format!("replica {rep} not found for service"),
1882                })?
1883        } else {
1884            // Use the first available container
1885            container_ids.into_iter().next().unwrap()
1886        };
1887
1888        self.runtime.exec(&target, cmd).await
1889    }
1890
1891    // ==================== Job Management ====================
1892
1893    /// Trigger a job execution
1894    ///
1895    /// # Arguments
1896    /// * `name` - Name of the registered job
1897    /// * `trigger` - How the job was triggered (endpoint, cli, etc.)
1898    ///
1899    /// # Returns
1900    /// The execution ID for tracking the job
1901    ///
1902    /// # Errors
1903    /// - Returns an error if the job executor is not configured
1904    /// - Returns an error if the job is not registered
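    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doctest); assumes `manager` has
    /// a `JobExecutor` configured and a job named `"backup"` registered via
    /// `upsert_service`:
    ///
    /// ```ignore
    /// let exec_id = manager.trigger_job("backup", JobTrigger::Cli).await?;
    ///
    /// // Poll the execution for status.
    /// if let Some(execution) = manager.get_job_execution(&exec_id).await {
    ///     tracing::info!(job = %execution.job_name, "job execution found");
    /// }
    /// ```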
1905    pub async fn trigger_job(&self, name: &str, trigger: JobTrigger) -> Result<JobExecutionId> {
1906        let executor = self
1907            .job_executor
1908            .as_ref()
1909            .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
1910
1911        let spec = executor
1912            .get_job_spec(name)
1913            .await
1914            .ok_or_else(|| AgentError::NotFound {
1915                container: name.to_string(),
1916                reason: "job not registered".to_string(),
1917            })?;
1918
1919        executor.trigger(name, &spec, trigger).await
1920    }
1921
1922    /// Get the status of a job execution
1923    ///
1924    /// # Arguments
1925    /// * `id` - The execution ID returned from `trigger_job`
1926    ///
1927    /// # Returns
1928    /// The job execution details, or None if not found
1929    pub async fn get_job_execution(&self, id: &JobExecutionId) -> Option<JobExecution> {
1930        if let Some(executor) = &self.job_executor {
1931            executor.get_execution(id).await
1932        } else {
1933            None
1934        }
1935    }
1936
1937    /// List all executions for a specific job
1938    ///
1939    /// # Arguments
1940    /// * `name` - Name of the job
1941    ///
1942    /// # Returns
1943    /// Vector of job executions for the specified job
1944    pub async fn list_job_executions(&self, name: &str) -> Vec<JobExecution> {
1945        if let Some(executor) = &self.job_executor {
1946            executor.list_executions(name).await
1947        } else {
1948            Vec::new()
1949        }
1950    }
1951
1952    /// Cancel a running job execution
1953    ///
1954    /// # Arguments
1955    /// * `id` - The execution ID to cancel
1956    ///
1957    /// # Errors
1958    /// Returns an error if the job executor is not configured or if cancellation fails
1959    pub async fn cancel_job(&self, id: &JobExecutionId) -> Result<()> {
1960        let executor = self
1961            .job_executor
1962            .as_ref()
1963            .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
1964
1965        executor.cancel(id).await
1966    }
1967
1968    // ==================== Cron Management ====================
1969
1970    /// Manually trigger a cron job (outside of its schedule)
1971    ///
1972    /// # Arguments
1973    /// * `name` - Name of the cron job
1974    ///
1975    /// # Returns
1976    /// The execution ID for tracking the triggered job
1977    ///
1978    /// # Errors
1979    /// Returns an error if the cron scheduler is not configured or the job is not found
1980    pub async fn trigger_cron(&self, name: &str) -> Result<JobExecutionId> {
1981        let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
1982            AgentError::Configuration("Cron scheduler not configured".to_string())
1983        })?;
1984
1985        scheduler.trigger_now(name).await
1986    }
1987
1988    /// Enable or disable a cron job
1989    ///
1990    /// # Arguments
1991    /// * `name` - Name of the cron job
1992    /// * `enabled` - Whether to enable or disable the job
1993    pub async fn set_cron_enabled(&self, name: &str, enabled: bool) {
1994        if let Some(scheduler) = &self.cron_scheduler {
1995            scheduler.set_enabled(name, enabled).await;
1996        }
1997    }
1998
1999    /// List all registered cron jobs
2000    pub async fn list_cron_jobs(&self) -> Vec<crate::cron_scheduler::CronJobInfo> {
2001        if let Some(scheduler) = &self.cron_scheduler {
2002            scheduler.list_jobs().await
2003        } else {
2004            Vec::new()
2005        }
2006    }
2007
2008    /// Start the cron scheduler background task
2009    ///
2010    /// This spawns a background task that checks for due cron jobs every second.
2011    /// Returns a `JoinHandle` that can be used to wait for the scheduler to stop.
2012    ///
2013    /// # Errors
2014    /// Returns an error if the cron scheduler is not configured
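    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doctest); assumes `manager` was
    /// built with a `CronScheduler` attached:
    ///
    /// ```ignore
    /// let handle = manager.start_cron_scheduler()?;
    ///
    /// // ... later, during shutdown:
    /// manager.shutdown_cron();
    /// handle.await?;
    /// ```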
2015    pub fn start_cron_scheduler(&self) -> Result<tokio::task::JoinHandle<()>> {
2016        let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2017            AgentError::Configuration("Cron scheduler not configured".to_string())
2018        })?;
2019
2020        let scheduler: Arc<CronScheduler> = Arc::clone(scheduler);
2021        Ok(tokio::spawn(async move {
2022            scheduler.run_loop().await;
2023        }))
2024    }
2025
2026    /// Shutdown the cron scheduler
2027    pub fn shutdown_cron(&self) {
2028        if let Some(scheduler) = &self.cron_scheduler {
2029            scheduler.shutdown();
2030        }
2031    }
2032}
2033
2034#[cfg(test)]
2035#[allow(deprecated)]
2036mod tests {
2037    use super::*;
2038    use crate::runtime::MockRuntime;
2039
2040    #[tokio::test]
2041    async fn test_service_manager() {
2042        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2043        let manager = ServiceManager::new(runtime);
2044
2045        // Add service
2046        let spec = mock_spec();
2047        manager
2048            .upsert_service("test".to_string(), spec)
2049            .await
2050            .unwrap();
2051
2052        // Scale up
2053        manager.scale_service("test", 3).await.unwrap();
2054
2055        // Check count
2056        let count = manager.service_replica_count("test").await.unwrap();
2057        assert_eq!(count, 3);
2058
2059        // List services
2060        let services = manager.list_services().await;
2061        assert_eq!(services, vec!["test".to_string()]);
2062    }
2063
2064    #[tokio::test]
2065    async fn test_service_manager_basic_lifecycle() {
2066        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2067        let manager = ServiceManager::new(runtime);
2068
2069        // Add service with HTTP endpoint
2070        let spec = mock_spec();
2071        manager
2072            .upsert_service("api".to_string(), spec)
2073            .await
2074            .unwrap();
2075
2076        // Scale up
2077        manager.scale_service("api", 2).await.unwrap();
2078
2079        // Check count
2080        let count = manager.service_replica_count("api").await.unwrap();
2081        assert_eq!(count, 2);
2082
2083        // Remove service
2084        manager.remove_service("api").await.unwrap();
2085
2086        // Verify service is gone
2087        let services = manager.list_services().await;
2088        assert!(!services.contains(&"api".to_string()));
2089    }
2090
2091    #[tokio::test]
2092    async fn test_service_manager_with_full_config() {
2093        use tokio::sync::RwLock;
2094
2095        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2096
2097        // Create a mock overlay manager (skip actual network setup)
2098        let overlay_manager = Arc::new(RwLock::new(
2099            OverlayManager::new("test-deployment".to_string())
2100                .await
2101                .unwrap(),
2102        ));
2103
2104        let manager =
2105            ServiceManager::with_full_config(runtime, overlay_manager, "prod".to_string());
2106
2107        // Add service
2108        let spec = mock_spec();
2109        manager
2110            .upsert_service("web".to_string(), spec)
2111            .await
2112            .unwrap();
2113
2114        // Verify service is registered
2115        let services = manager.list_services().await;
2116        assert!(services.contains(&"web".to_string()));
2117    }
2118
2119    fn mock_spec() -> ServiceSpec {
2120        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2121            r"
2122version: v1
2123deployment: test
2124services:
2125  test:
2126    rtype: service
2127    image:
2128      name: test:latest
2129    endpoints:
2130      - name: http
2131        protocol: http
2132        port: 8080
2133    scale:
2134      mode: fixed
2135      replicas: 1
2136",
2137        )
2138        .unwrap()
2139        .services
2140        .remove("test")
2141        .unwrap()
2142    }
2143
2144    /// Helper to create a `ServiceSpec` with dependencies
2145    fn mock_spec_with_deps(deps: Vec<DependsSpec>) -> ServiceSpec {
2146        let mut spec = mock_spec();
2147        spec.depends = deps;
2148        spec
2149    }
2150
2151    /// Helper to create a `DependsSpec`
2152    fn dep(
2153        service: &str,
2154        condition: zlayer_spec::DependencyCondition,
2155        timeout_ms: u64,
2156        on_timeout: zlayer_spec::TimeoutAction,
2157    ) -> DependsSpec {
2158        DependsSpec {
2159            service: service.to_string(),
2160            condition,
2161            timeout: Some(Duration::from_millis(timeout_ms)),
2162            on_timeout,
2163        }
2164    }
2165
2166    #[tokio::test]
2167    async fn test_deploy_with_dependencies_no_deps() {
2168        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2169        let manager = ServiceManager::new(runtime);
2170
2171        // Services with no dependencies
2172        let mut services = HashMap::new();
2173        services.insert("a".to_string(), mock_spec());
2174        services.insert("b".to_string(), mock_spec());
2175
2176        // Should deploy both without issue
2177        manager.deploy_with_dependencies(services).await.unwrap();
2178
2179        // Both services should be registered
2180        let service_list = manager.list_services().await;
2181        assert_eq!(service_list.len(), 2);
2182    }
2183
2184    #[tokio::test]
2185    async fn test_deploy_with_dependencies_linear() {
2186        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2187        let manager = ServiceManager::new(runtime);
2188
2189        // A -> B -> C (A depends on B, B depends on C)
2190        // All use "started" condition which is satisfied when container is running
2191        let mut services = HashMap::new();
2192        services.insert("c".to_string(), mock_spec());
2193        services.insert(
2194            "b".to_string(),
2195            mock_spec_with_deps(vec![dep(
2196                "c",
2197                zlayer_spec::DependencyCondition::Started,
2198                5000,
2199                zlayer_spec::TimeoutAction::Fail,
2200            )]),
2201        );
2202        services.insert(
2203            "a".to_string(),
2204            mock_spec_with_deps(vec![dep(
2205                "b",
2206                zlayer_spec::DependencyCondition::Started,
2207                5000,
2208                zlayer_spec::TimeoutAction::Fail,
2209            )]),
2210        );
2211
2212        // Should deploy in order: c, b, a
2213        manager.deploy_with_dependencies(services).await.unwrap();
2214
2215        // All services should be registered
2216        let service_list = manager.list_services().await;
2217        assert_eq!(service_list.len(), 3);
2218    }
2219
2220    #[tokio::test]
2221    async fn test_deploy_with_dependencies_cycle_detection() {
2222        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2223        let manager = ServiceManager::new(runtime);
2224
2225        // A -> B -> A (cycle)
2226        let mut services = HashMap::new();
2227        services.insert(
2228            "a".to_string(),
2229            mock_spec_with_deps(vec![dep(
2230                "b",
2231                zlayer_spec::DependencyCondition::Started,
2232                5000,
2233                zlayer_spec::TimeoutAction::Fail,
2234            )]),
2235        );
2236        services.insert(
2237            "b".to_string(),
2238            mock_spec_with_deps(vec![dep(
2239                "a",
2240                zlayer_spec::DependencyCondition::Started,
2241                5000,
2242                zlayer_spec::TimeoutAction::Fail,
2243            )]),
2244        );
2245
2246        // Should fail with cycle detection
2247        let result = manager.deploy_with_dependencies(services).await;
2248        assert!(result.is_err());
2249        let err = result.unwrap_err().to_string();
2250        assert!(err.contains("Cyclic dependency"));
2251    }
2252
2253    #[tokio::test]
2254    async fn test_deploy_with_dependencies_timeout_continue() {
2255        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2256        let manager = ServiceManager::new(runtime);
2257
2258        // A depends on B (healthy), but B never becomes healthy
2259        // Using continue action, so it should proceed anyway
2260        let mut services = HashMap::new();
2261        services.insert("b".to_string(), mock_spec());
2262        services.insert(
2263            "a".to_string(),
2264            mock_spec_with_deps(vec![dep(
2265                "b",
2266                zlayer_spec::DependencyCondition::Healthy, // B won't pass healthy check
2267                100,                                       // Short timeout
2268                zlayer_spec::TimeoutAction::Continue,      // But continue anyway
2269            )]),
2270        );
2271
2272        // Should deploy both despite timeout
2273        manager.deploy_with_dependencies(services).await.unwrap();
2274
2275        let service_list = manager.list_services().await;
2276        assert_eq!(service_list.len(), 2);
2277    }
2278
2279    #[tokio::test]
2280    async fn test_deploy_with_dependencies_timeout_warn() {
2281        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2282        let manager = ServiceManager::new(runtime);
2283
2284        // A depends on B (healthy), but B never becomes healthy
2285        // Using warn action, so it should proceed with a warning
2286        let mut services = HashMap::new();
2287        services.insert("b".to_string(), mock_spec());
2288        services.insert(
2289            "a".to_string(),
2290            mock_spec_with_deps(vec![dep(
2291                "b",
2292                zlayer_spec::DependencyCondition::Healthy,
2293                100,
2294                zlayer_spec::TimeoutAction::Warn,
2295            )]),
2296        );
2297
2298        // Should deploy both despite timeout (with warning)
2299        manager.deploy_with_dependencies(services).await.unwrap();
2300
2301        let service_list = manager.list_services().await;
2302        assert_eq!(service_list.len(), 2);
2303    }
2304
2305    #[tokio::test]
2306    async fn test_deploy_with_dependencies_timeout_fail() {
2307        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2308        let manager = ServiceManager::new(runtime);
2309
2310        // A depends on B (healthy), but B never becomes healthy
2311        // Using fail action, so deployment should fail
2312        let mut services = HashMap::new();
2313        services.insert("b".to_string(), mock_spec());
2314        services.insert(
2315            "a".to_string(),
2316            mock_spec_with_deps(vec![dep(
2317                "b",
2318                zlayer_spec::DependencyCondition::Healthy,
2319                100,
2320                zlayer_spec::TimeoutAction::Fail,
2321            )]),
2322        );
2323
2324        // Should fail after B is started but doesn't become healthy
2325        let result = manager.deploy_with_dependencies(services).await;
2326        assert!(result.is_err());
2327
2328        // B should be started (it has no deps), but A should fail
2329        let err = result.unwrap_err().to_string();
2330        assert!(err.contains("Dependency timeout"));
2331    }
2332
2333    #[tokio::test]
2334    async fn test_check_dependencies_all_satisfied() {
2335        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2336        let manager = ServiceManager::new(runtime);
2337
2338        // Mark a service as healthy
2339        manager
2340            .update_health_state("db", HealthState::Healthy)
2341            .await;
2342
2343        let deps = vec![DependsSpec {
2344            service: "db".to_string(),
2345            condition: zlayer_spec::DependencyCondition::Healthy,
2346            timeout: Some(Duration::from_secs(60)),
2347            on_timeout: zlayer_spec::TimeoutAction::Fail,
2348        }];
2349
2350        let satisfied = manager.check_dependencies(&deps).await.unwrap();
2351        assert!(satisfied);
2352    }
2353
2354    #[tokio::test]
2355    async fn test_check_dependencies_not_satisfied() {
2356        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2357        let manager = ServiceManager::new(runtime);
2358
2359        // Service not healthy (no state set = Unknown)
2360        let deps = vec![DependsSpec {
2361            service: "db".to_string(),
2362            condition: zlayer_spec::DependencyCondition::Healthy,
2363            timeout: Some(Duration::from_secs(60)),
2364            on_timeout: zlayer_spec::TimeoutAction::Fail,
2365        }];
2366
2367        let satisfied = manager.check_dependencies(&deps).await.unwrap();
2368        assert!(!satisfied);
2369    }
2370
2371    #[tokio::test]
2372    async fn test_health_state_tracking() {
2373        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2374        let manager = ServiceManager::new(runtime);
2375
2376        // Update health states
2377        manager
2378            .update_health_state("db", HealthState::Healthy)
2379            .await;
2380        manager
2381            .update_health_state("cache", HealthState::Unknown)
2382            .await;
2383
2384        // Verify states
2385        let states = manager.health_states();
2386        let states_read = states.read().await;
2387
2388        assert!(matches!(states_read.get("db"), Some(HealthState::Healthy)));
2389        assert!(matches!(
2390            states_read.get("cache"),
2391            Some(HealthState::Unknown)
2392        ));
2393    }
2394
2395    // ==================== Job/Cron Integration Tests ====================
2396
2397    fn mock_job_spec() -> ServiceSpec {
2398        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2399            r"
2400version: v1
2401deployment: test
2402services:
2403  backup:
2404    rtype: job
2405    image:
2406      name: backup:latest
2407",
2408        )
2409        .unwrap()
2410        .services
2411        .remove("backup")
2412        .unwrap()
2413    }
2414
2415    fn mock_cron_spec() -> ServiceSpec {
2416        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
2417            r#"
2418version: v1
2419deployment: test
2420services:
2421  cleanup:
2422    rtype: cron
2423    schedule: "0 0 * * * * *"
2424    image:
2425      name: cleanup:latest
2426"#,
2427        )
2428        .unwrap()
2429        .services
2430        .remove("cleanup")
2431        .unwrap()
2432    }
2433
2434    #[tokio::test]
2435    async fn test_service_manager_with_job_executor() {
2436        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2437        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2438
2439        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
2440
2441        // Register job
2442        let job_spec = mock_job_spec();
2443        manager
2444            .upsert_service("backup".to_string(), job_spec)
2445            .await
2446            .unwrap();
2447
2448        // Trigger job
2449        let exec_id = manager
2450            .trigger_job("backup", JobTrigger::Cli)
2451            .await
2452            .unwrap();
2453
2454        // Give job time to start
2455        tokio::time::sleep(Duration::from_millis(50)).await;
2456
2457        // Check execution exists
2458        let execution = manager.get_job_execution(&exec_id).await;
2459        assert!(execution.is_some());
2460        assert_eq!(execution.unwrap().job_name, "backup");
2461    }
2462
2463    #[tokio::test]
2464    async fn test_service_manager_with_cron_scheduler() {
2465        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2466        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2467        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
2468
2469        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
2470
2471        // Register cron job
2472        let cron_spec = mock_cron_spec();
2473        manager
2474            .upsert_service("cleanup".to_string(), cron_spec)
2475            .await
2476            .unwrap();
2477
2478        // List cron jobs
2479        let cron_jobs = manager.list_cron_jobs().await;
2480        assert_eq!(cron_jobs.len(), 1);
2481        assert_eq!(cron_jobs[0].name, "cleanup");
2482        assert!(cron_jobs[0].enabled);
2483    }
2484
2485    #[tokio::test]
2486    async fn test_service_manager_trigger_cron() {
2487        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2488        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2489        let cron_scheduler = Arc::new(CronScheduler::new(job_executor.clone()));
2490
2491        let manager = ServiceManager::new(runtime)
2492            .with_job_executor(job_executor)
2493            .with_cron_scheduler(cron_scheduler);
2494
2495        // Register cron job
2496        let cron_spec = mock_cron_spec();
2497        manager
2498            .upsert_service("cleanup".to_string(), cron_spec)
2499            .await
2500            .unwrap();
2501
2502        // Manually trigger the cron job
2503        let exec_id = manager.trigger_cron("cleanup").await.unwrap();
2504        assert!(!exec_id.0.is_empty());
2505    }
2506
2507    #[tokio::test]
2508    async fn test_service_manager_enable_disable_cron() {
2509        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2510        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2511        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
2512
2513        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
2514
2515        // Register cron job
2516        let cron_spec = mock_cron_spec();
2517        manager
2518            .upsert_service("cleanup".to_string(), cron_spec)
2519            .await
2520            .unwrap();
2521
2522        // Initially enabled
2523        let cron_jobs = manager.list_cron_jobs().await;
2524        assert!(cron_jobs[0].enabled);
2525
2526        // Disable
2527        manager.set_cron_enabled("cleanup", false).await;
2528        let cron_jobs = manager.list_cron_jobs().await;
2529        assert!(!cron_jobs[0].enabled);
2530
2531        // Re-enable
2532        manager.set_cron_enabled("cleanup", true).await;
2533        let cron_jobs = manager.list_cron_jobs().await;
2534        assert!(cron_jobs[0].enabled);
2535    }
2536
2537    #[tokio::test]
2538    async fn test_service_manager_remove_cleans_up_job() {
2539        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2540        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2541
2542        let manager = ServiceManager::new(runtime).with_job_executor(job_executor.clone());
2543
2544        // Register job
2545        let job_spec = mock_job_spec();
2546        manager
2547            .upsert_service("backup".to_string(), job_spec)
2548            .await
2549            .unwrap();
2550
2551        // Verify job is registered
2552        let spec = job_executor.get_job_spec("backup").await;
2553        assert!(spec.is_some());
2554
2555        // Remove job
2556        manager.remove_service("backup").await.unwrap();
2557
2558        // Verify job is unregistered
2559        let spec = job_executor.get_job_spec("backup").await;
2560        assert!(spec.is_none());
2561    }
2562
2563    #[tokio::test]
2564    async fn test_service_manager_remove_cleans_up_cron() {
2565        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2566        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2567        let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
2568
2569        let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler.clone());
2570
2571        // Register cron job
2572        let cron_spec = mock_cron_spec();
2573        manager
2574            .upsert_service("cleanup".to_string(), cron_spec)
2575            .await
2576            .unwrap();
2577
2578        // Verify cron job is registered
2579        assert_eq!(cron_scheduler.job_count().await, 1);
2580
2581        // Remove cron job
2582        manager.remove_service("cleanup").await.unwrap();
2583
2584        // Verify cron job is unregistered
2585        assert_eq!(cron_scheduler.job_count().await, 0);
2586    }
2587
2588    #[tokio::test]
2589    async fn test_service_manager_job_without_executor() {
2590        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2591        let manager = ServiceManager::new(runtime);
2592
2593        // Try to trigger job without executor configured
2594        let result = manager.trigger_job("nonexistent", JobTrigger::Cli).await;
2595        assert!(result.is_err());
2596        assert!(result.unwrap_err().to_string().contains("not configured"));
2597    }
2598
2599    #[tokio::test]
2600    async fn test_service_manager_cron_without_scheduler() {
2601        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2602        let manager = ServiceManager::new(runtime);
2603
2604        // Try to register cron job without scheduler configured
2605        let cron_spec = mock_cron_spec();
2606        let result = manager
2607            .upsert_service("cleanup".to_string(), cron_spec)
2608            .await;
2609        assert!(result.is_err());
2610        assert!(result.unwrap_err().to_string().contains("not configured"));
2611    }
2612
2613    #[tokio::test]
2614    async fn test_service_manager_list_job_executions() {
2615        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
2616        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
2617
2618        let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
2619
2620        // Register job
2621        let job_spec = mock_job_spec();
2622        manager
2623            .upsert_service("backup".to_string(), job_spec)
2624            .await
2625            .unwrap();
2626
2627        // Trigger job twice
2628        manager
2629            .trigger_job("backup", JobTrigger::Cli)
2630            .await
2631            .unwrap();
2632        manager
2633            .trigger_job("backup", JobTrigger::Scheduler)
2634            .await
2635            .unwrap();
2636
2637        // Give jobs time to start
2638        tokio::time::sleep(Duration::from_millis(50)).await;
2639
2640        // List executions
2641        let executions = manager.list_job_executions("backup").await;
2642        assert_eq!(executions.len(), 2);
2643    }

    // ==================== Container Supervisor Integration Tests ====================

    #[tokio::test]
    async fn test_service_manager_with_supervisor() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());

        // Add service
        let spec = mock_spec();
        manager
            .upsert_service("api".to_string(), spec)
            .await
            .unwrap();

        // Scale up - containers should be registered with supervisor
        manager.scale_service("api", 2).await.unwrap();

        // Verify containers are supervised
        assert_eq!(supervisor.supervised_count().await, 2);

        // Scale down - containers should be unregistered
        manager.scale_service("api", 1).await.unwrap();
        assert_eq!(supervisor.supervised_count().await, 1);

        // Remove service - remaining containers should be unregistered
        manager.remove_service("api").await.unwrap();
        assert_eq!(supervisor.supervised_count().await, 0);
    }

    #[tokio::test]
    async fn test_service_manager_supervisor_state() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor);

        // Add and scale service
        let spec = mock_spec();
        manager
            .upsert_service("web".to_string(), spec)
            .await
            .unwrap();
        manager.scale_service("web", 1).await.unwrap();

        // Check supervised state (replica indices are 1-based, so the single replica is replica 1)
        let container_id = ContainerId {
            service: "web".to_string(),
            replica: 1,
        };
        let state = manager.get_container_supervised_state(&container_id).await;
        assert_eq!(state, Some(SupervisedState::Running));
    }

    #[tokio::test]
    async fn test_service_manager_start_supervisor() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());

        // Start the supervisor
        let handle = manager.start_container_supervisor().unwrap();

        // Give it time to start
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert!(supervisor.is_running());

        // Shutdown
        manager.shutdown_container_supervisor();

        // Wait for it to stop
        tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .unwrap()
            .unwrap();

        assert!(!supervisor.is_running());
    }

    #[tokio::test]
    async fn test_service_manager_supervisor_not_configured() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let manager = ServiceManager::new(runtime);

        // Try to start supervisor without configuring it
        let result = manager.start_container_supervisor();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not configured"));
    }

    // ==================== Stream Registry Integration Tests ====================

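    /// Spec for a service that exposes a single TCP endpoint (postgres on 5432).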
    fn mock_tcp_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  database:
    rtype: service
    image:
      name: postgres:latest
    endpoints:
      - name: postgresql
        protocol: tcp
        port: 5432
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("database")
        .unwrap()
    }

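    /// Spec for a service that exposes a single UDP endpoint (dns on 53).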
    fn mock_udp_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  dns:
    rtype: service
    image:
      name: dns:latest
    endpoints:
      - name: dns
        protocol: udp
        port: 53
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("dns")
        .unwrap()
    }

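    /// Spec for a service mixing HTTP (8080), TCP (9000) and UDP (8125) endpoints.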
    fn mock_mixed_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  mixed:
    rtype: service
    image:
      name: mixed:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
      - name: grpc
        protocol: tcp
        port: 9000
      - name: metrics
        protocol: udp
        port: 8125
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        .remove("mixed")
        .unwrap()
    }

    #[tokio::test]
    async fn test_service_manager_with_stream_registry_tcp() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let stream_registry = Arc::new(StreamRegistry::new());

        let mut manager = ServiceManager::new(runtime);
        manager.set_stream_registry(stream_registry.clone());
        manager.set_deployment_name("test".to_string());

        // Add TCP-only service
        let spec = mock_tcp_spec();
        manager
            .upsert_service("database".to_string(), spec)
            .await
            .unwrap();

        // Verify TCP route was registered
        assert_eq!(stream_registry.tcp_count(), 1);
        assert!(stream_registry.tcp_ports().contains(&5432));

        // Remove service and verify cleanup
        manager.remove_service("database").await.unwrap();
        assert_eq!(stream_registry.tcp_count(), 0);
    }

    #[tokio::test]
    async fn test_service_manager_with_stream_registry_udp() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let stream_registry = Arc::new(StreamRegistry::new());

        let mut manager = ServiceManager::new(runtime);
        manager.set_stream_registry(stream_registry.clone());
        manager.set_deployment_name("test".to_string());

        // Add UDP-only service
        let spec = mock_udp_spec();
        manager
            .upsert_service("dns".to_string(), spec)
            .await
            .unwrap();

        // Verify UDP route was registered
        assert_eq!(stream_registry.udp_count(), 1);
        assert!(stream_registry.udp_ports().contains(&53));

        // Remove service and verify cleanup
        manager.remove_service("dns").await.unwrap();
        assert_eq!(stream_registry.udp_count(), 0);
    }

    #[tokio::test]
    async fn test_service_manager_with_stream_registry_mixed() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let stream_registry = Arc::new(StreamRegistry::new());

        let mut manager = ServiceManager::new(runtime);
        manager.set_stream_registry(stream_registry.clone());
        manager.set_deployment_name("test".to_string());

        // Add mixed service (HTTP + TCP + UDP)
        let spec = mock_mixed_spec();
        manager
            .upsert_service("mixed".to_string(), spec)
            .await
            .unwrap();

        // Verify stream routes were registered; only the TCP and UDP endpoints
        // become stream routes, the HTTP endpoint is not counted here
        assert_eq!(stream_registry.tcp_count(), 1); // TCP: 9000
        assert_eq!(stream_registry.udp_count(), 1); // UDP: 8125

        assert!(stream_registry.tcp_ports().contains(&9000));
        assert!(stream_registry.udp_ports().contains(&8125));

        // Remove service and verify stream cleanup
        manager.remove_service("mixed").await.unwrap();
        assert_eq!(stream_registry.tcp_count(), 0);
        assert_eq!(stream_registry.udp_count(), 0);
    }

    #[tokio::test]
    async fn test_service_manager_stream_registry_builder() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let stream_registry = Arc::new(StreamRegistry::new());

        // Test builder pattern
        let manager = ServiceManager::new(runtime).with_stream_registry(stream_registry.clone());

        // Verify stream registry is accessible
        assert!(manager.stream_registry().is_some());
    }
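
    /// A wiring sketch rather than a behavioural test: it assumes the builder
    /// methods exercised individually elsewhere in this file (`with_job_executor`,
    /// `with_cron_scheduler`, `with_container_supervisor`, `with_stream_registry`)
    /// compose freely in a single chain, and only asserts what is directly visible.
    #[tokio::test]
    async fn test_service_manager_builder_chain_sketch() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
        let job_executor = Arc::new(JobExecutor::new(runtime.clone()));

        let manager = ServiceManager::new(runtime.clone())
            .with_job_executor(job_executor.clone())
            .with_cron_scheduler(Arc::new(CronScheduler::new(job_executor)))
            .with_container_supervisor(Arc::new(ContainerSupervisor::new(runtime)))
            .with_stream_registry(Arc::new(StreamRegistry::new()));

        assert!(manager.stream_registry().is_some());
    }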

    #[tokio::test]
    async fn test_tcp_service_without_stream_registry() {
        let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());

        // Manager without stream registry
        let mut manager = ServiceManager::new(runtime);
        manager.set_deployment_name("test".to_string());

        // Add TCP service - should log warning but not fail
        let spec = mock_tcp_spec();
        manager
            .upsert_service("database".to_string(), spec)
            .await
            .unwrap();

        // No stream registry to check, but service should be tracked
        let services = manager.list_services().await;
        assert!(services.contains(&"database".to_string()));
    }
}