zlayer_agent/service.rs
1//! Service-level container lifecycle management
2
3use crate::container_supervisor::{ContainerSupervisor, SupervisedState, SupervisorEvent};
4use crate::cron_scheduler::CronScheduler;
5use crate::dependency::{
6 DependencyConditionChecker, DependencyGraph, DependencyWaiter, WaitResult,
7};
8use crate::error::{AgentError, Result};
9use crate::health::{HealthCallback, HealthChecker, HealthMonitor, HealthState};
10use crate::init::InitOrchestrator;
11use crate::job::{JobExecution, JobExecutionId, JobExecutor, JobTrigger};
12use crate::overlay_manager::OverlayManager;
13use crate::proxy_manager::ProxyManager;
14use crate::runtime::{Container, ContainerId, ContainerState, Runtime};
15use std::collections::HashMap;
16use std::net::{IpAddr, SocketAddr};
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{RwLock, Semaphore};
20use zlayer_observability::logs::LogEntry;
21use zlayer_overlay::DnsServer;
22use zlayer_proxy::{StreamRegistry, StreamService};
23use zlayer_spec::{DependsSpec, HealthCheck, Protocol, PullPolicy, ResourceType, ServiceSpec};
24
25/// Service instance manages a single service's containers
26pub struct ServiceInstance {
27 pub service_name: String,
28 pub spec: ServiceSpec,
29 runtime: Arc<dyn Runtime + Send + Sync>,
30 containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
31 /// Overlay network manager for container networking (optional, not needed for Docker runtime)
32 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
33 /// Proxy manager for updating backend health (optional)
34 proxy_manager: Option<Arc<ProxyManager>>,
35 /// DNS server for service discovery (optional)
36 dns_server: Option<Arc<DnsServer>>,
37 /// Container-injectable overlay resolver IP (optional). When set, this
38 /// node's overlay DNS server is reachable on `<ip>:53` and we inject it
39 /// into the container's resolv.conf so workloads resolve through the
40 /// overlay instead of inheriting the host's resolv.conf.
41 container_dns: Option<IpAddr>,
42 /// Shared health states map so callbacks can update ServiceManager-level health
43 health_states: Option<Arc<RwLock<HashMap<String, HealthState>>>>,
44 /// Most recently observed image digest after a successful pull. Used by
45 /// `upsert_service` to detect drift on `:latest`/`Newer` redeploys without
46 /// requiring callers to track digest state externally. Wrapped in a
47 /// `RwLock` so `&self` methods (`scale_to`) can update it.
48 last_pulled_digest: tokio::sync::RwLock<Option<String>>,
49 /// Local cluster node id used when constructing new `ContainerId`s during
50 /// scale-up. `0` in single-node deployments or when the cluster handle is
51 /// not yet wired. Populated by `ServiceManager` from `Cluster::node_id()`
52 /// at instance construction time.
53 node_id: u64,
54}
55
56impl ServiceInstance {
57 /// Create a new service instance
58 pub fn new(
59 service_name: String,
60 spec: ServiceSpec,
61 runtime: Arc<dyn Runtime + Send + Sync>,
62 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
63 ) -> Self {
64 Self {
65 service_name,
66 spec,
67 runtime,
68 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
69 overlay_manager,
70 proxy_manager: None,
71 dns_server: None,
72 container_dns: None,
73 health_states: None,
74 last_pulled_digest: tokio::sync::RwLock::new(None),
75 node_id: 0,
76 }
77 }
78
79 /// Create a new service instance with proxy manager for health-aware load balancing
80 pub fn with_proxy(
81 service_name: String,
82 spec: ServiceSpec,
83 runtime: Arc<dyn Runtime + Send + Sync>,
84 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
85 proxy_manager: Arc<ProxyManager>,
86 ) -> Self {
87 Self {
88 service_name,
89 spec,
90 runtime,
91 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
92 overlay_manager,
93 proxy_manager: Some(proxy_manager),
94 dns_server: None,
95 container_dns: None,
96 health_states: None,
97 last_pulled_digest: tokio::sync::RwLock::new(None),
98 node_id: 0,
99 }
100 }
101
102 /// Set the local cluster node id. Used by `ServiceManager` to thread
103 /// `Cluster::node_id()` down to container construction so new
104 /// `ContainerId`s carry the owning node identity. Defaults to `0` (the
105 /// single-node sentinel) when unset.
106 pub fn set_node_id(&mut self, node_id: u64) {
107 self.node_id = node_id;
108 }
109
110 /// Derive the replica group role for a 1-based `replica_idx`.
111 ///
112 /// When `spec.replica_groups` is unset, returns `"default"` (the implicit
113 /// single-group case). Otherwise walks groups in declaration order,
114 /// accumulating each group's `count` until `replica_idx` falls within the
115 /// current group's range, and returns that group's `role`.
116 ///
117 /// Replicas beyond the declared total fall back to `"default"`.
118 #[must_use]
119 pub fn role_for_replica(&self, replica_idx: u32) -> String {
120 let Some(groups) = self.spec.replica_groups.as_ref() else {
121 return "default".to_string();
122 };
123 let mut cumulative = 0u32;
124 for group in groups {
125 cumulative = cumulative.saturating_add(group.count);
126 if replica_idx <= cumulative {
127 return group.role.clone();
128 }
129 }
130 "default".to_string()
131 }
132
133 /// Builder method to add DNS server for service discovery
134 #[must_use]
135 pub fn with_dns(mut self, dns_server: Arc<DnsServer>) -> Self {
136 self.dns_server = Some(dns_server);
137 self
138 }
139
140 /// Set the DNS server for service discovery
141 pub fn set_dns_server(&mut self, dns_server: Arc<DnsServer>) {
142 self.dns_server = Some(dns_server);
143 }
144
145 /// Set the container-injectable overlay resolver IP and apply it to the
146 /// instance's spec.
147 ///
148 /// When `container_dns` is set and the spec is eligible (not host-network,
149 /// no user-supplied `dns`), this pre-populates `spec.dns` with the overlay
150 /// resolver so containers resolve through `<ip>:53` instead of inheriting
151 /// the host's `/etc/resolv.conf`.
152 ///
153 /// Why this exists: on overlay-enabled hosts the netbird `~.`
154 /// systemd-resolved hijack swallows the host resolver, so a container that
155 /// inherits the host resolv.conf cannot resolve anything. The overlay DNS
156 /// server forwards non-overlay queries upstream, so pointing the container
157 /// at it fixes resolution AND gives it service-name discovery.
158 ///
159 /// Port-53 constraint: `resolv.conf` `nameserver` lines (and Docker's
160 /// `--dns`) carry no port — they are always port 53. The injected IP is
161 /// therefore only useful because the daemon binds the overlay resolver on
162 /// `<ip>:53` (see `daemon.rs` Phase 4); the injected value is the bare IP,
163 /// not a `host:port`.
164 ///
165 /// User-supplied `spec.dns` is left untouched: an explicit resolver from
166 /// the deployment spec always wins.
167 pub fn set_container_dns(&mut self, container_dns: IpAddr) {
168 self.container_dns = Some(container_dns);
169 if !self.spec.host_network && self.spec.dns.is_empty() {
170 self.spec.dns = vec![container_dns.to_string()];
171 }
172 }
173
174 /// Set the proxy manager for health-aware load balancing
175 pub fn set_proxy_manager(&mut self, proxy_manager: Arc<ProxyManager>) {
176 self.proxy_manager = Some(proxy_manager);
177 }
178
179 /// Set the shared health states map so health callbacks can bridge state back to `ServiceManager`
180 pub fn set_health_states(&mut self, states: Arc<RwLock<HashMap<String, HealthState>>>) {
181 self.health_states = Some(states);
182 }
183
184 /// Get the last observed image digest (after the most recent successful
185 /// pull). Returns `None` when no pull has happened yet, when the runtime
186 /// does not expose digests, or when no matching `ImageInfo` was found.
187 pub async fn last_pulled_digest(&self) -> Option<String> {
188 self.last_pulled_digest.read().await.clone()
189 }
190
191 /// Pull the service image using the spec's pull policy (literal Docker /
192 /// Kubernetes semantics — no silent auto-upgrade of `IfNotPresent` to
193 /// `Newer` for `:latest` tags) and refresh the cached digest from
194 /// `Runtime::list_images` when the runtime exposes it. Returns the digest
195 /// observed after the pull, when known.
196 ///
197 /// For `Never`, the runtime is still called so it can load the image
198 /// config from the local cache (without any remote round-trip); only the
199 /// remote digest refresh is skipped. Without this call the bundle builder
200 /// has no image entrypoint/cmd and falls back to `/bin/sh`.
201 async fn pull_and_refresh_digest(&self) -> Result<Option<String>> {
202 let image_str = self.spec.image.name.to_string();
203 let policy = self.spec.image.pull_policy;
204
205 self.runtime
206 .pull_image_with_policy(
207 &image_str,
208 policy,
209 None,
210 self.spec.image.source_policy.unwrap_or_default(),
211 )
212 .await
213 .map_err(|e| AgentError::PullFailed {
214 image: self.spec.image.name.to_string(),
215 reason: e.to_string(),
216 })?;
217
218 // Best-effort: try to discover the resolved digest via list_images.
219 // Runtimes that don't support introspection (Unsupported) leave the
220 // cached digest unchanged; drift detection then falls back to "always
221 // recreate on PullPolicy::Always, never recreate on PullPolicy::Newer
222 // when no digests are known".
223 let new_digest = match self.runtime.list_images().await {
224 Ok(images) => images
225 .into_iter()
226 .find(|info| info.reference == image_str)
227 .and_then(|info| info.digest),
228 Err(e) => {
229 tracing::debug!(
230 image = %image_str,
231 error = %e,
232 "list_images unavailable; cannot record post-pull digest"
233 );
234 None
235 }
236 };
237
238 if let Some(ref digest) = new_digest {
239 *self.last_pulled_digest.write().await = Some(digest.clone());
240 }
241
242 Ok(new_digest)
243 }
244
245 /// Scale to the desired number of replicas
246 ///
247 /// This method uses short-lived locks to avoid blocking concurrent operations.
248 /// I/O operations (pull, create, start, stop, remove) are performed without
249 /// holding the containers lock to allow other operations to proceed.
250 ///
251 /// # Errors
252 /// Returns an error if image pull, container creation, or container lifecycle operations fail.
253 #[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
254 pub async fn scale_to(&self, replicas: u32) -> Result<()> {
255 // Phase 1: Determine current state (short read lock)
256 let current_replicas = { self.containers.read().await.len() as u32 }; // Lock released here
257
258 // Phase 1b: Pull image up front so a redeploy on `:latest` (which lands
259 // here with replicas == current_replicas in the steady state) actually
260 // refreshes the cached digest. We skip the call only when scaling
261 // strictly down (no new containers needed). For `Never` the runtime
262 // still needs to load the image config from the local cache so the
263 // bundle builder gets entrypoint/cmd/env — without it the container
264 // falls back to `/bin/sh` and exits instantly. `pull_and_refresh_digest`
265 // itself handles the Never case (no remote round-trip, cache-only).
266 if replicas >= current_replicas {
267 let _ = self.pull_and_refresh_digest().await?;
268 }
269
270 // Phase 2: Scale up - create new containers (no lock held during I/O)
271 //
272 // Compute (role, replica_index) tuples for each new replica. When
273 // `spec.replica_groups` is set, expand groups in declaration order so
274 // each created replica maps to its declared `(role, intra_group_index)`.
275 // Otherwise fall back to the implicit single "default" group. The
276 // `local_node_id` is captured once so every new `ContainerId` carries
277 // the owning node identity for cross-node disambiguation.
278 let local_node_id = self.node_id;
279 if replicas > current_replicas {
280 let replica_specs: Vec<(String, u32)> =
281 if let Some(groups) = self.spec.replica_groups.as_ref() {
282 let mut specs: Vec<(String, u32)> = Vec::new();
283 for group in groups {
284 for idx in 0..group.count {
285 specs.push((group.role.clone(), idx + 1));
286 }
287 }
288 specs
289 .into_iter()
290 .skip(current_replicas as usize)
291 .take((replicas - current_replicas) as usize)
292 .collect()
293 } else {
294 (current_replicas..replicas)
295 .map(|i| ("default".to_string(), i + 1))
296 .collect()
297 };
298
299 for (role, replica_idx) in replica_specs {
300 let id = ContainerId::with_role_and_node(
301 self.service_name.clone(),
302 replica_idx,
303 role,
304 local_node_id,
305 );
306
307 // Create container (no lock needed - I/O operation)
308 //
309 // RouteToPeer must propagate unchanged: the scheduler uses it
310 // to re-place the workload on a capable peer, and wrapping it
311 // in `CreateFailed` would hide the signal and mark the service
312 // dead instead of rescheduling it. All other errors are
313 // normalised to `CreateFailed` for upstream handling.
314 self.runtime
315 .create_container(&id, &self.spec)
316 .await
317 .map_err(|e| match e {
318 AgentError::RouteToPeer { .. } => e,
319 other => AgentError::CreateFailed {
320 id: id.to_string(),
321 reason: other.to_string(),
322 },
323 })?;
324
325 // Run init actions with error policy enforcement (no lock needed)
326 let init_orchestrator = InitOrchestrator::with_error_policy(
327 id.clone(),
328 self.spec.init.clone(),
329 self.spec.errors.clone(),
330 );
331 init_orchestrator.run().await?;
332
333 // Start container (no lock needed - I/O operation)
334 self.runtime
335 .start_container(&id)
336 .await
337 .map_err(|e| AgentError::StartFailed {
338 id: id.to_string(),
339 reason: e.to_string(),
340 })?;
341
342 // Get container PID with retries (may not be immediately available)
343 let mut container_pid = None;
344 for attempt in 1..=5u32 {
345 match self.runtime.get_container_pid(&id).await {
346 Ok(Some(pid)) => {
347 container_pid = Some(pid);
348 break;
349 }
350 Ok(None) if attempt < 5 => {
351 tracing::debug!(container = %id, attempt, "PID not available yet, retrying");
352 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
353 }
354 Ok(None) => {
355 tracing::warn!(container = %id, "Container PID unavailable after 5 attempts");
356 }
357 Err(e) => {
358 tracing::warn!(container = %id, attempt, error = %e, "Failed to get PID");
359 if attempt < 5 {
360 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
361 }
362 }
363 }
364 }
365
366 // Verify the container is still running before attempting
367 // overlay attach. If the init process crashed during start
368 // (bad image, missing libs, failed mount), the PID above is
369 // now dead and every `ip link set ... netns {pid}` will
370 // return a cryptic RTNETLINK error. Surface the real cause
371 // from the container's log tail instead of the cascade.
372 if container_pid.is_some() {
373 let alive = match self.runtime.container_state(&id).await {
374 Ok(
375 ContainerState::Running
376 | ContainerState::Pending
377 | ContainerState::Initializing,
378 ) => true,
379 Ok(state) => {
380 tracing::warn!(
381 container = %id,
382 ?state,
383 "container exited before overlay attach could run"
384 );
385 false
386 }
387 Err(e) => {
388 // State query failed — don't block the attach on
389 // it. The overlay manager's own cleanup-on-error
390 // path now handles the dead-PID case cleanly.
391 tracing::warn!(
392 container = %id,
393 error = %e,
394 "container state query failed before overlay attach, proceeding"
395 );
396 true
397 }
398 };
399 if !alive {
400 let log_tail = self.runtime.container_logs(&id, 40).await.ok().map_or_else(
401 || " <log read failed>".to_string(),
402 |entries| {
403 if entries.is_empty() {
404 " <no log output>".to_string()
405 } else {
406 entries
407 .into_iter()
408 .map(|e| format!(" {}", e.message))
409 .collect::<Vec<_>>()
410 .join("\n")
411 }
412 },
413 );
414 return Err(AgentError::StartFailed {
415 id: id.to_string(),
416 reason: format!("container exited during startup:\n{log_tail}"),
417 });
418 }
419 }
420
421 // Attach to overlay network if manager is available.
422 //
423 // Linux uses the container PID to enter the netns and attach a
424 // veth. Windows has no PID-addressable netns — the HCN namespace
425 // GUID (obtained from `get_container_namespace_id`) is used
426 // instead, and the endpoint's IP has already been populated by
427 // `EndpointAttachment::create_overlay` during container creation.
428 // We simply register that IP with the slice allocator so host
429 // accounting stays in sync.
430 let overlay_ip = if let Some(overlay) = &self.overlay_manager {
431 let overlay_guard = overlay.read().await;
432 #[cfg(target_os = "windows")]
433 let attach_result: Option<std::net::IpAddr> = {
434 // On Windows the overlay attach (HCN endpoint + per-container
435 // namespace creation, via overlayd) already happened inside
436 // `HcsRuntime::create_container`. Here we only need the IP it
437 // assigned so we can register DNS for service discovery.
438 let _ = (container_pid, &overlay_guard); // unused on Windows
439 match self.runtime.get_container_ip(&id).await {
440 Ok(Some(ip)) => Some(ip),
441 Ok(None) => {
442 tracing::debug!(
443 container = %id,
444 "no overlay IP recorded for container (overlay attach skipped at create time)"
445 );
446 None
447 }
448 Err(e) => {
449 tracing::warn!(
450 container = %id,
451 error = %e,
452 "failed to fetch container overlay IP"
453 );
454 None
455 }
456 }
457 };
458 #[cfg(not(target_os = "windows"))]
459 let attach_result: Option<std::net::IpAddr> = {
460 match self.runtime.overlay_attach_kind() {
461 // VM guest (macOS VZ-Linux): no host netns/PID, so
462 // overlayd allocates the overlay identity and we push
463 // it into the guest over vsock, where it brings up its
464 // own kernel WireGuard device.
465 crate::runtime::OverlayAttachKind::InGuestVsock => {
466 let cid = id.to_string();
467 match overlay_guard
468 .attach_container_guest(&cid, &self.service_name, true)
469 .await
470 {
471 Ok(cfg) => {
472 let ip = cfg.overlay_ip;
473 match self.runtime.push_overlay_config(&id, &cfg).await {
474 Ok(()) => Some(ip),
475 Err(e) => {
476 tracing::warn!(
477 container = %id,
478 error = %e,
479 "failed to push overlay config into guest; rolling back allocation"
480 );
481 // Don't leak the overlayd IP/peer.
482 if let Err(de) =
483 overlay_guard.detach_container_guest(&cid).await
484 {
485 tracing::warn!(
486 container = %id,
487 error = %de,
488 "failed to roll back guest overlay allocation"
489 );
490 }
491 None
492 }
493 }
494 }
495 Err(e) => {
496 tracing::warn!(
497 container = %id,
498 error = %e,
499 "failed to allocate guest overlay config from overlayd"
500 );
501 None
502 }
503 }
504 }
505 // Host-process runtimes (Linux youki): plumb a veth
506 // into the container's netns by PID.
507 _ => {
508 if let Some(pid) = container_pid {
509 match overlay_guard
510 .attach_container(pid, &self.service_name, true)
511 .await
512 {
513 Ok(ip) => Some(ip),
514 Err(e) => {
515 tracing::warn!(
516 container = %id,
517 error = %e,
518 "failed to attach container to overlay network"
519 );
520 None
521 }
522 }
523 } else {
524 // No PID available (e.g. WASM runtime) - skip overlay attachment
525 tracing::debug!(
526 container = %id,
527 "skipping overlay attachment - no PID available"
528 );
529 None
530 }
531 }
532 }
533 };
534
535 if let Some(ip) = attach_result {
536 tracing::info!(
537 container = %id,
538 overlay_ip = %ip,
539 "attached container to overlay network"
540 );
541
542 // Register DNS for service discovery
543 if let Some(dns) = &self.dns_server {
544 // Register the BARE compose service name ({service}) so
545 // sibling containers can resolve each other by the name
546 // docker-compose uses (e.g. `postgres`), not just the
547 // FQDN. This is what `FORGEJO__database__HOST=postgres`
548 // and every other compose service reference depends on.
549 // Multiple replicas upsert the same name; the in-memory
550 // authority keeps the most recent A record (round-robin
551 // is not required for the single-replica compose case).
552 match dns.add_record(&self.service_name, ip).await {
553 Ok(()) => tracing::debug!(
554 hostname = %self.service_name,
555 ip = %ip,
556 "registered bare service-name DNS (compose discovery)"
557 ),
558 Err(e) => tracing::warn!(
559 hostname = %self.service_name,
560 error = %e,
561 "failed to register bare service-name DNS"
562 ),
563 }
564
565 // Register service-level hostname: {service}.service.local
566 let service_hostname = format!("{}.service.local", self.service_name);
567
568 // Register replica-specific hostname: {replica}.{service}.service.local
569 let replica_hostname =
570 format!("{}.{}.service.local", id.replica, self.service_name);
571
572 match dns.add_record(&service_hostname, ip).await {
573 Ok(()) => tracing::debug!(
574 hostname = %service_hostname,
575 ip = %ip,
576 "registered DNS for service"
577 ),
578 Err(e) => tracing::warn!(
579 hostname = %service_hostname,
580 error = %e,
581 "failed to register DNS for service"
582 ),
583 }
584
585 // Also register replica-specific entry
586 if let Err(e) = dns.add_record(&replica_hostname, ip).await {
587 tracing::warn!(
588 hostname = %replica_hostname,
589 error = %e,
590 "failed to register replica DNS"
591 );
592 } else {
593 tracing::debug!(
594 hostname = %replica_hostname,
595 ip = %ip,
596 "registered DNS for replica"
597 );
598 }
599
600 // Per-role DNS: register `{role}.{service}.service.local` when
601 // this container belongs to a non-default replica group. Lets
602 // intra-cluster clients reach a specific group (e.g.
603 // `read.db.service.local` for the read replicas of a postgres
604 // service with primary+read replica groups).
605 if id.role != "default" {
606 let role_hostname =
607 format!("{}.{}.service.local", id.role, self.service_name);
608 match dns.add_record(&role_hostname, ip).await {
609 Ok(()) => tracing::debug!(
610 hostname = %role_hostname,
611 ip = %ip,
612 role = %id.role,
613 "registered DNS for replica group role"
614 ),
615 Err(e) => tracing::warn!(
616 hostname = %role_hostname,
617 error = %e,
618 "failed to register role DNS"
619 ),
620 }
621 }
622 }
623
624 Some(ip)
625 } else {
626 None
627 }
628 } else {
629 None
630 };
631
632 // If overlay failed, try the container runtime's own IP as fallback
633 let effective_ip = if overlay_ip.is_none() {
634 match self.runtime.get_container_ip(&id).await {
635 Ok(Some(ip)) => {
636 tracing::info!(
637 container = %id,
638 ip = %ip,
639 "using runtime container IP for proxy (overlay unavailable)"
640 );
641 Some(ip)
642 }
643 Ok(None) => {
644 tracing::warn!(
645 container = %id,
646 "no container IP available from runtime, proxy routing will be unavailable"
647 );
648 None
649 }
650 Err(e) => {
651 tracing::warn!(
652 container = %id,
653 error = %e,
654 "failed to get container IP from runtime"
655 );
656 None
657 }
658 }
659 } else {
660 overlay_ip
661 };
662
663 tracing::info!(
664 container = %id,
665 service = %self.service_name,
666 overlay_ip = ?overlay_ip,
667 effective_ip = ?effective_ip,
668 "Container IP resolution complete"
669 );
670
671 // Query port override from the runtime.
672 // On macOS sandbox, each container is assigned a unique port since
673 // all processes share the host network (no network namespaces).
674 // The runtime passes the port to the process via the PORT env var.
675 let port_override = match self.runtime.get_container_port_override(&id).await {
676 Ok(Some(port)) => {
677 tracing::info!(
678 container = %id,
679 port = port,
680 "runtime assigned dynamic port override for this container"
681 );
682 Some(port)
683 }
684 Ok(None) => None,
685 Err(e) => {
686 tracing::warn!(
687 container = %id,
688 error = %e,
689 "failed to query port override from runtime, using spec port"
690 );
691 None
692 }
693 };
694
695 // Start health monitoring and store handle (no lock needed during start)
696 let health_monitor_handle = {
697 let mut check = self.spec.health.check.clone();
698
699 // Resolve Tcp { port: 0 } ("use first endpoint") to the actual
700 // port the container is listening on. With mac-sandbox, each
701 // replica gets a unique assigned port via port_override.
702 if let HealthCheck::Tcp { ref mut port } = check {
703 if *port == 0 {
704 *port = port_override.unwrap_or_else(|| {
705 self.spec
706 .endpoints
707 .iter()
708 .find(|ep| {
709 matches!(
710 ep.protocol,
711 Protocol::Http | Protocol::Https | Protocol::Websocket
712 )
713 })
714 .map_or(8080, zlayer_spec::EndpointSpec::target_port)
715 });
716 }
717 }
718
719 let start_grace = self
720 .spec
721 .health
722 .start_grace
723 .unwrap_or(Duration::from_secs(5));
724 let check_timeout = self.spec.health.timeout.unwrap_or(Duration::from_secs(5));
725 let interval = self.spec.health.interval.unwrap_or(Duration::from_secs(10));
726 let retries = self.spec.health.retries;
727
728 let checker = HealthChecker::new(check, effective_ip);
729 let mut monitor = HealthMonitor::new(id.clone(), checker, interval, retries)
730 .with_start_grace(start_grace)
731 .with_check_timeout(check_timeout);
732
733 // Build the optional proxy backend handle. This is only present
734 // when both a proxy manager AND a reachable overlay IP exist; in
735 // degraded-overlay / no-proxy deployments it stays None and the
736 // callback below skips all proxy work while STILL bridging health
737 // state back into ServiceManager.
738 let proxy_backend: Option<(Arc<ProxyManager>, SocketAddr)> =
739 if let (Some(proxy), Some(ip)) = (&self.proxy_manager, effective_ip) {
740 let proxy = Arc::clone(proxy);
741 // Get the container's target port, using the runtime override if
742 // present. On macOS sandbox, port_override gives each replica a
743 // unique port so the proxy can distinguish backends sharing
744 // 127.0.0.1.
745 let port = port_override.unwrap_or_else(|| {
746 self.spec
747 .endpoints
748 .iter()
749 .find(|ep| {
750 matches!(
751 ep.protocol,
752 Protocol::Http | Protocol::Https | Protocol::Websocket
753 )
754 })
755 .map_or(8080, zlayer_spec::EndpointSpec::target_port)
756 });
757
758 let backend_addr = SocketAddr::new(ip, port);
759
760 // Register backend with load balancer so proxy can route to it.
761 // This must happen before the health callback is created, because
762 // update_backend_health only updates *existing* backends.
763 proxy.add_backend(&self.service_name, backend_addr).await;
764
765 // Publish this container's exposed ports on the node
766 // loopback (`127.0.0.1:<endpoint.port>`) so a consumer
767 // sharing the node loopback can reach the service at
768 // `localhost:<port>`. Gated on the spec's policy
769 // (`Auto` publishes only for single-member services).
770 // Uses the SAME runtime-resolved `ip`/`port_override`
771 // as the backend above: on macOS each replica shares
772 // 127.0.0.1 with a unique override; on Linux/VM the
773 // overlay IP carries the declared target port.
774 if self.spec.publish_to_node_loopback() {
775 proxy
776 .publish_loopback_for_container(
777 &self.service_name,
778 &self.spec,
779 ip,
780 port_override,
781 )
782 .await;
783 }
784
785 Some((proxy, backend_addr))
786 } else {
787 None
788 };
789
790 // The health bridge is ALWAYS attached, independent of proxy/IP
791 // availability. stabilization::wait_for_stabilization only treats a
792 // service as ready when health_states[name] == Healthy, so this write
793 // must happen even when the overlay is degraded and no proxy backend
794 // exists — otherwise the service stays healthy=false forever and
795 // stabilization times out.
796 let health_states_opt = self.health_states.clone();
797 let svc_name_for_states = self.service_name.clone();
798 let svc_name_for_proxy = self.service_name.clone();
799 let svc_name_for_log = self.service_name.clone();
800
801 let health_callback: HealthCallback =
802 Arc::new(move |container_id: ContainerId, is_healthy: bool| {
803 tracing::info!(
804 container = %container_id,
805 service = %svc_name_for_log,
806 healthy = is_healthy,
807 has_proxy_backend = proxy_backend.is_some(),
808 "health status changed"
809 );
810
811 // Always bridge health state back to ServiceManager's
812 // health_states map (unconditional — no proxy/IP required).
813 if let Some(ref health_states) = health_states_opt {
814 let states = Arc::clone(health_states);
815 let svc = svc_name_for_states.clone();
816 tokio::spawn(async move {
817 let state = if is_healthy {
818 HealthState::Healthy
819 } else {
820 HealthState::Unhealthy {
821 failures: 0,
822 reason: "health check failed".into(),
823 }
824 };
825 states.write().await.insert(svc, state);
826 });
827 }
828
829 // Update proxy backend health only when a proxy backend was
830 // registered (proxy manager + reachable overlay IP present).
831 if let Some((proxy, backend_addr)) = proxy_backend.clone() {
832 let svc = svc_name_for_proxy.clone();
833 tokio::spawn(async move {
834 proxy
835 .update_backend_health(&svc, backend_addr, is_healthy)
836 .await;
837 });
838 }
839 });
840
841 monitor = monitor.with_callback(health_callback);
842
843 monitor.start()
844 };
845
846 // Update state (short write lock)
847 {
848 let mut containers = self.containers.write().await;
849 containers.insert(
850 id.clone(),
851 Container {
852 id: id.clone(),
853 image: self.spec.image.name.to_string(),
854 state: ContainerState::Running,
855 pid: None,
856 task: None,
857 overlay_ip: effective_ip,
858 health_monitor: Some(health_monitor_handle),
859 port_override,
860 },
861 );
862 } // Lock released here
863 }
864 }
865
866 // Phase 3: Scale down - remove containers (short write lock per removal)
867 //
868 // Containers were created with `with_role_and_node(role, local_node_id)`
869 // on scale-up, so we must reconstruct the same identity on scale-down
870 // — the role is derived from `replica_groups` via `role_for_replica`
871 // and the node id is the local cluster node. Mismatched ids would miss
872 // the live entry in `self.containers` and leak the container.
873 if replicas < current_replicas {
874 for i in replicas..current_replicas {
875 let replica_idx = i + 1;
876 let id = ContainerId::with_role_and_node(
877 self.service_name.clone(),
878 replica_idx,
879 self.role_for_replica(replica_idx),
880 local_node_id,
881 );
882
883 // Remove from state first and get the container to abort health monitor (short write lock)
884 let removed_container = {
885 let mut containers = self.containers.write().await;
886 containers.remove(&id)
887 }; // Lock released here
888
889 // Then perform cleanup (no lock held - I/O operations)
890 if let Some(container) = removed_container {
891 // Abort the health monitor task if it exists
892 if let Some(handle) = container.health_monitor {
893 handle.abort();
894 }
895
896 // Unpublish this container's node-loopback ports (mirror of
897 // the publish in the start path above). Recomputes the same
898 // backend from the container's stored runtime-resolved IP and
899 // port override; the last replica's removal frees the
900 // loopback listener. Gated identically to publish.
901 if self.spec.publish_to_node_loopback() {
902 if let (Some(proxy), Some(ip)) = (&self.proxy_manager, container.overlay_ip)
903 {
904 proxy
905 .unpublish_loopback_for_container(
906 &self.spec,
907 ip,
908 container.port_override,
909 )
910 .await;
911 }
912 }
913
914 // Remove DNS records for this container
915 if let Some(dns) = &self.dns_server {
916 // Remove replica-specific DNS entry
917 let replica_hostname =
918 format!("{}.{}.service.local", id.replica, self.service_name);
919 if let Err(e) = dns.remove_record(&replica_hostname).await {
920 tracing::warn!(
921 hostname = %replica_hostname,
922 error = %e,
923 "failed to remove replica DNS record"
924 );
925 } else {
926 tracing::debug!(
927 hostname = %replica_hostname,
928 "removed replica DNS record"
929 );
930 }
931
932 // Remove per-role DNS entry if this was a non-default group.
933 // Note: this is best-effort and removes the record even if
934 // other replicas in the same role still need it — the DNS
935 // server's add/remove API is single-record so we can't keep
936 // it alive for siblings. P2.3-bis (round-robin per-role)
937 // can fix this later via a per-role refcount; for now the
938 // service-level hostname keeps cluster-internal clients
939 // working even when the role-specific record briefly
940 // disappears.
941 if id.role != "default" {
942 let role_hostname =
943 format!("{}.{}.service.local", id.role, self.service_name);
944 if let Err(e) = dns.remove_record(&role_hostname).await {
945 tracing::warn!(
946 hostname = %role_hostname,
947 error = %e,
948 "failed to remove role DNS record"
949 );
950 } else {
951 tracing::debug!(
952 hostname = %role_hostname,
953 "removed role DNS record"
954 );
955 }
956 }
957
958 // Note: We don't remove the service-level hostname here because
959 // other replicas may still be using it. The service-level record
960 // should be cleaned up when the entire service is removed.
961 }
962
963 // Detach from overlay network if manager available.
964 //
965 // Done BEFORE stop_container because:
966 // - The container init process must still be in
967 // /proc to look up its PID via `get_container_pid`.
968 // - `OverlayManager::detach_container` deletes host-side
969 // veth interfaces by name (`veth-<pid>-*`) and
970 // releases the allocated overlay IPs back to the
971 // per-node slice. Without this the IPs leak across
972 // container churn and the slice exhausts.
973 //
974 // Best-effort: failures are logged but never abort the
975 // scale-down. The periodic orphan sweep
976 // (`start_periodic_orphan_sweep`) catches anything we
977 // missed.
978 if let Some(overlay) = &self.overlay_manager {
979 // VM guests have no host veth/PID — release the overlayd
980 // allocation (IP + registered mesh peer) by container id
981 // instead of by PID.
982 if self.runtime.overlay_attach_kind()
983 == crate::runtime::OverlayAttachKind::InGuestVsock
984 {
985 let overlay_guard = overlay.read().await;
986 if let Err(e) =
987 overlay_guard.detach_container_guest(&id.to_string()).await
988 {
989 tracing::warn!(
990 container = %id,
991 error = %e,
992 "overlay detach_container_guest failed; relying on orphan sweep"
993 );
994 }
995 } else {
996 match self.runtime.get_container_pid(&id).await {
997 Ok(Some(pid)) => {
998 let overlay_guard = overlay.read().await;
999 if let Err(e) = overlay_guard.detach_container(pid).await {
1000 tracing::warn!(
1001 container = %id,
1002 pid,
1003 error = %e,
1004 "overlay detach_container failed; relying on orphan sweep"
1005 );
1006 }
1007 }
1008 Ok(None) => {
1009 tracing::debug!(
1010 container = %id,
1011 "no PID available for overlay detach (already exited or non-Linux runtime)"
1012 );
1013 }
1014 Err(e) => {
1015 tracing::warn!(
1016 container = %id,
1017 error = %e,
1018 "failed to query container PID for overlay detach"
1019 );
1020 }
1021 }
1022 }
1023 }
1024
1025 // Stop container
1026 self.runtime
1027 .stop_container(&id, Duration::from_secs(30))
1028 .await?;
1029
1030 // Sync volumes to S3 before removal (no-op if not configured)
1031 if let Err(e) = self.runtime.sync_container_volumes(&id).await {
1032 tracing::warn!(
1033 container = %id,
1034 error = %e,
1035 "failed to sync volumes before removal"
1036 );
1037 }
1038
1039 // Remove container
1040 self.runtime.remove_container(&id).await?;
1041 }
1042 }
1043 }
1044
1045 Ok(())
1046 }
1047
1048 /// Get current number of replicas
1049 pub async fn replica_count(&self) -> usize {
1050 self.containers.read().await.len()
1051 }
1052
1053 /// Get all container IDs
1054 pub async fn container_ids(&self) -> Vec<ContainerId> {
1055 self.containers.read().await.keys().cloned().collect()
1056 }
1057
1058 /// Get per-container info (id, image, state, pid, overlay IP) for every
1059 /// live container in this instance.
1060 ///
1061 /// Surfaces the REAL image reference each container was created from and its
1062 /// REAL lifecycle state (lowercased via [`ContainerState::as_str`]) so the
1063 /// API/`ps` no longer reports a hardcoded `"running"` with no image.
1064 pub async fn container_infos(&self) -> Vec<ContainerInfo> {
1065 self.containers
1066 .read()
1067 .await
1068 .values()
1069 .map(|c| ContainerInfo {
1070 id: c.id.clone(),
1071 image: c.image.clone(),
1072 state: c.state.as_str().to_string(),
1073 pid: c.pid,
1074 overlay_ip: c.overlay_ip.map(|ip| ip.to_string()),
1075 })
1076 .collect()
1077 }
1078
1079 /// Get read access to the containers map
1080 ///
1081 /// This allows callers to access container overlay IPs and other metadata
1082 /// without copying the entire map.
1083 pub fn containers(
1084 &self,
1085 ) -> &tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>> {
1086 &self.containers
1087 }
1088
1089 /// Check if this service instance has an overlay manager configured
1090 pub fn has_overlay_manager(&self) -> bool {
1091 self.overlay_manager.is_some()
1092 }
1093
1094 /// Check if this service instance has a proxy manager configured
1095 pub fn has_proxy_manager(&self) -> bool {
1096 self.proxy_manager.is_some()
1097 }
1098
1099 /// Get the proxy manager for this instance, if configured.
1100 pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
1101 self.proxy_manager.as_ref()
1102 }
1103
1104 /// Check if this service instance has a DNS server configured
1105 pub fn has_dns_server(&self) -> bool {
1106 self.dns_server.is_some()
1107 }
1108}
1109
1110/// Per-container summary surfaced to callers (API / `ps`).
1111///
1112/// Carries the REAL image reference and lifecycle state of a single live
1113/// container, replacing the previous id-only view that forced the API to
1114/// fabricate a hardcoded `"running"` state with no image.
1115#[derive(Debug, Clone)]
1116pub struct ContainerInfo {
1117 /// Container identity.
1118 pub id: ContainerId,
1119 /// Image reference the container was created from (canonical form).
1120 pub image: String,
1121 /// Lowercased lifecycle state (e.g. `"running"`, `"exited"`).
1122 pub state: String,
1123 /// Process ID, when the container is running.
1124 pub pid: Option<u32>,
1125 /// Overlay IP rendered as a string, when assigned.
1126 pub overlay_ip: Option<String>,
1127}
1128
1129/// A live deployment container enriched for Docker-compat `ps` rows and for
1130/// name resolution. Produced by [`ServiceManager::list_container_views`].
1131#[derive(Debug, Clone)]
1132pub struct DeploymentContainerView {
1133 /// Deployment (compose project) name, when known.
1134 pub deployment: Option<String>,
1135 /// Service name within the deployment.
1136 pub service: String,
1137 /// Concrete container identity.
1138 pub container_id: ContainerId,
1139 /// Compose `container_name:` (the user-facing Docker name), when set.
1140 pub container_name: Option<String>,
1141 /// Image reference the container was created from.
1142 pub image: String,
1143 /// Lowercased lifecycle state (e.g. `"running"`).
1144 pub state: String,
1145 /// Process id when running.
1146 pub pid: Option<u32>,
1147 /// The service's published port mappings.
1148 pub ports: Vec<zlayer_spec::PortMapping>,
1149}
1150
1151/// Service manager for multiple services
1152pub struct ServiceManager {
1153 runtime: Arc<dyn Runtime + Send + Sync>,
1154 services: tokio::sync::RwLock<std::collections::HashMap<String, ServiceInstance>>,
1155 scale_semaphore: Arc<Semaphore>,
1156 /// Overlay network manager for container networking
1157 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
1158 /// Stream registry for L4 proxy route registration (TCP/UDP)
1159 stream_registry: Option<Arc<StreamRegistry>>,
1160 /// Proxy manager for health-aware load balancing (hyper-based proxy)
1161 proxy_manager: Option<Arc<ProxyManager>>,
1162 /// DNS server for service discovery
1163 dns_server: Option<Arc<DnsServer>>,
1164 /// Container-injectable overlay resolver IP. When set, new service
1165 /// instances inject `<ip>` into their `spec.dns` so containers resolve
1166 /// through the overlay DNS server (bound on `<ip>:53`) rather than the
1167 /// hijacked host resolv.conf.
1168 container_dns: Option<IpAddr>,
1169 /// Deployment name (used for generating hostnames)
1170 deployment_name: Option<String>,
1171 /// Health states for dependency condition checking
1172 health_states: Arc<RwLock<HashMap<String, HealthState>>>,
1173 /// Job executor for run-to-completion workloads
1174 job_executor: Option<Arc<JobExecutor>>,
1175 /// Cron scheduler for time-based job triggers
1176 cron_scheduler: Option<Arc<CronScheduler>>,
1177 /// Container supervisor for crash/panic policy enforcement
1178 container_supervisor: Option<Arc<ContainerSupervisor>>,
1179 /// Cluster membership + dispatch handle. When `None`, scale operations
1180 /// run purely local (single-node mode). When `Some`, `scale_service`
1181 /// routes through the cluster (leader dispatches to peers; followers
1182 /// forward to the leader).
1183 cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
1184}
1185
1186// ---------------------------------------------------------------------------
1187// ServiceManagerBuilder
1188// ---------------------------------------------------------------------------
1189
/// Builder for constructing a [`ServiceManager`] with optional subsystems.
///
/// Only the runtime is required; every other field starts out as `None` and
/// is filled in through the setter of the same name.
///
/// Prefer using `ServiceManager::builder(runtime)` to start building.
///
/// # Example
///
/// ```ignore
/// let manager = ServiceManager::builder(runtime)
///     .overlay_manager(om)
///     .proxy_manager(proxy)
///     .deployment_name("prod")
///     .build();
/// ```
pub struct ServiceManagerBuilder {
    /// Container runtime (required; supplied to `new`).
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Overlay network manager for container networking.
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Proxy manager for health-aware load balancing.
    proxy_manager: Option<Arc<ProxyManager>>,
    /// Stream registry for TCP/UDP L4 proxy route registration.
    stream_registry: Option<Arc<StreamRegistry>>,
    /// DNS server for service discovery.
    dns_server: Option<Arc<DnsServer>>,
    /// Container-injectable overlay resolver IP.
    container_dns: Option<IpAddr>,
    /// Deployment name (used for hostname generation).
    deployment_name: Option<String>,
    /// Job executor for run-to-completion workloads.
    job_executor: Option<Arc<JobExecutor>>,
    /// Cron scheduler for time-based job triggers.
    cron_scheduler: Option<Arc<CronScheduler>>,
    /// Container supervisor for crash/panic policy enforcement.
    container_supervisor: Option<Arc<ContainerSupervisor>>,
    /// Cluster membership + dispatch handle for cluster-aware scaling.
    cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
}
1216
1217impl ServiceManagerBuilder {
1218 /// Create a new builder with the required runtime.
1219 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
1220 Self {
1221 runtime,
1222 overlay_manager: None,
1223 proxy_manager: None,
1224 stream_registry: None,
1225 dns_server: None,
1226 container_dns: None,
1227 deployment_name: None,
1228 job_executor: None,
1229 cron_scheduler: None,
1230 container_supervisor: None,
1231 cluster: None,
1232 }
1233 }
1234
1235 /// Set the overlay network manager for container networking.
1236 #[must_use]
1237 pub fn overlay_manager(mut self, om: Arc<RwLock<OverlayManager>>) -> Self {
1238 self.overlay_manager = Some(om);
1239 self
1240 }
1241
1242 /// Set the proxy manager for health-aware load balancing.
1243 #[must_use]
1244 pub fn proxy_manager(mut self, pm: Arc<ProxyManager>) -> Self {
1245 self.proxy_manager = Some(pm);
1246 self
1247 }
1248
1249 /// Set the stream registry for TCP/UDP L4 proxy route registration.
1250 #[must_use]
1251 pub fn stream_registry(mut self, sr: Arc<StreamRegistry>) -> Self {
1252 self.stream_registry = Some(sr);
1253 self
1254 }
1255
1256 /// Set the DNS server for service discovery.
1257 #[must_use]
1258 pub fn dns_server(mut self, dns: Arc<DnsServer>) -> Self {
1259 self.dns_server = Some(dns);
1260 self
1261 }
1262
1263 /// Set the container-injectable overlay resolver IP.
1264 ///
1265 /// The daemon passes the IP it bound the overlay DNS server on at port 53
1266 /// (see `daemon.rs` Phase 4). New service instances inject it into
1267 /// `spec.dns` so containers resolve through the overlay instead of the
1268 /// hijacked host resolv.conf. The port is implicitly 53 (resolv.conf has no
1269 /// port syntax), which is why only the bare IP is threaded here.
1270 #[must_use]
1271 pub fn container_dns(mut self, ip: IpAddr) -> Self {
1272 self.container_dns = Some(ip);
1273 self
1274 }
1275
1276 /// Set the deployment name (used for hostname generation).
1277 #[must_use]
1278 pub fn deployment_name(mut self, name: impl Into<String>) -> Self {
1279 self.deployment_name = Some(name.into());
1280 self
1281 }
1282
1283 /// Set the job executor for run-to-completion workloads.
1284 #[must_use]
1285 pub fn job_executor(mut self, je: Arc<JobExecutor>) -> Self {
1286 self.job_executor = Some(je);
1287 self
1288 }
1289
1290 /// Set the cron scheduler for time-based job triggers.
1291 #[must_use]
1292 pub fn cron_scheduler(mut self, cs: Arc<CronScheduler>) -> Self {
1293 self.cron_scheduler = Some(cs);
1294 self
1295 }
1296
1297 /// Set the container supervisor for crash/panic policy enforcement.
1298 #[must_use]
1299 pub fn container_supervisor(mut self, cs: Arc<ContainerSupervisor>) -> Self {
1300 self.container_supervisor = Some(cs);
1301 self
1302 }
1303
1304 /// Set the cluster membership + dispatch handle. When set,
1305 /// [`ServiceManager::scale_service`] will route through the cluster
1306 /// (leader dispatches to peers; followers forward to the leader).
1307 /// When unset (the default), scale operations remain local-only.
1308 #[must_use]
1309 pub fn cluster(mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) -> Self {
1310 self.cluster = Some(cluster);
1311 self
1312 }
1313
1314 /// Consume the builder and produce a fully-wired [`ServiceManager`].
1315 ///
1316 /// Logs warnings for missing recommended subsystems (proxy,
1317 /// `stream_registry`, `container_supervisor`, `deployment_name`).
1318 pub fn build(self) -> ServiceManager {
1319 if self.proxy_manager.is_none() {
1320 tracing::warn!("ServiceManager built without proxy_manager");
1321 }
1322 if self.stream_registry.is_none() {
1323 tracing::warn!("ServiceManager built without stream_registry");
1324 }
1325 if self.container_supervisor.is_none() {
1326 tracing::warn!("ServiceManager built without container_supervisor");
1327 }
1328 if self.deployment_name.is_none() {
1329 tracing::warn!("ServiceManager built without deployment_name");
1330 }
1331
1332 ServiceManager {
1333 runtime: self.runtime,
1334 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1335 scale_semaphore: Arc::new(Semaphore::new(10)),
1336 overlay_manager: self.overlay_manager,
1337 stream_registry: self.stream_registry,
1338 proxy_manager: self.proxy_manager,
1339 dns_server: self.dns_server,
1340 container_dns: self.container_dns,
1341 deployment_name: self.deployment_name,
1342 health_states: Arc::new(RwLock::new(HashMap::new())),
1343 job_executor: self.job_executor,
1344 cron_scheduler: self.cron_scheduler,
1345 container_supervisor: self.container_supervisor,
1346 cluster: self.cluster,
1347 }
1348 }
1349}
1350
1351impl ServiceManager {
1352 /// Create a [`ServiceManagerBuilder`] for constructing a `ServiceManager`.
1353 ///
1354 /// This is the preferred way to construct a `ServiceManager` since v0.2.0.
1355 ///
1356 /// # Example
1357 ///
1358 /// ```ignore
1359 /// let manager = ServiceManager::builder(runtime)
1360 /// .overlay_manager(om)
1361 /// .proxy_manager(proxy)
1362 /// .build();
1363 /// ```
1364 pub fn builder(runtime: Arc<dyn Runtime + Send + Sync>) -> ServiceManagerBuilder {
1365 ServiceManagerBuilder::new(runtime)
1366 }
1367
1368 /// Create a new service manager
1369 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1370 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
1371 Self {
1372 runtime,
1373 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1374 scale_semaphore: Arc::new(Semaphore::new(10)), // Max 10 concurrent scaling operations
1375 overlay_manager: None,
1376 stream_registry: None,
1377 proxy_manager: None,
1378 dns_server: None,
1379 container_dns: None,
1380 deployment_name: None,
1381 health_states: Arc::new(RwLock::new(HashMap::new())),
1382 job_executor: None,
1383 cron_scheduler: None,
1384 container_supervisor: None,
1385 cluster: None,
1386 }
1387 }
1388
1389 /// Create a service manager with overlay network support
1390 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1391 pub fn with_overlay(
1392 runtime: Arc<dyn Runtime + Send + Sync>,
1393 overlay_manager: Arc<RwLock<OverlayManager>>,
1394 ) -> Self {
1395 Self {
1396 runtime,
1397 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1398 scale_semaphore: Arc::new(Semaphore::new(10)),
1399 overlay_manager: Some(overlay_manager),
1400 stream_registry: None,
1401 proxy_manager: None,
1402 dns_server: None,
1403 container_dns: None,
1404 deployment_name: None,
1405 health_states: Arc::new(RwLock::new(HashMap::new())),
1406 job_executor: None,
1407 cron_scheduler: None,
1408 container_supervisor: None,
1409 cluster: None,
1410 }
1411 }
1412
1413 /// Create a fully-configured service manager with overlay and proxy support
1414 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1415 pub fn with_full_config(
1416 runtime: Arc<dyn Runtime + Send + Sync>,
1417 overlay_manager: Arc<RwLock<OverlayManager>>,
1418 deployment_name: String,
1419 ) -> Self {
1420 Self {
1421 runtime,
1422 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1423 scale_semaphore: Arc::new(Semaphore::new(10)),
1424 overlay_manager: Some(overlay_manager),
1425 stream_registry: None,
1426 proxy_manager: None,
1427 dns_server: None,
1428 container_dns: None,
1429 deployment_name: Some(deployment_name),
1430 health_states: Arc::new(RwLock::new(HashMap::new())),
1431 job_executor: None,
1432 cron_scheduler: None,
1433 container_supervisor: None,
1434 cluster: None,
1435 }
1436 }
1437
1438 /// Get the health states map for external monitoring
1439 pub fn health_states(&self) -> Arc<RwLock<HashMap<String, HealthState>>> {
1440 Arc::clone(&self.health_states)
1441 }
1442
1443 /// Update health state for a service
1444 pub async fn update_health_state(&self, service_name: &str, state: HealthState) {
1445 let mut states = self.health_states.write().await;
1446 states.insert(service_name.to_string(), state);
1447 }
1448
1449 /// Set the deployment name (used for generating hostnames)
1450 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1451 pub fn set_deployment_name(&mut self, name: String) {
1452 self.deployment_name = Some(name);
1453 }
1454
1455 /// Set the stream registry for L4 proxy integration (TCP/UDP)
1456 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1457 pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
1458 self.stream_registry = Some(registry);
1459 }
1460
1461 /// Builder pattern: add stream registry for L4 proxy integration
1462 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1463 #[must_use]
1464 pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
1465 self.stream_registry = Some(registry);
1466 self
1467 }
1468
1469 /// Get the stream registry (if configured)
1470 pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
1471 self.stream_registry.as_ref()
1472 }
1473
1474 /// Set the overlay manager for container networking
1475 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1476 pub fn set_overlay_manager(&mut self, manager: Arc<RwLock<OverlayManager>>) {
1477 self.overlay_manager = Some(manager);
1478 }
1479
1480 /// Set the proxy manager for health-aware load balancing
1481 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1482 pub fn set_proxy_manager(&mut self, proxy: Arc<ProxyManager>) {
1483 self.proxy_manager = Some(proxy);
1484 }
1485
1486 /// Builder pattern: add proxy manager for health-aware load balancing
1487 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1488 #[must_use]
1489 pub fn with_proxy_manager(mut self, proxy: Arc<ProxyManager>) -> Self {
1490 self.proxy_manager = Some(proxy);
1491 self
1492 }
1493
1494 /// Get the proxy manager (if configured)
1495 pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
1496 self.proxy_manager.as_ref()
1497 }
1498
1499 /// Set the DNS server for service discovery
1500 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1501 pub fn set_dns_server(&mut self, dns: Arc<DnsServer>) {
1502 self.dns_server = Some(dns);
1503 }
1504
1505 /// Builder pattern: add DNS server for service discovery
1506 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1507 #[must_use]
1508 pub fn with_dns_server(mut self, dns: Arc<DnsServer>) -> Self {
1509 self.dns_server = Some(dns);
1510 self
1511 }
1512
1513 /// Get the DNS server (if configured)
1514 pub fn dns_server(&self) -> Option<&Arc<DnsServer>> {
1515 self.dns_server.as_ref()
1516 }
1517
1518 /// Set the job executor for run-to-completion workloads
1519 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1520 pub fn set_job_executor(&mut self, executor: Arc<JobExecutor>) {
1521 self.job_executor = Some(executor);
1522 }
1523
1524 /// Set the cron scheduler for time-based job triggers
1525 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1526 pub fn set_cron_scheduler(&mut self, scheduler: Arc<CronScheduler>) {
1527 self.cron_scheduler = Some(scheduler);
1528 }
1529
1530 /// Builder pattern: add job executor
1531 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1532 #[must_use]
1533 pub fn with_job_executor(mut self, executor: Arc<JobExecutor>) -> Self {
1534 self.job_executor = Some(executor);
1535 self
1536 }
1537
1538 /// Builder pattern: add cron scheduler
1539 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1540 #[must_use]
1541 pub fn with_cron_scheduler(mut self, scheduler: Arc<CronScheduler>) -> Self {
1542 self.cron_scheduler = Some(scheduler);
1543 self
1544 }
1545
1546 /// Set the cluster handle for cluster-aware scaling.
1547 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1548 pub fn set_cluster(&mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) {
1549 self.cluster = Some(cluster);
1550 }
1551
1552 /// Builder pattern: add a cluster handle for cluster-aware scaling.
1553 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1554 #[must_use]
1555 pub fn with_cluster(mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) -> Self {
1556 self.cluster = Some(cluster);
1557 self
1558 }
1559
1560 /// Get the cluster handle (if configured).
1561 pub fn cluster(&self) -> Option<&Arc<dyn zlayer_scheduler::cluster::Cluster>> {
1562 self.cluster.as_ref()
1563 }
1564
1565 /// Get the job executor (if configured)
1566 pub fn job_executor(&self) -> Option<&Arc<JobExecutor>> {
1567 self.job_executor.as_ref()
1568 }
1569
1570 /// Get the cron scheduler (if configured)
1571 pub fn cron_scheduler(&self) -> Option<&Arc<CronScheduler>> {
1572 self.cron_scheduler.as_ref()
1573 }
1574
1575 /// Set the container supervisor for crash/panic policy enforcement
1576 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1577 pub fn set_container_supervisor(&mut self, supervisor: Arc<ContainerSupervisor>) {
1578 self.container_supervisor = Some(supervisor);
1579 }
1580
1581 /// Builder pattern: add container supervisor
1582 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1583 #[must_use]
1584 pub fn with_container_supervisor(mut self, supervisor: Arc<ContainerSupervisor>) -> Self {
1585 self.container_supervisor = Some(supervisor);
1586 self
1587 }
1588
1589 /// Get the container supervisor (if configured)
1590 pub fn container_supervisor(&self) -> Option<&Arc<ContainerSupervisor>> {
1591 self.container_supervisor.as_ref()
1592 }
1593
1594 /// Start the container supervisor background task
1595 ///
1596 /// This spawns a background task that monitors containers for crashes
1597 /// and enforces the `on_panic` error policy.
1598 ///
1599 /// # Errors
1600 /// Returns an error if no container supervisor is configured.
1601 ///
1602 /// # Returns
1603 /// A `JoinHandle` for the supervisor task.
1604 pub fn start_container_supervisor(&self) -> Result<tokio::task::JoinHandle<()>> {
1605 let supervisor = self.container_supervisor.as_ref().ok_or_else(|| {
1606 AgentError::Configuration("Container supervisor not configured".to_string())
1607 })?;
1608
1609 let supervisor = Arc::clone(supervisor);
1610 Ok(tokio::spawn(async move {
1611 supervisor.run_loop().await;
1612 }))
1613 }
1614
1615 /// Shutdown the container supervisor
1616 pub fn shutdown_container_supervisor(&self) {
1617 if let Some(supervisor) = &self.container_supervisor {
1618 supervisor.shutdown();
1619 }
1620 }
1621
1622 /// Get the supervised state of a container
1623 pub async fn get_container_supervised_state(
1624 &self,
1625 container_id: &ContainerId,
1626 ) -> Option<SupervisedState> {
1627 if let Some(supervisor) = &self.container_supervisor {
1628 supervisor.get_state(container_id).await
1629 } else {
1630 None
1631 }
1632 }
1633
1634 /// Get supervisor events receiver
1635 ///
1636 /// Note: This can only be called once; the receiver is moved to the caller.
1637 pub async fn take_supervisor_events(
1638 &self,
1639 ) -> Option<tokio::sync::mpsc::Receiver<SupervisorEvent>> {
1640 if let Some(supervisor) = &self.container_supervisor {
1641 supervisor.take_event_receiver().await
1642 } else {
1643 None
1644 }
1645 }
1646
1647 // ==================== Dependency Orchestration ====================
1648
1649 /// Deploy multiple services respecting their dependency order
1650 ///
1651 /// This method:
1652 /// 1. Builds a dependency graph from the services
1653 /// 2. Validates no cycles exist
1654 /// 3. Computes topological order (services with no deps first)
1655 /// 4. For each service in order, waits for dependencies then starts the service
1656 ///
1657 /// # Arguments
1658 /// * `services` - Map of service name to service specification
1659 ///
1660 /// # Errors
1661 /// - Returns `AgentError::InvalidSpec` if there are cyclic dependencies
1662 /// - Returns `AgentError::DependencyTimeout` if a dependency times out with `on_timeout`: fail
1663 pub async fn deploy_with_dependencies(
1664 &self,
1665 services: HashMap<String, ServiceSpec>,
1666 ) -> Result<()> {
1667 if services.is_empty() {
1668 return Ok(());
1669 }
1670
1671 // Build dependency graph
1672 let graph = DependencyGraph::build(&services)?;
1673
1674 tracing::info!(
1675 service_count = services.len(),
1676 "Starting deployment with dependency ordering"
1677 );
1678
1679 // Get startup order
1680 let order = graph.startup_order();
1681 tracing::debug!(order = ?order, "Computed startup order");
1682
1683 // Start services in dependency order
1684 for service_name in order {
1685 let service_spec = services
1686 .get(service_name)
1687 .ok_or_else(|| AgentError::Internal(format!("Service {service_name} not found")))?;
1688
1689 // Wait for dependencies first
1690 if !service_spec.depends.is_empty() {
1691 tracing::info!(
1692 service = %service_name,
1693 dependency_count = service_spec.depends.len(),
1694 "Waiting for dependencies"
1695 );
1696 self.wait_for_dependencies(service_name, &service_spec.depends)
1697 .await?;
1698 }
1699
1700 // Register and start service
1701 tracing::info!(service = %service_name, "Starting service");
1702 Box::pin(self.upsert_service(service_name.clone(), service_spec.clone())).await?;
1703
1704 // Get the desired replica count from scale config
1705 let replicas = match &service_spec.scale {
1706 zlayer_spec::ScaleSpec::Fixed { replicas } => *replicas,
1707 zlayer_spec::ScaleSpec::Adaptive { min, .. } => *min, // Start with min replicas
1708 zlayer_spec::ScaleSpec::Manual => 1, // Default to 1 for manual scaling
1709 };
1710 self.scale_service(service_name, replicas).await?;
1711
1712 // Mark service as started in health states (Unknown until health check runs)
1713 self.update_health_state(service_name, HealthState::Unknown)
1714 .await;
1715
1716 tracing::info!(
1717 service = %service_name,
1718 replicas = replicas,
1719 "Service started"
1720 );
1721 }
1722
1723 tracing::info!(service_count = services.len(), "Deployment complete");
1724
1725 Ok(())
1726 }
1727
1728 /// Wait for all dependencies of a service to be satisfied
1729 ///
1730 /// # Arguments
1731 /// * `service` - Name of the service waiting for dependencies
1732 /// * `deps` - Slice of dependency specifications
1733 ///
1734 /// # Errors
1735 /// Returns `AgentError::DependencyTimeout` if any dependency with `on_timeout`: fail times out
1736 async fn wait_for_dependencies(&self, service: &str, deps: &[DependsSpec]) -> Result<()> {
1737 let condition_checker = DependencyConditionChecker::new(
1738 Arc::clone(&self.runtime),
1739 Arc::clone(&self.health_states),
1740 None,
1741 );
1742
1743 let waiter = DependencyWaiter::new(condition_checker);
1744 let results = waiter.wait_for_all(deps).await?;
1745
1746 // Check results for failures
1747 for result in results {
1748 match result {
1749 WaitResult::TimedOutFail {
1750 service: dep_service,
1751 condition,
1752 timeout,
1753 } => {
1754 return Err(AgentError::DependencyTimeout {
1755 service: service.to_string(),
1756 dependency: dep_service,
1757 condition: format!("{condition:?}"),
1758 timeout,
1759 });
1760 }
1761 WaitResult::TimedOutWarn {
1762 service: dep_service,
1763 condition,
1764 } => {
1765 tracing::warn!(
1766 service = %service,
1767 dependency = %dep_service,
1768 condition = ?condition,
1769 "Dependency timed out but continuing"
1770 );
1771 }
1772 WaitResult::TimedOutContinue | WaitResult::Satisfied => {
1773 // Continue silently
1774 }
1775 }
1776 }
1777
1778 Ok(())
1779 }
1780
1781 /// Check if all dependencies for a service are currently satisfied
1782 ///
1783 /// This is a one-shot check (no waiting). Useful for pre-flight validation.
1784 ///
1785 /// # Errors
1786 /// Returns an error if a dependency check fails unexpectedly.
1787 pub async fn check_dependencies(&self, deps: &[DependsSpec]) -> Result<bool> {
1788 let condition_checker = DependencyConditionChecker::new(
1789 Arc::clone(&self.runtime),
1790 Arc::clone(&self.health_states),
1791 None,
1792 );
1793
1794 for dep in deps {
1795 if !condition_checker.check(dep).await? {
1796 return Ok(false);
1797 }
1798 }
1799
1800 Ok(true)
1801 }
1802
1803 /// Add or update a workload (service, job, or cron)
1804 ///
1805 /// This method handles different resource types appropriately:
1806 /// - **Service**: Traditional long-running containers with scaling and health checks
1807 /// - **Job**: Run-to-completion workloads triggered on-demand (stores spec for later)
1808 /// - **Cron**: Scheduled run-to-completion workloads (registers with cron scheduler)
1809 ///
1810 /// # Errors
1811 /// Returns an error if service creation, scaling, or cron registration fails.
1812 #[allow(clippy::too_many_lines)]
1813 pub async fn upsert_service(&self, name: String, spec: ServiceSpec) -> Result<()> {
1814 match spec.rtype {
1815 ResourceType::Service => {
1816 // Long-running service: create/update instance
1817 let mut services = self.services.write().await;
1818
1819 if let Some(instance) = services.get_mut(&name) {
1820 // Update existing service. We need to:
1821 // 1. Update the in-memory spec (so future scale-ups use the new image).
1822 // 2. Recreate the local replicas when the image actually changed —
1823 // either a different image *reference* (e.g. tag bump
1824 // 1.28 -> 1.29), which is a new image regardless of pull
1825 // policy, or, under Always/Newer, observed *digest* drift on
1826 // the same reference.
1827 // The recreate is LOCAL (`scale_service_local`): `upsert_service`
1828 // runs on whichever node owns the replicas (the leader for its
1829 // own share, each worker via the `/internal/scale` handler). Using
1830 // the cluster-routed `scale_service` here would bounce a worker's
1831 // recreate back to the leader and re-enter dispatch. Cluster-wide
1832 // distribution is the caller's job (orchestrate_deployment + the
1833 // scale dispatch that carries this spec to every node).
1834 let image_changed = instance.spec.image.name != spec.image.name;
1835 instance.spec = spec.clone();
1836 if let Some(dns) = &self.dns_server {
1837 instance.set_dns_server(Arc::clone(dns));
1838 }
1839 // Re-apply overlay resolver injection: the spec was just
1840 // replaced wholesale, so any prior injection on the old
1841 // spec is gone. Honors host_network / user-supplied dns.
1842 if let Some(ip) = self.container_dns {
1843 instance.set_container_dns(ip);
1844 }
1845
1846 let effective = spec.image.pull_policy;
1847 let old_digest = instance.last_pulled_digest().await;
1848 let current_replicas =
1849 u32::try_from(instance.replica_count().await).unwrap_or(u32::MAX);
1850 drop(services); // Release write lock before pull / scale (which take their own locks).
1851
1852 // A changed image reference always recreates. Same-reference
1853 // refreshes are governed by pull policy + digest drift.
1854 let mut should_recreate = image_changed;
1855 let mut new_digest = old_digest.clone();
1856
1857 match effective {
1858 PullPolicy::Never | PullPolicy::IfNotPresent => {
1859 // No proactive pull. If the reference changed we still
1860 // recreate below; the scale-up path pulls the (absent) new
1861 // image per IfNotPresent. A same-reference redeploy under
1862 // these policies is a genuine no-op.
1863 tracing::debug!(
1864 service = %name,
1865 policy = ?effective,
1866 image_changed,
1867 "re-deploy under no-refresh pull policy"
1868 );
1869 }
1870 PullPolicy::Always | PullPolicy::Newer => {
1871 // Pull (this updates the cached digest as a side-effect).
1872 // We need a read guard to keep the instance alive while
1873 // calling its &self method.
1874 let services_ro = self.services.read().await;
1875 new_digest = if let Some(inst) = services_ro.get(&name) {
1876 inst.pull_and_refresh_digest().await?
1877 } else {
1878 // The service vanished between our write-lock release
1879 // and read-lock acquisition (race with remove_service).
1880 // Treat this as a no-op; the caller will see the removal.
1881 tracing::warn!(
1882 service = %name,
1883 "service removed during upsert; skipping drift recreate"
1884 );
1885 return Ok(());
1886 };
1887 drop(services_ro);
1888
1889 // Always forces a recreate. Newer recreates on digest
1890 // drift. When digests are unknown (runtime doesn't expose
1891 // them), we can't observe drift safely under Newer, so the
1892 // reference check above is the only trigger.
1893 should_recreate = should_recreate
1894 || match effective {
1895 PullPolicy::Always => true,
1896 PullPolicy::Newer => match (&old_digest, &new_digest) {
1897 (Some(old), Some(new)) => old != new,
1898 _ => false,
1899 },
1900 _ => false,
1901 };
1902 }
1903 }
1904
1905 if should_recreate && current_replicas > 0 {
1906 tracing::info!(
1907 service = %name,
1908 policy = ?effective,
1909 image_changed,
1910 old_digest = ?old_digest,
1911 new_digest = ?new_digest,
1912 replicas = current_replicas,
1913 "image changed; performing local rolling recreate"
1914 );
1915 self.scale_service_local(&name, 0).await?;
1916 self.scale_service_local(&name, current_replicas).await?;
1917 tracing::info!(
1918 service = %name,
1919 new_digest = ?new_digest,
1920 "service recreated with refreshed image"
1921 );
1922 } else {
1923 tracing::debug!(
1924 service = %name,
1925 policy = ?effective,
1926 old_digest = ?old_digest,
1927 new_digest = ?new_digest,
1928 "service up to date; no recreate required"
1929 );
1930 }
1931 return Ok(());
1932 }
1933 // Create new service with proxy manager for health-aware load balancing
1934 let overlay = self.overlay_manager.as_ref().map(Arc::clone);
1935 let mut instance = if let Some(proxy) = &self.proxy_manager {
1936 ServiceInstance::with_proxy(
1937 name.clone(),
1938 spec,
1939 self.runtime.clone(),
1940 overlay,
1941 Arc::clone(proxy),
1942 )
1943 } else {
1944 ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
1945 };
1946 // Thread the local cluster node id so new `ContainerId`s carry
1947 // owning-node identity. Defaults to `0` in single-node mode.
1948 instance.set_node_id(self.cluster.as_ref().map_or(0, |c| c.node_id()));
1949 // Set DNS server if configured
1950 if let Some(dns) = &self.dns_server {
1951 instance.set_dns_server(Arc::clone(dns));
1952 }
1953 // Inject the overlay resolver into the spec so containers use it
1954 // instead of the hijacked host resolv.conf (no-op for
1955 // host_network / user-supplied dns).
1956 if let Some(ip) = self.container_dns {
1957 instance.set_container_dns(ip);
1958 }
1959 // Wire shared health states so callbacks bridge back to ServiceManager
1960 instance.set_health_states(Arc::clone(&self.health_states));
1961 // Register HTTP routes via proxy manager
1962 if let Some(proxy) = &self.proxy_manager {
1963 proxy.add_service(&name, &instance.spec).await;
1964 }
1965 // Register TCP/UDP endpoints in stream registry
1966 if let Some(stream_registry) = &self.stream_registry {
1967 for endpoint in &instance.spec.endpoints {
1968 let svc = StreamService::new(
1969 name.clone(),
1970 Vec::new(), // No backends yet; added on scale-up
1971 );
1972 match endpoint.protocol {
1973 Protocol::Tcp => {
1974 stream_registry.register_tcp(endpoint.port, svc);
1975 tracing::debug!(
1976 service = %name,
1977 port = endpoint.port,
1978 "Registered TCP stream route"
1979 );
1980 }
1981 Protocol::Udp => {
1982 stream_registry.register_udp(endpoint.port, svc);
1983 tracing::debug!(
1984 service = %name,
1985 port = endpoint.port,
1986 "Registered UDP stream route"
1987 );
1988 }
1989 _ => {} // HTTP routes handled by proxy manager
1990 }
1991 }
1992 }
1993 services.insert(name, instance);
1994 }
1995 ResourceType::Job => {
1996 // Job: Just store the spec for later triggering
1997 // Jobs don't start containers immediately; they're triggered on-demand
1998 if let Some(executor) = &self.job_executor {
1999 executor.register_job(&name, spec).await;
2000 tracing::info!(job = %name, "Registered job spec");
2001 } else {
2002 tracing::warn!(
2003 job = %name,
2004 "Job executor not configured, storing as service for reference"
2005 );
2006 // Fallback: store as service instance for reference
2007 let mut services = self.services.write().await;
2008 let overlay = self.overlay_manager.as_ref().map(Arc::clone);
2009 let mut instance = if let Some(proxy) = &self.proxy_manager {
2010 ServiceInstance::with_proxy(
2011 name.clone(),
2012 spec,
2013 self.runtime.clone(),
2014 overlay,
2015 Arc::clone(proxy),
2016 )
2017 } else {
2018 ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
2019 };
2020 // Thread the local cluster node id (same as the Service
2021 // branch above) so the fallback-as-service Job entry also
2022 // carries owning-node identity.
2023 instance.set_node_id(self.cluster.as_ref().map_or(0, |c| c.node_id()));
2024 // Set DNS server if configured
2025 if let Some(dns) = &self.dns_server {
2026 instance.set_dns_server(Arc::clone(dns));
2027 }
2028 // Inject the overlay resolver (no-op for host_network /
2029 // user-supplied dns).
2030 if let Some(ip) = self.container_dns {
2031 instance.set_container_dns(ip);
2032 }
2033 services.insert(name, instance);
2034 }
2035 }
2036 ResourceType::Cron => {
2037 // Cron: Register with the cron scheduler
2038 if let Some(scheduler) = &self.cron_scheduler {
2039 scheduler.register(&name, &spec).await?;
2040 tracing::info!(cron = %name, "Registered cron job with scheduler");
2041 } else {
2042 return Err(AgentError::Configuration(format!(
2043 "Cron scheduler not configured for cron job '{name}'"
2044 )));
2045 }
2046 }
2047 }
2048
2049 Ok(())
2050 }
2051
2052 /// Update backend addresses via `ProxyManager` after scaling, applying
2053 /// per-endpoint `target_role` filtering.
2054 ///
2055 /// For each L7 endpoint of the service, this collects the subset of
2056 /// containers whose `ContainerId.role` matches `endpoint.target_role`
2057 /// (or all containers when `target_role` is `None`) and updates the
2058 /// proxy's backend pool for that specific endpoint via
2059 /// [`ProxyManager::update_endpoint_backends`].
2060 async fn update_proxy_backends(&self, instance: &ServiceInstance) {
2061 let Some(proxy) = &self.proxy_manager else {
2062 return;
2063 };
2064 for endpoint in &instance.spec.endpoints {
2065 // Only L7 endpoints flow through the proxy (HTTP/HTTPS/WS).
2066 if !matches!(
2067 endpoint.protocol,
2068 Protocol::Http | Protocol::Https | Protocol::Websocket
2069 ) {
2070 continue;
2071 }
2072 let addrs = self.collect_endpoint_backends(instance, endpoint).await;
2073 proxy
2074 .update_endpoint_backends(&instance.service_name, &endpoint.name, addrs)
2075 .await;
2076 }
2077 }
2078
2079 /// Update backend addresses in the `StreamRegistry` for TCP/UDP endpoints after scaling
2080 ///
2081 /// For containers with a port override (macOS sandbox), the addresses already
2082 /// carry the runtime-assigned port. In that case, the container listens on the
2083 /// override port for all traffic, so we use the address port directly. For
2084 /// containers without a port override (Linux, VMs), we reconstruct addresses
2085 /// using the endpoint's declared port, since each container has its own IP
2086 /// and can bind any port independently.
2087 async fn update_stream_backends(&self, instance: &ServiceInstance) {
2088 let Some(stream_registry) = &self.stream_registry else {
2089 return;
2090 };
2091
2092 for endpoint in &instance.spec.endpoints {
2093 match endpoint.protocol {
2094 Protocol::Tcp => {
2095 let tcp_backends = self.collect_endpoint_backends(instance, endpoint).await;
2096 let backend_count = tcp_backends.len();
2097 stream_registry.update_tcp_backends(endpoint.port, tcp_backends);
2098 tracing::debug!(
2099 endpoint = %endpoint.name,
2100 port = endpoint.port,
2101 backend_count = backend_count,
2102 target_role = ?endpoint.target_role,
2103 "Updated TCP stream backends"
2104 );
2105 }
2106 Protocol::Udp => {
2107 let udp_backends = self.collect_endpoint_backends(instance, endpoint).await;
2108 let backend_count = udp_backends.len();
2109 stream_registry.update_udp_backends(endpoint.port, udp_backends);
2110 tracing::debug!(
2111 endpoint = %endpoint.name,
2112 port = endpoint.port,
2113 backend_count = backend_count,
2114 target_role = ?endpoint.target_role,
2115 "Updated UDP stream backends"
2116 );
2117 }
2118 _ => {} // HTTP endpoints handled by update_proxy_backends
2119 }
2120 }
2121 }
2122
2123 /// Scale a service. Cluster-aware: if this node has a `Cluster` handle
2124 /// and we're not the leader, forward to the leader; if leader, compute
2125 /// affinity-aware placement and dispatch each node its share via
2126 /// `dispatch_scale_distributed`; else (single-node) just scale locally.
2127 ///
2128 /// # Errors
2129 /// Returns an error if scaling fails on any participating node.
2130 #[allow(clippy::cast_possible_truncation)]
2131 pub async fn scale_service(&self, name: &str, replicas: u32) -> Result<()> {
2132 use zlayer_scheduler::cluster::InternalScaleRequest;
2133
2134 tracing::info!(
2135 target: "zlayer::scale_distribute",
2136 service = name,
2137 replicas,
2138 has_cluster = self.cluster.is_some(),
2139 "scale_service ENTER"
2140 );
2141
2142 // Attach the current spec so every receiving node can register/update
2143 // the service before scaling. This is what propagates an image change
2144 // to worker containers and lets a fresh worker run a replica it has
2145 // never seen. `None` if the service isn't registered locally (the
2146 // receiver then falls back to its own cached spec).
2147 let spec = self
2148 .services
2149 .read()
2150 .await
2151 .get(name)
2152 .map(|inst| inst.spec.clone());
2153 let build_req = |replicas: u32| {
2154 let req = InternalScaleRequest::new(name, replicas);
2155 match spec.clone() {
2156 Some(s) => req.with_spec(s),
2157 None => req,
2158 }
2159 };
2160
2161 if let Some(cluster) = &self.cluster {
2162 let is_leader = cluster.is_leader().await;
2163 tracing::info!(
2164 target: "zlayer::scale_distribute",
2165 service = name,
2166 replicas,
2167 is_leader,
2168 spec_affinity = ?spec.as_ref().and_then(|s| s.affinity.clone()),
2169 "scale_service: cluster path"
2170 );
2171 if !is_leader {
2172 // Follower: forward to the leader and let it dispatch.
2173 return cluster
2174 .forward_scale(build_req(replicas))
2175 .await
2176 .map_err(|e| AgentError::CreateFailed {
2177 id: name.to_string(),
2178 reason: format!("cluster forward: {e}"),
2179 });
2180 }
2181
2182 // Leader path. Compute affinity-aware placement across the Ready
2183 // node set and dispatch each node its share. `dispatch_scale_distributed`
2184 // reuses the same placement machinery as one-off container placement
2185 // (`cluster_nodes_to_node_states` + `place_service_replicas`), honoring
2186 // `ServiceSpec.affinity` (`spread`/`pack`/`pin`). The leader's own
2187 // share short-circuits to a local call (no localhost HTTP round-trip),
2188 // and the attached spec lets fresh workers register the service before
2189 // scaling. Single-node clusters fall through the default impl, which
2190 // dispatches everything to this node (unchanged behavior).
2191 return cluster
2192 .dispatch_scale_distributed(build_req(replicas))
2193 .await
2194 .map_err(|e| AgentError::CreateFailed {
2195 id: name.to_string(),
2196 reason: format!("cluster dispatch: {e}"),
2197 });
2198 }
2199
2200 // No cluster handle — single-node mode.
2201 self.scale_service_local(name, replicas).await
2202 }
2203
2204 /// Local (single-node) scale: directly creates/destroys containers on
2205 /// this node only. Called by:
2206 /// - `scale_service` in single-node mode (when `self.cluster` is None).
2207 /// - The `/api/v1/internal/scale` handler (which the leader's
2208 /// `Cluster::dispatch_scale` HTTP-POSTs to, bottoming out the
2209 /// recursive loop on each receiving node).
2210 /// - The cluster impls' `local_dispatch` closure (for the leader's own
2211 /// share — short-circuited to avoid a localhost round-trip).
2212 ///
2213 /// # Errors
2214 /// Returns an error if the service is not found or scaling fails.
2215 #[allow(clippy::cast_possible_truncation)]
2216 pub async fn scale_service_local(&self, name: &str, replicas: u32) -> Result<()> {
2217 tracing::info!(
2218 target: "zlayer::scale_distribute",
2219 service = name,
2220 replicas,
2221 "scale_service_local ENTER"
2222 );
2223 let _permit = self.scale_semaphore.acquire().await;
2224
2225 let services = self.services.read().await;
2226 let Some(instance) = services.get(name) else {
2227 // Draining a service this node never hosted is a no-op (e.g. the
2228 // leader fans out `count=0` to a node to drain it during a
2229 // scale-down, but that node never ran the service).
2230 if replicas == 0 {
2231 return Ok(());
2232 }
2233 return Err(AgentError::NotFound {
2234 container: name.to_string(),
2235 reason: "service not found".to_string(),
2236 });
2237 };
2238
2239 // Get current replica count before scaling
2240 let current_replicas = instance.replica_count().await as u32;
2241
2242 // Perform the scaling operation
2243 instance.scale_to(replicas).await?;
2244
2245 // After scaling, update proxy and stream backends for each endpoint.
2246 // Per-endpoint collection (rather than a single service-wide list)
2247 // is what makes `EndpointSpec.target_role` filtering possible:
2248 // each endpoint receives only the containers whose
2249 // `ContainerId.role` matches its declared role.
2250 if self.proxy_manager.is_some() {
2251 self.update_proxy_backends(instance).await;
2252 }
2253 if self.stream_registry.is_some() {
2254 self.update_stream_backends(instance).await;
2255 }
2256
2257 // Register new containers with supervisor for crash monitoring.
2258 //
2259 // Container ids here must match what `ServiceInstance::scale_to`
2260 // constructed — same role (derived from `replica_groups`) and same
2261 // local node id. Otherwise supervise/unsupervise miss the live entry
2262 // and crash-restart bookkeeping leaks across scale events.
2263 let local_node_id = self.cluster.as_ref().map_or(0, |c| c.node_id());
2264 if let Some(supervisor) = &self.container_supervisor {
2265 // For scale-up, register new containers
2266 if replicas > current_replicas {
2267 for i in current_replicas..replicas {
2268 let replica_idx = i + 1;
2269 let container_id = ContainerId::with_role_and_node(
2270 name.to_string(),
2271 replica_idx,
2272 instance.role_for_replica(replica_idx),
2273 local_node_id,
2274 );
2275 supervisor.supervise(&container_id, &instance.spec).await;
2276 }
2277 }
2278 // For scale-down, unregister removed containers
2279 if replicas < current_replicas {
2280 for i in replicas..current_replicas {
2281 let replica_idx = i + 1;
2282 let container_id = ContainerId::with_role_and_node(
2283 name.to_string(),
2284 replica_idx,
2285 instance.role_for_replica(replica_idx),
2286 local_node_id,
2287 );
2288 supervisor.unsupervise(&container_id).await;
2289 }
2290 }
2291 }
2292
2293 Ok(())
2294 }
2295
2296 /// Collect backend addresses for a single endpoint of a service.
2297 ///
2298 /// This queries the service instance's containers for their overlay
2299 /// network IP addresses and constructs backend addresses using the
2300 /// endpoint's container target port.
2301 ///
2302 /// Containers are filtered by `endpoint.target_role`:
2303 /// - `None` (default): all containers of the service are eligible
2304 /// (legacy behavior).
2305 /// - `Some(role)`: only containers whose `ContainerId.role` equals
2306 /// `role` are included. Implements
2307 /// [`zlayer_spec::EndpointSpec::target_role`].
2308 ///
2309 /// If a container has a `port_override` (e.g., macOS sandbox where all
2310 /// containers share the host network), that port is used instead of
2311 /// the spec-declared endpoint port. This allows multiple replicas on
2312 /// the same IP (`127.0.0.1`) to be distinguished by port.
2313 async fn collect_endpoint_backends(
2314 &self,
2315 instance: &ServiceInstance,
2316 endpoint: &zlayer_spec::EndpointSpec,
2317 ) -> Vec<SocketAddr> {
2318 let mut addrs = Vec::new();
2319 let endpoint_port = endpoint.target_port();
2320 let containers = instance.containers().read().await;
2321
2322 for (container_id, container) in containers.iter() {
2323 // target_role filter: skip containers whose role doesn't match.
2324 if let Some(required_role) = endpoint.target_role.as_ref() {
2325 if container_id.role != *required_role {
2326 continue;
2327 }
2328 }
2329 let Some(ip) = container.overlay_ip else {
2330 continue;
2331 };
2332 // Use the runtime-assigned port override if present (macOS
2333 // sandbox), otherwise fall back to the endpoint's declared
2334 // target port.
2335 let port = container.port_override.unwrap_or(endpoint_port);
2336 addrs.push(SocketAddr::new(ip, port));
2337 }
2338
2339 // If we expected backends but found none, log a hint so operators
2340 // can debug. Distinguish "no containers" from "role filter
2341 // excluded everything" from "no overlay IPs".
2342 if addrs.is_empty() && !containers.is_empty() {
2343 tracing::warn!(
2344 service = %instance.service_name,
2345 endpoint = %endpoint.name,
2346 target_role = ?endpoint.target_role,
2347 container_count = containers.len(),
2348 "no backends collected for endpoint - either no matching role, no overlay IPs, or filtering excluded all"
2349 );
2350 }
2351
2352 addrs
2353 }
2354
2355 /// Get service replica count
2356 ///
2357 /// # Errors
2358 /// Returns an error if the service is not found.
2359 pub async fn service_replica_count(&self, name: &str) -> Result<usize> {
2360 let services = self.services.read().await;
2361 let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
2362 container: name.to_string(),
2363 reason: "service not found".to_string(),
2364 })?;
2365
2366 Ok(instance.replica_count().await)
2367 }
2368
2369 /// Remove a workload (service, job, or cron)
2370 ///
2371 /// This method handles cleanup for different resource types:
2372 /// - **Service**: Unregisters proxy routes, supervisor, and removes from service map
2373 /// - **Job**: Unregisters from job executor
2374 /// - **Cron**: Unregisters from cron scheduler
2375 ///
2376 /// # Errors
2377 /// Returns an error if the service cannot be removed or scale-down fails.
2378 pub async fn remove_service(&self, name: &str) -> Result<()> {
2379 // Try to unregister from cron scheduler first
2380 if let Some(scheduler) = &self.cron_scheduler {
2381 scheduler.unregister(name).await;
2382 }
2383
2384 // Try to unregister from job executor
2385 if let Some(executor) = &self.job_executor {
2386 executor.unregister_job(name).await;
2387 }
2388
2389 // Unregister stream routes (TCP/UDP) from the stream registry
2390 if let Some(stream_registry) = &self.stream_registry {
2391 // Need to get the service spec to know which ports to unregister
2392 let services = self.services.read().await;
2393 if let Some(instance) = services.get(name) {
2394 for endpoint in &instance.spec.endpoints {
2395 match endpoint.protocol {
2396 Protocol::Tcp => {
2397 let _ = stream_registry.unregister_tcp(endpoint.port);
2398 tracing::debug!(
2399 service = %name,
2400 port = endpoint.port,
2401 "Unregistered TCP stream route"
2402 );
2403 }
2404 Protocol::Udp => {
2405 let _ = stream_registry.unregister_udp(endpoint.port);
2406 tracing::debug!(
2407 service = %name,
2408 port = endpoint.port,
2409 "Unregistered UDP stream route"
2410 );
2411 }
2412 _ => {} // HTTP routes handled above
2413 }
2414 }
2415 }
2416 drop(services); // Release read lock
2417 }
2418
2419 // Unpublish node-loopback ports for every live replica of this
2420 // service so the loopback listeners are freed (mirror of the
2421 // per-replica unpublish in `ServiceInstance::scale_to`). Gated on the
2422 // spec's policy; recomputes each backend from the container's stored
2423 // runtime-resolved IP and port override.
2424 {
2425 let services = self.services.read().await;
2426 if let Some(instance) = services.get(name) {
2427 if instance.spec.publish_to_node_loopback() {
2428 if let Some(proxy) = instance.proxy_manager() {
2429 let containers = instance.containers().read().await;
2430 for container in containers.values() {
2431 if let Some(ip) = container.overlay_ip {
2432 proxy
2433 .unpublish_loopback_for_container(
2434 &instance.spec,
2435 ip,
2436 container.port_override,
2437 )
2438 .await;
2439 }
2440 }
2441 }
2442 }
2443 }
2444 drop(services); // Release read lock
2445 }
2446
2447 // Unregister containers from the supervisor
2448 if let Some(supervisor) = &self.container_supervisor {
2449 let containers = self.get_service_containers(name).await;
2450 for container_id in containers {
2451 supervisor.unsupervise(&container_id).await;
2452 }
2453 tracing::debug!(service = %name, "Unregistered containers from supervisor");
2454 }
2455
2456 // Clean up DNS records for the service (bare name + FQDNs).
2457 self.cleanup_service_dns(name).await;
2458
2459 // Remove from services map (may or may not exist depending on rtype)
2460 let mut services = self.services.write().await;
2461 if services.remove(name).is_some() {
2462 tracing::debug!(service = %name, "Removed service from manager");
2463 }
2464
2465 Ok(())
2466 }
2467
2468 /// Remove every DNS record this service registered on attach: the bare
2469 /// compose service name (`{service}`), the service-level FQDN
2470 /// (`{service}.service.local`), and each replica's FQDN
2471 /// (`{replica}.{service}.service.local`). Best-effort; failures are logged.
2472 async fn cleanup_service_dns(&self, name: &str) {
2473 let Some(dns) = &self.dns_server else {
2474 return;
2475 };
2476
2477 // Bare compose service-name record (compose discovery).
2478 if let Err(e) = dns.remove_record(name).await {
2479 tracing::warn!(
2480 hostname = %name,
2481 error = %e,
2482 "failed to remove bare service-name DNS record"
2483 );
2484 }
2485
2486 // Service-level FQDN.
2487 let service_hostname = format!("{name}.service.local");
2488 if let Err(e) = dns.remove_record(&service_hostname).await {
2489 tracing::warn!(
2490 hostname = %service_hostname,
2491 error = %e,
2492 "failed to remove service DNS record"
2493 );
2494 } else {
2495 tracing::debug!(hostname = %service_hostname, "removed service DNS record");
2496 }
2497
2498 // Any remaining replica-specific FQDNs.
2499 let services = self.services.read().await;
2500 if let Some(instance) = services.get(name) {
2501 let containers = instance.containers().read().await;
2502 for (id, _) in containers.iter() {
2503 let replica_hostname = format!("{}.{}.service.local", id.replica, name);
2504 if let Err(e) = dns.remove_record(&replica_hostname).await {
2505 tracing::warn!(
2506 hostname = %replica_hostname,
2507 error = %e,
2508 "failed to remove replica DNS record during service removal"
2509 );
2510 }
2511 }
2512 }
2513 }
2514
2515 /// Introspect service infrastructure wiring.
2516 /// Returns (`has_overlay`, `has_proxy`, `has_dns`), or None if service not found.
2517 pub async fn service_infrastructure(&self, name: &str) -> Option<(bool, bool, bool)> {
2518 let services = self.services.read().await;
2519 services.get(name).map(|i| {
2520 (
2521 i.has_overlay_manager(),
2522 i.has_proxy_manager(),
2523 i.has_dns_server(),
2524 )
2525 })
2526 }
2527
2528 /// List all services
2529 pub async fn list_services(&self) -> Vec<String> {
2530 self.services.read().await.keys().cloned().collect()
2531 }
2532
2533 /// Get logs for a service, aggregated from all container replicas.
2534 ///
2535 /// # Arguments
2536 /// * `service_name` - Name of the service to fetch logs for
2537 /// * `tail` - Number of lines to return per container (0 = all)
2538 /// * `instance` - Optional specific instance (container ID suffix like "1", "2")
2539 ///
2540 /// # Errors
2541 /// Returns an error if the service or instance is not found.
2542 ///
2543 /// # Returns
2544 /// Structured log entries from all (or specific) container replicas. Each
2545 /// entry has its `service` and `deployment` fields populated when available.
2546 pub async fn get_service_logs(
2547 &self,
2548 service_name: &str,
2549 tail: usize,
2550 instance: Option<&str>,
2551 ) -> Result<Vec<LogEntry>> {
2552 let container_ids = self.get_service_containers(service_name).await;
2553
2554 if container_ids.is_empty() {
2555 return Err(AgentError::NotFound {
2556 container: service_name.to_string(),
2557 reason: "no containers found for service".to_string(),
2558 });
2559 }
2560
2561 // If a specific instance is requested, filter to just that one
2562 let target_ids: Vec<&ContainerId> = if let Some(inst) = instance {
2563 if let Ok(replica_num) = inst.parse::<u32>() {
2564 container_ids
2565 .iter()
2566 .filter(|id| id.replica == replica_num)
2567 .collect()
2568 } else {
2569 // Try matching by full container ID string suffix
2570 container_ids
2571 .iter()
2572 .filter(|id| id.to_string().contains(inst))
2573 .collect()
2574 }
2575 } else {
2576 container_ids.iter().collect()
2577 };
2578
2579 if target_ids.is_empty() {
2580 return Err(AgentError::NotFound {
2581 container: format!("{}/{}", service_name, instance.unwrap_or("?")),
2582 reason: "instance not found".to_string(),
2583 });
2584 }
2585
2586 let mut all_entries: Vec<LogEntry> = Vec::new();
2587
2588 for id in &target_ids {
2589 match self.runtime.container_logs(id, tail).await {
2590 Ok(mut entries) => {
2591 // Populate service and deployment metadata on each entry
2592 for entry in &mut entries {
2593 if entry.service.is_none() {
2594 entry.service = Some(service_name.to_string());
2595 }
2596 if entry.deployment.is_none() {
2597 entry.deployment.clone_from(&self.deployment_name);
2598 }
2599 }
2600 all_entries.extend(entries);
2601 }
2602 Err(e) => {
2603 tracing::warn!(
2604 service = service_name,
2605 container = %id,
2606 error = %e,
2607 "Failed to read container logs"
2608 );
2609 }
2610 }
2611 }
2612
2613 Ok(all_entries)
2614 }
2615
2616 /// Get all container IDs for a specific service
2617 ///
2618 /// Returns an empty vector if the service doesn't exist.
2619 ///
2620 /// # Arguments
2621 /// * `service_name` - Name of the service to query
2622 ///
2623 /// # Returns
2624 /// Vector of `ContainerIds` for all replicas of the service
2625 pub async fn get_service_containers(&self, service_name: &str) -> Vec<ContainerId> {
2626 let services = self.services.read().await;
2627 if let Some(instance) = services.get(service_name) {
2628 instance.container_ids().await
2629 } else {
2630 Vec::new()
2631 }
2632 }
2633
2634 /// Get per-container info (id, image, real state, pid, overlay IP) for a
2635 /// specific service.
2636 ///
2637 /// Unlike [`get_service_containers`](Self::get_service_containers) (which
2638 /// returns ids only), this surfaces the REAL image reference and lifecycle
2639 /// state of each live container so the API/`ps` can report them accurately.
2640 ///
2641 /// Returns an empty vector if the service doesn't exist.
2642 pub async fn get_service_container_infos(&self, service_name: &str) -> Vec<ContainerInfo> {
2643 let services = self.services.read().await;
2644 if let Some(instance) = services.get(service_name) {
2645 instance.container_infos().await
2646 } else {
2647 Vec::new()
2648 }
2649 }
2650
2651 /// This node's **local** view of `service` (running replica count, health,
2652 /// containers), used for cluster-wide aggregation. Served by the internal
2653 /// `/api/v1/internal/services/{svc}/state` endpoint and used as the local
2654 /// part of [`Self::cluster_service_states`].
2655 pub async fn local_service_state(
2656 &self,
2657 service: &str,
2658 ) -> zlayer_types::cluster::NodeServiceState {
2659 use zlayer_types::cluster::{ClusterContainerSummary, NodeServiceState};
2660 let node_id = self.cluster.as_ref().map_or(0, |c| c.node_id());
2661 let infos = self.get_service_container_infos(service).await;
2662 #[allow(clippy::cast_possible_truncation)]
2663 let running = infos
2664 .iter()
2665 .filter(|i| i.state.eq_ignore_ascii_case("running"))
2666 .count() as u32;
2667 // A node running 0 replicas is trivially healthy (it can't drag the
2668 // cluster-wide aggregate). Otherwise require a Healthy health state.
2669 let healthy = if running == 0 {
2670 true
2671 } else {
2672 let states = self.health_states();
2673 let guard = states.read().await;
2674 matches!(guard.get(service), Some(HealthState::Healthy))
2675 };
2676 let containers = infos
2677 .into_iter()
2678 .map(|i| ClusterContainerSummary {
2679 node_id,
2680 id: i.id.to_string(),
2681 service: i.id.service.clone(),
2682 replica: i.id.replica,
2683 image: i.image,
2684 state: i.state,
2685 pid: i.pid,
2686 overlay_ip: i.overlay_ip,
2687 })
2688 .collect();
2689 NodeServiceState {
2690 node_id,
2691 running,
2692 healthy,
2693 containers,
2694 }
2695 }
2696
2697 /// Cluster-wide per-node states for `service`: this node's local view plus
2698 /// every other node's (fetched via the cluster handle's
2699 /// `fetch_remote_service_states`). When not clustered, returns just the
2700 /// local view. This is the source of truth for distributed-service replica
2701 /// counts, health, and the `ps` container listing on the leader.
2702 pub async fn cluster_service_states(
2703 &self,
2704 service: &str,
2705 ) -> Vec<zlayer_types::cluster::NodeServiceState> {
2706 let mut states = vec![self.local_service_state(service).await];
2707 if let Some(cluster) = &self.cluster {
2708 states.extend(cluster.fetch_remote_service_states(service).await);
2709 }
2710 states
2711 }
2712
2713 /// Execute a command inside a running container for a service
2714 ///
2715 /// Picks a specific replica if provided, otherwise uses the first available container.
2716 ///
2717 /// # Arguments
2718 /// * `service_name` - Name of the service
2719 /// * `replica` - Optional replica number to target
2720 /// * `cmd` - Command and arguments to execute
2721 ///
2722 /// # Errors
2723 /// Returns an error if the service or replica is not found, or if exec fails.
2724 ///
2725 /// # Panics
2726 /// Panics if no replica is specified and the container list is unexpectedly empty
2727 /// after the emptiness check (should not happen in practice).
2728 ///
2729 /// # Returns
2730 /// Tuple of (`exit_code`, stdout, stderr)
2731 pub async fn exec_in_container(
2732 &self,
2733 service_name: &str,
2734 replica: Option<u32>,
2735 cmd: &[String],
2736 ) -> Result<(i32, String, String)> {
2737 let container_ids = self.get_service_containers(service_name).await;
2738
2739 if container_ids.is_empty() {
2740 return Err(AgentError::NotFound {
2741 container: service_name.to_string(),
2742 reason: "no containers found for service".to_string(),
2743 });
2744 }
2745
2746 // Pick the target container
2747 let target = if let Some(rep) = replica {
2748 container_ids
2749 .into_iter()
2750 .find(|cid| cid.replica == rep)
2751 .ok_or_else(|| AgentError::NotFound {
2752 container: format!("{service_name}-rep-{rep}"),
2753 reason: format!("replica {rep} not found for service"),
2754 })?
2755 } else {
2756 // Use the first container (lowest replica number)
2757 container_ids.into_iter().next().unwrap()
2758 };
2759
2760 self.runtime.exec(&target, cmd).await
2761 }
2762
2763 /// List every live container across all services, enriched with the data a
2764 /// Docker `ps` row needs and the data the Docker-name resolver needs.
2765 ///
2766 /// For each running container this surfaces the deployment name, the service
2767 /// name, the concrete [`ContainerId`], the compose `container_name:` label
2768 /// (when set, the user-facing Docker name), the real image, the lifecycle
2769 /// state, and the service's published port mappings. Used by the unified
2770 /// container-name resolver and by `docker ps` so compose deployments show up
2771 /// and resolve by their `container_name`.
2772 pub async fn list_container_views(&self) -> Vec<DeploymentContainerView> {
2773 let deployment = self.deployment_name.clone();
2774 let services = self.services.read().await;
2775 let mut out = Vec::new();
2776 for (service_name, instance) in services.iter() {
2777 let container_name = instance
2778 .spec
2779 .labels
2780 .get("com.docker.compose.container_name")
2781 .cloned();
2782 let ports = instance.spec.port_mappings.clone();
2783 for info in instance.container_infos().await {
2784 out.push(DeploymentContainerView {
2785 deployment: deployment.clone(),
2786 service: service_name.clone(),
2787 container_id: info.id,
2788 container_name: container_name.clone(),
2789 image: info.image,
2790 state: info.state,
2791 pid: info.pid,
2792 ports: ports.clone(),
2793 });
2794 }
2795 }
2796 out
2797 }
2798
2799 /// Resolve a Docker-style container name/id to a live deployment
2800 /// [`ContainerId`].
2801 ///
2802 /// Matching precedence (first hit wins):
2803 /// 1. The compose `container_name:` label (e.g. `forgejo-e2e`).
2804 /// 2. The conventional compose names `{deployment}-{service}-{replica}` and
2805 /// `{deployment}_{service}_{replica}` (replica is 1-based, mirroring
2806 /// Docker Compose; `ContainerId.replica` is 0-based so we add 1).
2807 /// 3. The bare service name (`{service}`), targeting its first replica.
2808 /// 4. The [`ContainerId`] `Display` form.
2809 ///
2810 /// Returns `None` when nothing matches a *running* container.
2811 pub async fn resolve_container_name(&self, name: &str) -> Option<ContainerId> {
2812 let views = self.list_container_views().await;
2813 // 1. explicit container_name label.
2814 if let Some(v) = views
2815 .iter()
2816 .find(|v| v.container_name.as_deref() == Some(name))
2817 {
2818 return Some(v.container_id.clone());
2819 }
2820 // 2 & 3. conventional names + bare service name.
2821 for v in &views {
2822 let dep = v.deployment.as_deref().unwrap_or("");
2823 let svc = &v.service;
2824 let rep1 = v.container_id.replica + 1;
2825 let candidates = [
2826 format!("{dep}-{svc}-{rep1}"),
2827 format!("{dep}_{svc}_{rep1}"),
2828 svc.clone(),
2829 ];
2830 if candidates.iter().any(|c| c == name) {
2831 return Some(v.container_id.clone());
2832 }
2833 }
2834 // 4. ContainerId Display form.
2835 for v in &views {
2836 if v.container_id.to_string() == name {
2837 return Some(v.container_id.clone());
2838 }
2839 }
2840 None
2841 }
2842
2843 /// Execute a command in a specific deployment container (by its concrete
2844 /// [`ContainerId`]) honouring Docker `exec` options (`--user`, `-w`, `-e`).
2845 ///
2846 /// Routes through [`Runtime::exec_with_opts`] so runtimes that support
2847 /// dropping to a uid/gid + chdir + env injection (macOS VZ-Linux) apply
2848 /// them; others fall back to a plain buffered exec.
2849 ///
2850 /// # Errors
2851 /// Propagates the runtime's exec error.
2852 pub async fn exec_in_container_id_with_opts(
2853 &self,
2854 id: &ContainerId,
2855 opts: &crate::runtime::ExecOptions,
2856 ) -> Result<(i32, String, String)> {
2857 self.runtime.exec_with_opts(id, opts).await
2858 }
2859
2860 // ==================== Job Management ====================
2861
2862 /// Trigger a job execution
2863 ///
2864 /// # Arguments
2865 /// * `name` - Name of the registered job
2866 /// * `trigger` - How the job was triggered (endpoint, cli, etc.)
2867 ///
2868 /// # Returns
2869 /// The execution ID for tracking the job
2870 ///
2871 /// # Errors
2872 /// - Returns error if job executor is not configured
2873 /// - Returns error if the job is not registered
2874 pub async fn trigger_job(&self, name: &str, trigger: JobTrigger) -> Result<JobExecutionId> {
2875 let executor = self
2876 .job_executor
2877 .as_ref()
2878 .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
2879
2880 let spec = executor
2881 .get_job_spec(name)
2882 .await
2883 .ok_or_else(|| AgentError::NotFound {
2884 container: name.to_string(),
2885 reason: "job not registered".to_string(),
2886 })?;
2887
2888 executor.trigger(name, &spec, trigger).await
2889 }
2890
2891 /// Get the status of a job execution
2892 ///
2893 /// # Arguments
2894 /// * `id` - The execution ID returned from `trigger_job`
2895 ///
2896 /// # Returns
2897 /// The job execution details, or None if not found
2898 pub async fn get_job_execution(&self, id: &JobExecutionId) -> Option<JobExecution> {
2899 if let Some(executor) = &self.job_executor {
2900 executor.get_execution(id).await
2901 } else {
2902 None
2903 }
2904 }
2905
2906 /// List all executions for a specific job
2907 ///
2908 /// # Arguments
2909 /// * `name` - Name of the job
2910 ///
2911 /// # Returns
2912 /// Vector of job executions for the specified job
2913 pub async fn list_job_executions(&self, name: &str) -> Vec<JobExecution> {
2914 if let Some(executor) = &self.job_executor {
2915 executor.list_executions(name).await
2916 } else {
2917 Vec::new()
2918 }
2919 }
2920
2921 /// Cancel a running job execution
2922 ///
2923 /// # Arguments
2924 /// * `id` - The execution ID to cancel
2925 ///
2926 /// # Errors
2927 /// Returns error if job executor is not configured or if cancellation fails
2928 pub async fn cancel_job(&self, id: &JobExecutionId) -> Result<()> {
2929 let executor = self
2930 .job_executor
2931 .as_ref()
2932 .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
2933
2934 executor.cancel(id).await
2935 }
2936
2937 // ==================== Cron Management ====================
2938
2939 /// Manually trigger a cron job (outside of its schedule)
2940 ///
2941 /// # Arguments
2942 /// * `name` - Name of the cron job
2943 ///
2944 /// # Returns
2945 /// The execution ID for tracking the triggered job
2946 ///
2947 /// # Errors
2948 /// Returns error if cron scheduler is not configured or job not found
2949 pub async fn trigger_cron(&self, name: &str) -> Result<JobExecutionId> {
2950 let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2951 AgentError::Configuration("Cron scheduler not configured".to_string())
2952 })?;
2953
2954 scheduler.trigger_now(name).await
2955 }
2956
2957 /// Enable or disable a cron job
2958 ///
2959 /// # Arguments
2960 /// * `name` - Name of the cron job
2961 /// * `enabled` - Whether to enable or disable the job
2962 pub async fn set_cron_enabled(&self, name: &str, enabled: bool) {
2963 if let Some(scheduler) = &self.cron_scheduler {
2964 scheduler.set_enabled(name, enabled).await;
2965 }
2966 }
2967
2968 /// List all registered cron jobs
2969 pub async fn list_cron_jobs(&self) -> Vec<crate::cron_scheduler::CronJobInfo> {
2970 if let Some(scheduler) = &self.cron_scheduler {
2971 scheduler.list_jobs().await
2972 } else {
2973 Vec::new()
2974 }
2975 }
2976
2977 /// Start the cron scheduler background task
2978 ///
2979 /// This spawns a background task that checks for due cron jobs every second.
2980 /// Returns a `JoinHandle` that can be used to wait for the scheduler to stop.
2981 ///
2982 /// # Errors
2983 /// Returns error if cron scheduler is not configured
2984 pub fn start_cron_scheduler(&self) -> Result<tokio::task::JoinHandle<()>> {
2985 let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
2986 AgentError::Configuration("Cron scheduler not configured".to_string())
2987 })?;
2988
2989 let scheduler: Arc<CronScheduler> = Arc::clone(scheduler);
2990 Ok(tokio::spawn(async move {
2991 scheduler.run_loop().await;
2992 }))
2993 }
2994
2995 /// Shutdown the cron scheduler
2996 pub fn shutdown_cron(&self) {
2997 if let Some(scheduler) = &self.cron_scheduler {
2998 scheduler.shutdown();
2999 }
3000 }
3001}
3002
3003#[cfg(test)]
3004#[allow(deprecated)]
3005mod tests {
3006 use super::*;
3007 use crate::runtime::MockRuntime;
3008
3009 #[tokio::test]
3010 async fn test_service_manager() {
3011 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3012 let manager = ServiceManager::new(runtime);
3013
3014 // Add service
3015 let spec = mock_spec();
3016 Box::pin(manager.upsert_service("test".to_string(), spec))
3017 .await
3018 .unwrap();
3019
3020 // Scale up
3021 manager.scale_service("test", 3).await.unwrap();
3022
3023 // Check count
3024 let count = manager.service_replica_count("test").await.unwrap();
3025 assert_eq!(count, 3);
3026
3027 // List services
3028 let services = manager.list_services().await;
3029 assert_eq!(services, vec!["test".to_string()]);
3030 }
3031
3032 #[tokio::test]
3033 async fn test_service_manager_basic_lifecycle() {
3034 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3035 let manager = ServiceManager::new(runtime);
3036
3037 // Add service with HTTP endpoint
3038 let spec = mock_spec();
3039 Box::pin(manager.upsert_service("api".to_string(), spec))
3040 .await
3041 .unwrap();
3042
3043 // Scale up
3044 manager.scale_service("api", 2).await.unwrap();
3045
3046 // Check count
3047 let count = manager.service_replica_count("api").await.unwrap();
3048 assert_eq!(count, 2);
3049
3050 // Remove service
3051 manager.remove_service("api").await.unwrap();
3052
3053 // Verify service is gone
3054 let services = manager.list_services().await;
3055 assert!(!services.contains(&"api".to_string()));
3056 }
3057
3058 #[tokio::test]
3059 async fn test_service_manager_with_full_config() {
3060 use tokio::sync::RwLock;
3061
3062 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3063
3064 // Create a mock overlay manager (skip actual network setup)
3065 let overlay_manager = Arc::new(RwLock::new(
3066 OverlayManager::new("test-deployment".to_string(), "test".to_string())
3067 .await
3068 .unwrap(),
3069 ));
3070
3071 let manager =
3072 ServiceManager::with_full_config(runtime, overlay_manager, "prod".to_string());
3073
3074 // Add service
3075 let spec = mock_spec();
3076 Box::pin(manager.upsert_service("web".to_string(), spec))
3077 .await
3078 .unwrap();
3079
3080 // Verify service is registered
3081 let services = manager.list_services().await;
3082 assert!(services.contains(&"web".to_string()));
3083 }
3084
/// Every `ContainerState` variant maps to its lowercase name via `as_str`,
/// and `Display` produces the same text.
#[test]
fn test_container_state_as_str() {
    use crate::runtime::ContainerState;

    let expectations = [
        (ContainerState::Pending, "pending"),
        (ContainerState::Initializing, "initializing"),
        (ContainerState::Running, "running"),
        (ContainerState::Stopping, "stopping"),
        (ContainerState::Exited { code: 0 }, "exited"),
        (
            ContainerState::Failed {
                reason: "boom".to_string(),
            },
            "failed",
        ),
    ];
    for (state, expected) in expectations {
        assert_eq!(state.as_str(), expected);
    }

    // Display delegates to as_str.
    assert_eq!(ContainerState::Running.to_string(), "running");
}
3103
3104 /// A container created from image X must report image X and its real
3105 /// lifecycle state through the new `container_infos` accessor, replacing
3106 /// the previously hardcoded `"running"` / empty-image behavior.
3107 #[tokio::test]
3108 async fn test_container_infos_surfaces_image_and_state() {
3109 use crate::runtime::{Container, ContainerState};
3110
3111 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3112 let manager = ServiceManager::new(runtime);
3113
3114 let spec = mock_spec(); // image name = "test:latest"
3115 let image = spec.image.name.to_string();
3116 Box::pin(manager.upsert_service("web".to_string(), spec))
3117 .await
3118 .unwrap();
3119
3120 // Inject containers directly with distinct states.
3121 {
3122 let services = manager.services.read().await;
3123 let instance = services.get("web").unwrap();
3124 let mut containers = instance.containers().write().await;
3125
3126 let running_id = ContainerId::new("web", 0);
3127 containers.insert(
3128 running_id.clone(),
3129 Container {
3130 id: running_id,
3131 image: image.clone(),
3132 state: ContainerState::Running,
3133 pid: Some(4242),
3134 task: None,
3135 overlay_ip: None,
3136 health_monitor: None,
3137 port_override: None,
3138 },
3139 );
3140
3141 let exited_id = ContainerId::new("web", 1);
3142 containers.insert(
3143 exited_id.clone(),
3144 Container {
3145 id: exited_id,
3146 image: image.clone(),
3147 state: ContainerState::Exited { code: 1 },
3148 pid: None,
3149 task: None,
3150 overlay_ip: None,
3151 health_monitor: None,
3152 port_override: None,
3153 },
3154 );
3155 }
3156
3157 let mut infos = manager.get_service_container_infos("web").await;
3158 infos.sort_by_key(|i| i.id.replica);
3159 assert_eq!(infos.len(), 2);
3160
3161 // Every container reports the real image it was created from.
3162 assert!(infos.iter().all(|i| i.image == image));
3163 assert!(infos.iter().all(|i| i.image == "test:latest"));
3164
3165 // Real per-container state is surfaced (not a hardcoded "running").
3166 assert_eq!(infos[0].state, "running");
3167 assert_eq!(infos[0].pid, Some(4242));
3168 assert_eq!(infos[1].state, "exited");
3169 assert_eq!(infos[1].pid, None);
3170
3171 // Unknown service yields an empty list.
3172 assert!(manager
3173 .get_service_container_infos("missing")
3174 .await
3175 .is_empty());
3176 }
3177
3178 /// Bug 2 (`cluster_upgrade`): a changed image *reference* (tag bump) under
3179 /// `if_not_present` must still recreate the local replicas. Previously the
3180 /// recreate only fired on digest drift under `Always`/`Newer`, so a tag
3181 /// change was silently ignored and containers stayed on the old image.
3182 #[tokio::test]
3183 async fn upsert_recreates_local_replicas_on_image_reference_change() {
3184 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3185 let manager = ServiceManager::new(runtime);
3186
3187 // Deploy v1 with the e2e's pull policy (if_not_present) and scale up.
3188 let mut spec = mock_spec();
3189 spec.image.name = "docker.io/library/nginx:1.28-alpine".parse().unwrap();
3190 spec.image.pull_policy = zlayer_spec::PullPolicy::IfNotPresent;
3191 Box::pin(manager.upsert_service("web".to_string(), spec.clone()))
3192 .await
3193 .unwrap();
3194 manager.scale_service_local("web", 2).await.unwrap();
3195
3196 let v1: Vec<String> = manager
3197 .get_service_container_infos("web")
3198 .await
3199 .into_iter()
3200 .map(|i| i.image)
3201 .collect();
3202 assert_eq!(v1.len(), 2);
3203 assert!(
3204 v1.iter().all(|img| img.contains("1.28")),
3205 "expected v1 images, got {v1:?}"
3206 );
3207
3208 // Upgrade to v2 under the SAME if_not_present policy.
3209 let mut spec_v2 = spec;
3210 spec_v2.image.name = "docker.io/library/nginx:1.29-alpine".parse().unwrap();
3211 Box::pin(manager.upsert_service("web".to_string(), spec_v2))
3212 .await
3213 .unwrap();
3214
3215 let v2: Vec<String> = manager
3216 .get_service_container_infos("web")
3217 .await
3218 .into_iter()
3219 .map(|i| i.image)
3220 .collect();
3221 assert_eq!(v2.len(), 2, "replica count preserved across upgrade");
3222 assert!(
3223 v2.iter().all(|img| img.contains("1.29")),
3224 "containers must be recreated on the new image, got {v2:?}"
3225 );
3226 }
3227
3228 fn mock_spec() -> ServiceSpec {
3229 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3230 r"
3231version: v1
3232deployment: test
3233services:
3234 test:
3235 rtype: service
3236 image:
3237 name: test:latest
3238 endpoints:
3239 - name: http
3240 protocol: http
3241 port: 8080
3242 scale:
3243 mode: fixed
3244 replicas: 1
3245",
3246 )
3247 .unwrap()
3248 .services
3249 .remove("test")
3250 .unwrap()
3251 }
3252
/// With no user-supplied DNS and no host networking, `set_container_dns`
/// installs the overlay resolver as the sole `spec.dns` entry.
#[test]
fn test_set_container_dns_injects_when_empty() {
    let mock: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
    // spec.dns defaults to empty, host_network false
    let spec = mock_spec();
    let mut instance = ServiceInstance::new("web".to_string(), spec, Arc::clone(&mock), None);

    let resolver = "10.42.0.1".parse().unwrap();
    instance.set_container_dns(resolver);

    assert_eq!(instance.spec.dns, vec!["10.42.0.1".to_string()]);
}
3262
/// A `host_network` service must not get the overlay resolver injected; its
/// `spec.dns` stays empty.
#[test]
fn test_set_container_dns_skips_host_network() {
    let mock: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
    let mut host_spec = mock_spec();
    host_spec.host_network = true;
    let mut instance =
        ServiceInstance::new("web".to_string(), host_spec, Arc::clone(&mock), None);

    let resolver = "10.42.0.1".parse().unwrap();
    instance.set_container_dns(resolver);

    assert!(
        instance.spec.dns.is_empty(),
        "host_network containers must inherit the host resolv.conf"
    );
}
3276
/// DNS servers the user put in the spec take precedence: the overlay
/// resolver must not replace or extend them.
#[test]
fn test_set_container_dns_preserves_user_dns() {
    let mock: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
    let mut user_spec = mock_spec();
    user_spec.dns = vec!["1.1.1.1".to_string()];
    let mut instance =
        ServiceInstance::new("web".to_string(), user_spec, Arc::clone(&mock), None);

    let resolver = "10.42.0.1".parse().unwrap();
    instance.set_container_dns(resolver);

    assert_eq!(
        instance.spec.dns,
        vec!["1.1.1.1".to_string()],
        "user-supplied spec.dns must win over the overlay resolver"
    );
}
3291
3292 /// Helper to create a `ServiceSpec` with dependencies
3293 fn mock_spec_with_deps(deps: Vec<DependsSpec>) -> ServiceSpec {
3294 let mut spec = mock_spec();
3295 spec.depends = deps;
3296 spec
3297 }
3298
3299 /// Helper to create a `DependsSpec`
3300 fn dep(
3301 service: &str,
3302 condition: zlayer_spec::DependencyCondition,
3303 timeout_ms: u64,
3304 on_timeout: zlayer_spec::TimeoutAction,
3305 ) -> DependsSpec {
3306 DependsSpec {
3307 service: service.to_string(),
3308 condition,
3309 timeout: Some(Duration::from_millis(timeout_ms)),
3310 on_timeout,
3311 }
3312 }
3313
3314 #[tokio::test]
3315 async fn test_deploy_with_dependencies_no_deps() {
3316 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3317 let manager = ServiceManager::new(runtime);
3318
3319 // Services with no dependencies
3320 let mut services = HashMap::new();
3321 services.insert("a".to_string(), mock_spec());
3322 services.insert("b".to_string(), mock_spec());
3323
3324 // Should deploy both without issue
3325 Box::pin(manager.deploy_with_dependencies(services))
3326 .await
3327 .unwrap();
3328
3329 // Both services should be registered
3330 let service_list = manager.list_services().await;
3331 assert_eq!(service_list.len(), 2);
3332 }
3333
3334 #[tokio::test]
3335 async fn test_deploy_with_dependencies_linear() {
3336 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3337 let manager = ServiceManager::new(runtime);
3338
3339 // A -> B -> C (A depends on B, B depends on C)
3340 // All use "started" condition which is satisfied when container is running
3341 let mut services = HashMap::new();
3342 services.insert("c".to_string(), mock_spec());
3343 services.insert(
3344 "b".to_string(),
3345 mock_spec_with_deps(vec![dep(
3346 "c",
3347 zlayer_spec::DependencyCondition::Started,
3348 5000,
3349 zlayer_spec::TimeoutAction::Fail,
3350 )]),
3351 );
3352 services.insert(
3353 "a".to_string(),
3354 mock_spec_with_deps(vec![dep(
3355 "b",
3356 zlayer_spec::DependencyCondition::Started,
3357 5000,
3358 zlayer_spec::TimeoutAction::Fail,
3359 )]),
3360 );
3361
3362 // Should deploy in order: c, b, a
3363 Box::pin(manager.deploy_with_dependencies(services))
3364 .await
3365 .unwrap();
3366
3367 // All services should be registered
3368 let service_list = manager.list_services().await;
3369 assert_eq!(service_list.len(), 3);
3370 }
3371
3372 #[tokio::test]
3373 async fn test_deploy_with_dependencies_cycle_detection() {
3374 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3375 let manager = ServiceManager::new(runtime);
3376
3377 // A -> B -> A (cycle)
3378 let mut services = HashMap::new();
3379 services.insert(
3380 "a".to_string(),
3381 mock_spec_with_deps(vec![dep(
3382 "b",
3383 zlayer_spec::DependencyCondition::Started,
3384 5000,
3385 zlayer_spec::TimeoutAction::Fail,
3386 )]),
3387 );
3388 services.insert(
3389 "b".to_string(),
3390 mock_spec_with_deps(vec![dep(
3391 "a",
3392 zlayer_spec::DependencyCondition::Started,
3393 5000,
3394 zlayer_spec::TimeoutAction::Fail,
3395 )]),
3396 );
3397
3398 // Should fail with cycle detection
3399 let result = Box::pin(manager.deploy_with_dependencies(services)).await;
3400 assert!(result.is_err());
3401 let err = result.unwrap_err().to_string();
3402 assert!(err.contains("Cyclic dependency"));
3403 }
3404
3405 #[tokio::test]
3406 async fn test_deploy_with_dependencies_timeout_continue() {
3407 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3408 let manager = ServiceManager::new(runtime);
3409
3410 // A depends on B (healthy), but B never becomes healthy
3411 // Using continue action, so it should proceed anyway
3412 let mut services = HashMap::new();
3413 services.insert("b".to_string(), mock_spec());
3414 services.insert(
3415 "a".to_string(),
3416 mock_spec_with_deps(vec![dep(
3417 "b",
3418 zlayer_spec::DependencyCondition::Healthy, // B won't pass healthy check
3419 100, // Short timeout
3420 zlayer_spec::TimeoutAction::Continue, // But continue anyway
3421 )]),
3422 );
3423
3424 // Should deploy both despite timeout
3425 Box::pin(manager.deploy_with_dependencies(services))
3426 .await
3427 .unwrap();
3428
3429 let service_list = manager.list_services().await;
3430 assert_eq!(service_list.len(), 2);
3431 }
3432
3433 #[tokio::test]
3434 async fn test_deploy_with_dependencies_timeout_warn() {
3435 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3436 let manager = ServiceManager::new(runtime);
3437
3438 // A depends on B (healthy), but B never becomes healthy
3439 // Using warn action, so it should proceed with a warning
3440 let mut services = HashMap::new();
3441 services.insert("b".to_string(), mock_spec());
3442 services.insert(
3443 "a".to_string(),
3444 mock_spec_with_deps(vec![dep(
3445 "b",
3446 zlayer_spec::DependencyCondition::Healthy,
3447 100,
3448 zlayer_spec::TimeoutAction::Warn,
3449 )]),
3450 );
3451
3452 // Should deploy both despite timeout (with warning)
3453 Box::pin(manager.deploy_with_dependencies(services))
3454 .await
3455 .unwrap();
3456
3457 let service_list = manager.list_services().await;
3458 assert_eq!(service_list.len(), 2);
3459 }
3460
3461 #[tokio::test]
3462 async fn test_deploy_with_dependencies_timeout_fail() {
3463 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3464 let manager = ServiceManager::new(runtime);
3465
3466 // A depends on B (healthy), but B never becomes healthy
3467 // Using fail action, so deployment should fail
3468 let mut services = HashMap::new();
3469 services.insert("b".to_string(), mock_spec());
3470 services.insert(
3471 "a".to_string(),
3472 mock_spec_with_deps(vec![dep(
3473 "b",
3474 zlayer_spec::DependencyCondition::Healthy,
3475 100,
3476 zlayer_spec::TimeoutAction::Fail,
3477 )]),
3478 );
3479
3480 // Should fail after B is started but doesn't become healthy
3481 let result = Box::pin(manager.deploy_with_dependencies(services)).await;
3482 assert!(result.is_err());
3483
3484 // B should be started (it has no deps), but A should fail
3485 let err = result.unwrap_err().to_string();
3486 assert!(err.contains("Dependency timeout"));
3487 }
3488
3489 #[tokio::test]
3490 async fn test_check_dependencies_all_satisfied() {
3491 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3492 let manager = ServiceManager::new(runtime);
3493
3494 // Mark a service as healthy
3495 manager
3496 .update_health_state("db", HealthState::Healthy)
3497 .await;
3498
3499 let deps = vec![DependsSpec {
3500 service: "db".to_string(),
3501 condition: zlayer_spec::DependencyCondition::Healthy,
3502 timeout: Some(Duration::from_secs(60)),
3503 on_timeout: zlayer_spec::TimeoutAction::Fail,
3504 }];
3505
3506 let satisfied = manager.check_dependencies(&deps).await.unwrap();
3507 assert!(satisfied);
3508 }
3509
3510 #[tokio::test]
3511 async fn test_check_dependencies_not_satisfied() {
3512 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3513 let manager = ServiceManager::new(runtime);
3514
3515 // Service not healthy (no state set = Unknown)
3516 let deps = vec![DependsSpec {
3517 service: "db".to_string(),
3518 condition: zlayer_spec::DependencyCondition::Healthy,
3519 timeout: Some(Duration::from_secs(60)),
3520 on_timeout: zlayer_spec::TimeoutAction::Fail,
3521 }];
3522
3523 let satisfied = manager.check_dependencies(&deps).await.unwrap();
3524 assert!(!satisfied);
3525 }
3526
3527 #[tokio::test]
3528 async fn test_health_state_tracking() {
3529 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3530 let manager = ServiceManager::new(runtime);
3531
3532 // Update health states
3533 manager
3534 .update_health_state("db", HealthState::Healthy)
3535 .await;
3536 manager
3537 .update_health_state("cache", HealthState::Unknown)
3538 .await;
3539
3540 // Verify states
3541 let states = manager.health_states();
3542 let states_read = states.read().await;
3543
3544 assert!(matches!(states_read.get("db"), Some(HealthState::Healthy)));
3545 assert!(matches!(
3546 states_read.get("cache"),
3547 Some(HealthState::Unknown)
3548 ));
3549 }
3550
3551 /// Regression test for the stabilization timeout that blocked the raft-e2e
3552 /// `cluster_scaling` / `cluster_upgrade` suites.
3553 ///
3554 /// Previously the callback that bridges a container's health result into the
3555 /// `ServiceManager` `health_states` map was only attached when BOTH a proxy
3556 /// manager AND a reachable overlay IP existed. In degraded-overlay / no-proxy
3557 /// deployments that `if let` was false, so `health_states` was never written,
3558 /// the service stayed `healthy=false` forever, and stabilization timed out
3559 /// even though the container was running and its health check passing.
3560 ///
3561 /// This test drives the real `scale_to` create path with:
3562 /// * NO `proxy_manager` (so `proxy_backend` resolves to None), and
3563 /// * a `Command { command: "true" }` health check (always passes host-side),
3564 /// then asserts the shared `health_states` map receives `Healthy` for the
3565 /// service — proving the bridge fires unconditionally.
3566 ///
3567 /// Gated to `#[cfg(unix)]` because `HealthCheck::Command` is executed via
3568 /// `sh -c <command>` in `crate::health::HealthChecker::check_command`. On
3569 /// Windows hosts without `sh` on PATH (the default Windows CI image), no
3570 /// Command-based health check can ever pass, so the test would fail for
3571 /// reasons unrelated to the bridge it is regression-testing. The bridge
3572 /// behavior under test is platform-agnostic; only the test fixture's
3573 /// "always-passes command" needs a Unix shell.
3574 #[cfg(unix)]
3575 #[tokio::test]
3576 async fn test_health_states_bridge_fires_without_proxy() {
3577 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3578
3579 // Service spec with a host-side command health check that always passes.
3580 // Zero start-grace + a short interval keep the test fast.
3581 let mut spec = mock_spec();
3582 spec.health = zlayer_spec::HealthSpec {
3583 start_grace: Some(Duration::from_millis(0)),
3584 interval: Some(Duration::from_millis(50)),
3585 timeout: Some(Duration::from_secs(5)),
3586 retries: 1,
3587 check: HealthCheck::Command {
3588 command: "true".to_string(),
3589 },
3590 };
3591
3592 // Build a ServiceInstance with NO proxy_manager and NO overlay_manager,
3593 // then wire in the shared health_states map exactly as ServiceManager does.
3594 let mut instance =
3595 ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
3596 let health_states: Arc<RwLock<HashMap<String, HealthState>>> =
3597 Arc::new(RwLock::new(HashMap::new()));
3598 instance.set_health_states(Arc::clone(&health_states));
3599
3600 // Drive the real create path (no proxy, MockRuntime IP present but proxy
3601 // absent => proxy_backend is None, hitting the previously-broken branch).
3602 instance.scale_to(1).await.unwrap();
3603
3604 // Poll for the bridged Healthy state (the monitor checks asynchronously
3605 // after its start grace). Bounded so a regression fails fast.
3606 let mut bridged = false;
3607 for _ in 0..100 {
3608 if matches!(
3609 health_states.read().await.get("web"),
3610 Some(HealthState::Healthy)
3611 ) {
3612 bridged = true;
3613 break;
3614 }
3615 tokio::time::sleep(Duration::from_millis(50)).await;
3616 }
3617
3618 assert!(
3619 bridged,
3620 "health_states must receive Healthy for the service even without a \
3621 proxy or overlay IP; the bridge regressed and stabilization would time out"
3622 );
3623 }
3624
3625 // ==================== Job/Cron Integration Tests ====================
3626
3627 fn mock_job_spec() -> ServiceSpec {
3628 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3629 r"
3630version: v1
3631deployment: test
3632services:
3633 backup:
3634 rtype: job
3635 image:
3636 name: backup:latest
3637",
3638 )
3639 .unwrap()
3640 .services
3641 .remove("backup")
3642 .unwrap()
3643 }
3644
3645 fn mock_cron_spec() -> ServiceSpec {
3646 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3647 r#"
3648version: v1
3649deployment: test
3650services:
3651 cleanup:
3652 rtype: cron
3653 schedule: "0 0 * * * * *"
3654 image:
3655 name: cleanup:latest
3656"#,
3657 )
3658 .unwrap()
3659 .services
3660 .remove("cleanup")
3661 .unwrap()
3662 }
3663
3664 #[tokio::test]
3665 async fn test_service_manager_with_job_executor() {
3666 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3667 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3668
3669 let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
3670
3671 // Register job
3672 let job_spec = mock_job_spec();
3673 Box::pin(manager.upsert_service("backup".to_string(), job_spec))
3674 .await
3675 .unwrap();
3676
3677 // Trigger job
3678 let exec_id = manager
3679 .trigger_job("backup", JobTrigger::Cli)
3680 .await
3681 .unwrap();
3682
3683 // Give job time to start
3684 tokio::time::sleep(Duration::from_millis(50)).await;
3685
3686 // Check execution exists
3687 let execution = manager.get_job_execution(&exec_id).await;
3688 assert!(execution.is_some());
3689 assert_eq!(execution.unwrap().job_name, "backup");
3690 }
3691
3692 #[tokio::test]
3693 async fn test_service_manager_with_cron_scheduler() {
3694 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3695 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3696 let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
3697
3698 let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
3699
3700 // Register cron job
3701 let cron_spec = mock_cron_spec();
3702 Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3703 .await
3704 .unwrap();
3705
3706 // List cron jobs
3707 let cron_jobs = manager.list_cron_jobs().await;
3708 assert_eq!(cron_jobs.len(), 1);
3709 assert_eq!(cron_jobs[0].name, "cleanup");
3710 assert!(cron_jobs[0].enabled);
3711 }
3712
3713 #[tokio::test]
3714 async fn test_service_manager_trigger_cron() {
3715 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3716 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3717 let cron_scheduler = Arc::new(CronScheduler::new(job_executor.clone()));
3718
3719 let manager = ServiceManager::new(runtime)
3720 .with_job_executor(job_executor)
3721 .with_cron_scheduler(cron_scheduler);
3722
3723 // Register cron job
3724 let cron_spec = mock_cron_spec();
3725 Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3726 .await
3727 .unwrap();
3728
3729 // Manually trigger the cron job
3730 let exec_id = manager.trigger_cron("cleanup").await.unwrap();
3731 assert!(!exec_id.0.is_empty());
3732 }
3733
3734 #[tokio::test]
3735 async fn test_service_manager_enable_disable_cron() {
3736 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3737 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3738 let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
3739
3740 let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
3741
3742 // Register cron job
3743 let cron_spec = mock_cron_spec();
3744 Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3745 .await
3746 .unwrap();
3747
3748 // Initially enabled
3749 let cron_jobs = manager.list_cron_jobs().await;
3750 assert!(cron_jobs[0].enabled);
3751
3752 // Disable
3753 manager.set_cron_enabled("cleanup", false).await;
3754 let cron_jobs = manager.list_cron_jobs().await;
3755 assert!(!cron_jobs[0].enabled);
3756
3757 // Re-enable
3758 manager.set_cron_enabled("cleanup", true).await;
3759 let cron_jobs = manager.list_cron_jobs().await;
3760 assert!(cron_jobs[0].enabled);
3761 }
3762
3763 #[tokio::test]
3764 async fn test_service_manager_remove_cleans_up_job() {
3765 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3766 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3767
3768 let manager = ServiceManager::new(runtime).with_job_executor(job_executor.clone());
3769
3770 // Register job
3771 let job_spec = mock_job_spec();
3772 Box::pin(manager.upsert_service("backup".to_string(), job_spec))
3773 .await
3774 .unwrap();
3775
3776 // Verify job is registered
3777 let spec = job_executor.get_job_spec("backup").await;
3778 assert!(spec.is_some());
3779
3780 // Remove job
3781 manager.remove_service("backup").await.unwrap();
3782
3783 // Verify job is unregistered
3784 let spec = job_executor.get_job_spec("backup").await;
3785 assert!(spec.is_none());
3786 }
3787
3788 #[tokio::test]
3789 async fn test_service_manager_remove_cleans_up_cron() {
3790 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3791 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3792 let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
3793
3794 let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler.clone());
3795
3796 // Register cron job
3797 let cron_spec = mock_cron_spec();
3798 Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3799 .await
3800 .unwrap();
3801
3802 // Verify cron job is registered
3803 assert_eq!(cron_scheduler.job_count().await, 1);
3804
3805 // Remove cron job
3806 manager.remove_service("cleanup").await.unwrap();
3807
3808 // Verify cron job is unregistered
3809 assert_eq!(cron_scheduler.job_count().await, 0);
3810 }
3811
3812 #[tokio::test]
3813 async fn test_service_manager_job_without_executor() {
3814 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3815 let manager = ServiceManager::new(runtime);
3816
3817 // Try to trigger job without executor configured
3818 let result = manager.trigger_job("nonexistent", JobTrigger::Cli).await;
3819 assert!(result.is_err());
3820 assert!(result.unwrap_err().to_string().contains("not configured"));
3821 }
3822
3823 #[tokio::test]
3824 async fn test_service_manager_cron_without_scheduler() {
3825 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3826 let manager = ServiceManager::new(runtime);
3827
3828 // Try to register cron job without scheduler configured
3829 let cron_spec = mock_cron_spec();
3830 let result = Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec)).await;
3831 assert!(result.is_err());
3832 assert!(result.unwrap_err().to_string().contains("not configured"));
3833 }
3834
3835 #[tokio::test]
3836 async fn test_service_manager_list_job_executions() {
3837 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3838 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3839
3840 let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
3841
3842 // Register job
3843 let job_spec = mock_job_spec();
3844 Box::pin(manager.upsert_service("backup".to_string(), job_spec))
3845 .await
3846 .unwrap();
3847
3848 // Trigger job twice
3849 manager
3850 .trigger_job("backup", JobTrigger::Cli)
3851 .await
3852 .unwrap();
3853 manager
3854 .trigger_job("backup", JobTrigger::Scheduler)
3855 .await
3856 .unwrap();
3857
3858 // Give jobs time to start
3859 tokio::time::sleep(Duration::from_millis(50)).await;
3860
3861 // List executions
3862 let executions = manager.list_job_executions("backup").await;
3863 assert_eq!(executions.len(), 2);
3864 }
3865
3866 // ==================== Container Supervisor Integration Tests ====================
3867
3868 #[tokio::test]
3869 async fn test_service_manager_with_supervisor() {
3870 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3871 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
3872
3873 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
3874
3875 // Add service
3876 let spec = mock_spec();
3877 Box::pin(manager.upsert_service("api".to_string(), spec))
3878 .await
3879 .unwrap();
3880
3881 // Scale up - containers should be registered with supervisor
3882 manager.scale_service("api", 2).await.unwrap();
3883
3884 // Verify containers are supervised
3885 assert_eq!(supervisor.supervised_count().await, 2);
3886
3887 // Scale down - containers should be unregistered
3888 manager.scale_service("api", 1).await.unwrap();
3889 assert_eq!(supervisor.supervised_count().await, 1);
3890
3891 // Remove service - remaining containers should be unregistered
3892 manager.remove_service("api").await.unwrap();
3893 assert_eq!(supervisor.supervised_count().await, 0);
3894 }
3895
3896 #[tokio::test]
3897 async fn test_service_manager_supervisor_state() {
3898 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3899 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
3900
3901 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor);
3902
3903 // Add and scale service
3904 let spec = mock_spec();
3905 Box::pin(manager.upsert_service("web".to_string(), spec))
3906 .await
3907 .unwrap();
3908 manager.scale_service("web", 1).await.unwrap();
3909
3910 // Check supervised state
3911 let container_id = ContainerId::new("web".to_string(), 1);
3912 let state = manager.get_container_supervised_state(&container_id).await;
3913 assert_eq!(state, Some(SupervisedState::Running));
3914 }
3915
3916 #[tokio::test]
3917 async fn test_service_manager_start_supervisor() {
3918 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3919 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
3920
3921 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
3922
3923 // Start the supervisor
3924 let handle = manager.start_container_supervisor().unwrap();
3925
3926 // Give it time to start
3927 tokio::time::sleep(Duration::from_millis(50)).await;
3928 assert!(supervisor.is_running());
3929
3930 // Shutdown
3931 manager.shutdown_container_supervisor();
3932
3933 // Wait for it to stop
3934 tokio::time::timeout(Duration::from_secs(1), handle)
3935 .await
3936 .unwrap()
3937 .unwrap();
3938
3939 assert!(!supervisor.is_running());
3940 }
3941
3942 #[tokio::test]
3943 async fn test_service_manager_supervisor_not_configured() {
3944 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3945 let manager = ServiceManager::new(runtime);
3946
3947 // Try to start supervisor without configuring it
3948 let result = manager.start_container_supervisor();
3949 assert!(result.is_err());
3950 assert!(result.unwrap_err().to_string().contains("not configured"));
3951 }
3952
3953 // ==================== Stream Registry Integration Tests ====================
3954
3955 fn mock_tcp_spec() -> ServiceSpec {
3956 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3957 r"
3958version: v1
3959deployment: test
3960services:
3961 database:
3962 rtype: service
3963 image:
3964 name: postgres:latest
3965 endpoints:
3966 - name: postgresql
3967 protocol: tcp
3968 port: 5432
3969 scale:
3970 mode: fixed
3971 replicas: 1
3972",
3973 )
3974 .unwrap()
3975 .services
3976 .remove("database")
3977 .unwrap()
3978 }
3979
3980 fn mock_udp_spec() -> ServiceSpec {
3981 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3982 r"
3983version: v1
3984deployment: test
3985services:
3986 dns:
3987 rtype: service
3988 image:
3989 name: dns:latest
3990 endpoints:
3991 - name: dns
3992 protocol: udp
3993 port: 53
3994 scale:
3995 mode: fixed
3996 replicas: 1
3997",
3998 )
3999 .unwrap()
4000 .services
4001 .remove("dns")
4002 .unwrap()
4003 }
4004
4005 fn mock_mixed_spec() -> ServiceSpec {
4006 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
4007 r"
4008version: v1
4009deployment: test
4010services:
4011 mixed:
4012 rtype: service
4013 image:
4014 name: mixed:latest
4015 endpoints:
4016 - name: http
4017 protocol: http
4018 port: 8080
4019 - name: grpc
4020 protocol: tcp
4021 port: 9000
4022 - name: metrics
4023 protocol: udp
4024 port: 8125
4025 scale:
4026 mode: fixed
4027 replicas: 1
4028",
4029 )
4030 .unwrap()
4031 .services
4032 .remove("mixed")
4033 .unwrap()
4034 }
4035
4036 #[tokio::test]
4037 async fn test_service_manager_with_stream_registry_tcp() {
4038 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4039 let stream_registry = Arc::new(StreamRegistry::new());
4040
4041 let mut manager = ServiceManager::new(runtime);
4042 manager.set_stream_registry(stream_registry.clone());
4043 manager.set_deployment_name("test".to_string());
4044
4045 // Add TCP-only service
4046 let spec = mock_tcp_spec();
4047 Box::pin(manager.upsert_service("database".to_string(), spec))
4048 .await
4049 .unwrap();
4050
4051 // Verify TCP route was registered
4052 assert_eq!(stream_registry.tcp_count(), 1);
4053 assert!(stream_registry.tcp_ports().contains(&5432));
4054
4055 // Remove service and verify cleanup
4056 manager.remove_service("database").await.unwrap();
4057 assert_eq!(stream_registry.tcp_count(), 0);
4058 }
4059
4060 #[tokio::test]
4061 async fn test_service_manager_with_stream_registry_udp() {
4062 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4063 let stream_registry = Arc::new(StreamRegistry::new());
4064
4065 let mut manager = ServiceManager::new(runtime);
4066 manager.set_stream_registry(stream_registry.clone());
4067 manager.set_deployment_name("test".to_string());
4068
4069 // Add UDP-only service
4070 let spec = mock_udp_spec();
4071 Box::pin(manager.upsert_service("dns".to_string(), spec))
4072 .await
4073 .unwrap();
4074
4075 // Verify UDP route was registered
4076 assert_eq!(stream_registry.udp_count(), 1);
4077 assert!(stream_registry.udp_ports().contains(&53));
4078
4079 // Remove service and verify cleanup
4080 manager.remove_service("dns").await.unwrap();
4081 assert_eq!(stream_registry.udp_count(), 0);
4082 }
4083
4084 #[tokio::test]
4085 async fn test_service_manager_with_stream_registry_mixed() {
4086 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4087 let stream_registry = Arc::new(StreamRegistry::new());
4088
4089 let mut manager = ServiceManager::new(runtime);
4090 manager.set_stream_registry(stream_registry.clone());
4091 manager.set_deployment_name("test".to_string());
4092
4093 // Add mixed service (HTTP + TCP + UDP)
4094 let spec = mock_mixed_spec();
4095 Box::pin(manager.upsert_service("mixed".to_string(), spec))
4096 .await
4097 .unwrap();
4098
4099 // Verify stream routes were registered
4100 assert_eq!(stream_registry.tcp_count(), 1); // TCP: 9000
4101 assert_eq!(stream_registry.udp_count(), 1); // UDP: 8125
4102
4103 assert!(stream_registry.tcp_ports().contains(&9000));
4104 assert!(stream_registry.udp_ports().contains(&8125));
4105
4106 // Remove service and verify stream cleanup
4107 manager.remove_service("mixed").await.unwrap();
4108 assert_eq!(stream_registry.tcp_count(), 0);
4109 assert_eq!(stream_registry.udp_count(), 0);
4110 }
4111
4112 #[tokio::test]
4113 async fn test_service_manager_stream_registry_builder() {
4114 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4115 let stream_registry = Arc::new(StreamRegistry::new());
4116
4117 // Test builder pattern
4118 let manager = ServiceManager::new(runtime).with_stream_registry(stream_registry.clone());
4119
4120 // Verify stream registry is accessible
4121 assert!(manager.stream_registry().is_some());
4122 }
4123
4124 #[tokio::test]
4125 async fn test_tcp_service_without_stream_registry() {
4126 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4127
4128 // Manager without stream registry
4129 let mut manager = ServiceManager::new(runtime);
4130 manager.set_deployment_name("test".to_string());
4131
4132 // Add TCP service - should log warning but not fail
4133 let spec = mock_tcp_spec();
4134 Box::pin(manager.upsert_service("database".to_string(), spec))
4135 .await
4136 .unwrap();
4137
4138 // No stream registry to check, but service should be tracked
4139 let services = manager.list_services().await;
4140 assert!(services.contains(&"database".to_string()));
4141 }
4142
4143 /// Verify `collect_endpoint_backends` filters containers by
4144 /// `EndpointSpec.target_role`.
4145 ///
4146 /// Given two replica groups (`primary` × 1, `read` × 2) and two
4147 /// endpoints — one with `target_role: primary` and one with
4148 /// `target_role: read` — each endpoint should receive only the
4149 /// matching containers' overlay addresses. The legacy no-filter
4150 /// endpoint (`target_role: None`) should receive all of them.
4151 #[tokio::test]
4152 #[allow(clippy::too_many_lines)]
4153 async fn test_collect_endpoint_backends_respects_target_role() {
4154 use crate::runtime::Container;
4155 use std::collections::HashMap as StdHashMap;
4156 use std::net::{IpAddr, Ipv4Addr};
4157 use zlayer_spec::{
4158 EndpointSpec, ExposeType, GroupAffinity, Protocol, ReplicaGroup, ScaleSpec,
4159 };
4160
4161 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4162 let manager = ServiceManager::new(runtime.clone());
4163
4164 // Build a spec with replica_groups and three endpoints:
4165 // - "write" targets role "primary"
4166 // - "read" targets role "read"
4167 // - "any" has no target_role (legacy)
4168 let mut spec = mock_spec();
4169 spec.replica_groups = Some(vec![
4170 ReplicaGroup {
4171 role: "primary".to_string(),
4172 count: 1,
4173 image: None,
4174 env: StdHashMap::new(),
4175 command: None,
4176 resources: None,
4177 affinity: GroupAffinity::default(),
4178 },
4179 ReplicaGroup {
4180 role: "read".to_string(),
4181 count: 2,
4182 image: None,
4183 env: StdHashMap::new(),
4184 command: None,
4185 resources: None,
4186 affinity: GroupAffinity::default(),
4187 },
4188 ]);
4189 spec.scale = ScaleSpec::Fixed { replicas: 3 };
4190 spec.endpoints = vec![
4191 EndpointSpec {
4192 name: "write".to_string(),
4193 protocol: Protocol::Tcp,
4194 port: 5432,
4195 target_port: Some(5432),
4196 path: None,
4197 host: None,
4198 expose: ExposeType::Internal,
4199 stream: None,
4200 tunnel: None,
4201 target_role: Some("primary".to_string()),
4202 },
4203 EndpointSpec {
4204 name: "read".to_string(),
4205 protocol: Protocol::Tcp,
4206 port: 5433,
4207 target_port: Some(5432),
4208 path: None,
4209 host: None,
4210 expose: ExposeType::Internal,
4211 stream: None,
4212 tunnel: None,
4213 target_role: Some("read".to_string()),
4214 },
4215 EndpointSpec {
4216 name: "any".to_string(),
4217 protocol: Protocol::Tcp,
4218 port: 5434,
4219 target_port: Some(5432),
4220 path: None,
4221 host: None,
4222 expose: ExposeType::Internal,
4223 stream: None,
4224 tunnel: None,
4225 target_role: None,
4226 },
4227 ];
4228
4229 let instance = ServiceInstance::new(
4230 "postgres".to_string(),
4231 spec.clone(),
4232 runtime,
4233 None, // overlay_manager — not exercised by this test
4234 );
4235
4236 // Inject three containers directly: one primary, two read replicas.
4237 let cid_primary = ContainerId::with_role_and_node("postgres", 1, "primary", 0);
4238 let cid_first_read = ContainerId::with_role_and_node("postgres", 2, "read", 0);
4239 let cid_second_read = ContainerId::with_role_and_node("postgres", 3, "read", 0);
4240
4241 let ip_primary = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 1));
4242 let ip_first_read = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 2));
4243 let ip_second_read = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 3));
4244
4245 {
4246 let mut containers = instance.containers().write().await;
4247 containers.insert(
4248 cid_primary.clone(),
4249 Container {
4250 id: cid_primary,
4251 image: spec.image.name.to_string(),
4252 state: crate::runtime::ContainerState::Running,
4253 pid: None,
4254 task: None,
4255 overlay_ip: Some(ip_primary),
4256 health_monitor: None,
4257 port_override: None,
4258 },
4259 );
4260 containers.insert(
4261 cid_first_read.clone(),
4262 Container {
4263 id: cid_first_read,
4264 image: spec.image.name.to_string(),
4265 state: crate::runtime::ContainerState::Running,
4266 pid: None,
4267 task: None,
4268 overlay_ip: Some(ip_first_read),
4269 health_monitor: None,
4270 port_override: None,
4271 },
4272 );
4273 containers.insert(
4274 cid_second_read.clone(),
4275 Container {
4276 id: cid_second_read,
4277 image: spec.image.name.to_string(),
4278 state: crate::runtime::ContainerState::Running,
4279 pid: None,
4280 task: None,
4281 overlay_ip: Some(ip_second_read),
4282 health_monitor: None,
4283 port_override: None,
4284 },
4285 );
4286 }
4287
4288 let write_ep = &spec.endpoints[0];
4289 let read_ep = &spec.endpoints[1];
4290 let any_ep = &spec.endpoints[2];
4291
4292 let write_backends = manager.collect_endpoint_backends(&instance, write_ep).await;
4293 let read_backends = manager.collect_endpoint_backends(&instance, read_ep).await;
4294 let any_backends = manager.collect_endpoint_backends(&instance, any_ep).await;
4295
4296 // write endpoint -> only the primary container
4297 assert_eq!(write_backends.len(), 1, "write should match only primary");
4298 assert!(
4299 write_backends.iter().any(|a| a.ip() == ip_primary),
4300 "write backends missing primary IP: {write_backends:?}"
4301 );
4302
4303 // read endpoint -> both read containers, no primary
4304 assert_eq!(
4305 read_backends.len(),
4306 2,
4307 "read should match both read replicas"
4308 );
4309 assert!(read_backends.iter().any(|a| a.ip() == ip_first_read));
4310 assert!(read_backends.iter().any(|a| a.ip() == ip_second_read));
4311 assert!(
4312 !read_backends.iter().any(|a| a.ip() == ip_primary),
4313 "read backends must not contain primary: {read_backends:?}"
4314 );
4315
4316 // legacy endpoint (target_role = None) -> every container
4317 assert_eq!(
4318 any_backends.len(),
4319 3,
4320 "any-role endpoint should see all containers"
4321 );
4322 }
4323}