zlayer_agent/service.rs
1//! Service-level container lifecycle management
2
3use crate::container_supervisor::{ContainerSupervisor, SupervisedState, SupervisorEvent};
4use crate::cron_scheduler::CronScheduler;
5use crate::dependency::{
6 DependencyConditionChecker, DependencyGraph, DependencyWaiter, WaitResult,
7};
8use crate::error::{AgentError, Result};
9use crate::health::{HealthCallback, HealthChecker, HealthMonitor, HealthState};
10use crate::init::InitOrchestrator;
11use crate::job::{JobExecution, JobExecutionId, JobExecutor, JobTrigger};
12use crate::overlay_manager::OverlayManager;
13use crate::proxy_manager::ProxyManager;
14use crate::runtime::{Container, ContainerId, ContainerState, Runtime};
15use std::collections::HashMap;
16use std::net::{IpAddr, SocketAddr};
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{RwLock, Semaphore};
20use zlayer_observability::logs::LogEntry;
21use zlayer_overlay::DnsServer;
22use zlayer_proxy::{StreamRegistry, StreamService};
23use zlayer_spec::{DependsSpec, HealthCheck, Protocol, PullPolicy, ResourceType, ServiceSpec};
24
25/// Service instance manages a single service's containers
26pub struct ServiceInstance {
27 pub service_name: String,
28 pub spec: ServiceSpec,
29 runtime: Arc<dyn Runtime + Send + Sync>,
30 containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
31 /// Overlay network manager for container networking (optional, not needed for Docker runtime)
32 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
33 /// Proxy manager for updating backend health (optional)
34 proxy_manager: Option<Arc<ProxyManager>>,
35 /// DNS server for service discovery (optional)
36 dns_server: Option<Arc<DnsServer>>,
37 /// Container-injectable overlay resolver IP (optional). When set, this
38 /// node's overlay DNS server is reachable on `<ip>:53` and we inject it
39 /// into the container's resolv.conf so workloads resolve through the
40 /// overlay instead of inheriting the host's resolv.conf.
41 container_dns: Option<IpAddr>,
42 /// Shared health states map so callbacks can update ServiceManager-level health
43 health_states: Option<Arc<RwLock<HashMap<String, HealthState>>>>,
44 /// Most recently observed image digest after a successful pull. Used by
45 /// `upsert_service` to detect drift on `:latest`/`Newer` redeploys without
46 /// requiring callers to track digest state externally. Wrapped in a
47 /// `RwLock` so `&self` methods (`scale_to`) can update it.
48 last_pulled_digest: tokio::sync::RwLock<Option<String>>,
49 /// Local cluster node id used when constructing new `ContainerId`s during
50 /// scale-up. `0` in single-node deployments or when the cluster handle is
51 /// not yet wired. Populated by `ServiceManager` from `Cluster::node_id()`
52 /// at instance construction time.
53 node_id: u64,
54 /// Owning deployment name (the `zlayer up` / deploy request's deployment),
55 /// when known. Threaded from `ServiceSpec.deployment` by `upsert_service`.
56 ///
57 /// Used to scope service-discovery DNS per-deployment: records are
58 /// registered as `{service}.{deployment}` / `{service}.service.{deployment}`
59 /// (within the daemon zone) and the container's resolv.conf gets a
60 /// per-deployment `search` domain so a bare `<svc>` / `<svc>.service` query
61 /// resolves to THIS deployment's instance and never clobbers another
62 /// deployment that happens to share a service name. `None` for standalone /
63 /// single-deployment callers (falls back to the daemon's global zone).
64 deployment: Option<String>,
65 /// Whether THIS node holds the standing HTTP/HTTPS ingress on
66 /// `0.0.0.0:80` / `0.0.0.0:443` (mirrors `NodeConfig.ingress`). When `true`
67 /// and the node has an overlay IP, external service domains
68 /// (`EndpointSpec.host`) are resolved to this node's own overlay IP;
69 /// otherwise the selected ingress peer's overlay IP is used. Defaults to
70 /// `false`. Threaded from `ServiceManager`.
71 ingress_enabled: bool,
72 /// Cluster handle used to select an ingress-capable peer's overlay IP when
73 /// THIS node is not itself the ingress. `None` in standalone / single-node
74 /// mode (the funnel is then THIS node when it is ingress-capable).
75 cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
76}
77
78impl ServiceInstance {
79 /// Create a new service instance
80 pub fn new(
81 service_name: String,
82 spec: ServiceSpec,
83 runtime: Arc<dyn Runtime + Send + Sync>,
84 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
85 ) -> Self {
86 let deployment = spec.deployment.clone();
87 Self {
88 service_name,
89 spec,
90 runtime,
91 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
92 overlay_manager,
93 proxy_manager: None,
94 dns_server: None,
95 container_dns: None,
96 health_states: None,
97 last_pulled_digest: tokio::sync::RwLock::new(None),
98 node_id: 0,
99 deployment,
100 ingress_enabled: false,
101 cluster: None,
102 }
103 }
104
105 /// Create a new service instance with proxy manager for health-aware load balancing
106 pub fn with_proxy(
107 service_name: String,
108 spec: ServiceSpec,
109 runtime: Arc<dyn Runtime + Send + Sync>,
110 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
111 proxy_manager: Arc<ProxyManager>,
112 ) -> Self {
113 let deployment = spec.deployment.clone();
114 Self {
115 service_name,
116 spec,
117 runtime,
118 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
119 overlay_manager,
120 proxy_manager: Some(proxy_manager),
121 dns_server: None,
122 container_dns: None,
123 health_states: None,
124 last_pulled_digest: tokio::sync::RwLock::new(None),
125 node_id: 0,
126 deployment,
127 ingress_enabled: false,
128 cluster: None,
129 }
130 }
131
132 /// Set the local cluster node id. Used by `ServiceManager` to thread
133 /// `Cluster::node_id()` down to container construction so new
134 /// `ContainerId`s carry the owning node identity. Defaults to `0` (the
135 /// single-node sentinel) when unset.
136 pub fn set_node_id(&mut self, node_id: u64) {
137 self.node_id = node_id;
138 }
139
140 /// Set the owning deployment name for service-discovery DNS scoping.
141 ///
142 /// Idempotent with construction: the constructors already capture
143 /// `spec.deployment`, but `ServiceManager` calls this so a deployment
144 /// stamped after the fact (or via a different code path) is honored.
145 pub fn set_deployment(&mut self, deployment: Option<String>) {
146 self.deployment = deployment;
147 }
148
149 /// Set whether THIS node holds the standing HTTP/HTTPS ingress (mirrors
150 /// `NodeConfig.ingress`). Threaded by `ServiceManager` so external service
151 /// domains can be resolved to this node's overlay IP when it is the funnel.
152 pub fn set_ingress_enabled(&mut self, enabled: bool) {
153 self.ingress_enabled = enabled;
154 }
155
156 /// Set the cluster handle used to select an ingress-capable peer's overlay
157 /// IP when THIS node is not itself the ingress.
158 pub fn set_cluster(&mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) {
159 self.cluster = Some(cluster);
160 }
161
162 /// Resolve the overlay IP of the node that should serve external domains for
163 /// this service (the ingress funnel), as an `IpAddr`:
164 ///
165 /// 1. If THIS node is ingress-capable and has an overlay IP, use it.
166 /// 2. Otherwise ask the cluster for the deterministically-selected ingress
167 /// peer's overlay IP.
168 /// 3. Otherwise (standalone, ingress-disabled local node, no peer) return
169 /// `None` — the caller WARNs and skips registering the domain.
170 async fn resolve_ingress_ip(&self) -> Option<IpAddr> {
171 if self.ingress_enabled {
172 if let Some(om) = &self.overlay_manager {
173 if let Some(ip) = om.read().await.node_ip() {
174 return Some(ip);
175 }
176 }
177 }
178 // Not the local funnel (or no local overlay IP): defer to the cluster's
179 // chosen ingress peer.
180 if let Some(cluster) = &self.cluster {
181 if let Some(ip_str) = cluster.select_ingress_overlay_ip().await {
182 match ip_str.parse::<IpAddr>() {
183 Ok(ip) => return Some(ip),
184 Err(e) => tracing::warn!(
185 ingress_overlay_ip = %ip_str,
186 error = %e,
187 "selected ingress peer overlay IP is not a valid IP address; \
188 skipping external-domain DNS registration"
189 ),
190 }
191 }
192 }
193 None
194 }
195
196 /// Register an A/AAAA record for every endpoint's external vhost domain
197 /// (`EndpointSpec.host`) pointing at the ingress funnel's overlay IP, so a
198 /// client resolving `<host>` reaches an ingress-capable node whose 80/443
199 /// proxy fans out to this service's backends across the mesh.
200 ///
201 /// Wildcard host patterns (`*.example.com`) are routing matchers, not
202 /// resolvable names, so they are skipped. When no ingress node exists
203 /// anywhere in the mesh yet (no 80/443 entrypoint reachable), each host is
204 /// logged at WARN and skipped — never an error; it re-registers on the
205 /// next container attach once an entrypoint appears.
206 async fn register_external_domains(&self, dns: &Arc<DnsServer>) {
207 // Collect resolvable external domains for this service's endpoints.
208 let hosts: Vec<String> = self
209 .spec
210 .endpoints
211 .iter()
212 .filter_map(|ep| ep.host.as_ref())
213 .map(|h| h.trim().to_string())
214 .filter(|h| !h.is_empty() && !h.contains('*'))
215 .collect();
216 if hosts.is_empty() {
217 return;
218 }
219
220 let Some(ingress_ip) = self.resolve_ingress_ip().await else {
221 for host in &hosts {
222 tracing::warn!(
223 service = %self.service_name,
224 host = %host,
225 "no 80/443 entrypoint reachable yet — open one to serve {host}; \
226 skipping external-domain DNS registration (will retry on next attach)"
227 );
228 }
229 return;
230 };
231
232 for host in &hosts {
233 // `host` is already a fully-qualified external domain; pass it with
234 // a trailing dot so `DnsServer::add_record` treats it as an FQDN and
235 // does NOT append the daemon's internal zone origin.
236 let fqdn = if host.ends_with('.') {
237 host.clone()
238 } else {
239 format!("{host}.")
240 };
241 match dns.add_record(&fqdn, ingress_ip).await {
242 Ok(()) => tracing::info!(
243 service = %self.service_name,
244 host = %host,
245 ingress_ip = %ingress_ip,
246 "registered external-domain DNS record (host -> ingress overlay IP)"
247 ),
248 Err(e) => tracing::warn!(
249 service = %self.service_name,
250 host = %host,
251 error = %e,
252 "failed to register external-domain DNS record"
253 ),
254 }
255 }
256 }
257
258 /// The per-deployment resolv.conf `search` domain list for containers in
259 /// this service's deployment, given the daemon's global DNS `zone` (e.g.
260 /// `zlayer.local`).
261 ///
262 /// Returns a space-separated `search` value placing the deployment scope
263 /// FIRST so a guest's bare `<svc>` / `<svc>.service` query expands to THIS
264 /// deployment's record before anything else, with the bare zone last so
265 /// cross-deployment by-FQDN names (`<svc>.<otherdeployment>`) still resolve:
266 ///
267 /// ```text
268 /// search <deployment>.<zone> <zone>
269 /// ```
270 ///
271 /// When this instance has no deployment (standalone / single-deployment),
272 /// returns `None` so callers fall back to the daemon's global zone domain.
273 #[must_use]
274 pub fn dns_search_domain(&self, zone: &str) -> Option<String> {
275 let zone = zone.trim_end_matches('.');
276 self.deployment.as_deref().map(|d| {
277 // `<deployment>.<zone>` first (deployment scope wins), `<zone>` last
278 // (cross-deployment FQDN + global names still resolve).
279 format!("{d}.{zone} {zone}")
280 })
281 }
282
283 /// The set of service-discovery hostnames to register for one container,
284 /// relative to the daemon's DNS zone (each gets `<zone>` appended by
285 /// [`DnsServer::add_record`]).
286 ///
287 /// Two families are emitted:
288 ///
289 /// 1. **Deployment-scoped** (only when this instance carries a
290 /// `deployment`): `<svc>.<D>`, `<svc>.service.<D>`,
291 /// `<replica>.<svc>.service.<D>`, the documented example FQDN form
292 /// `<svc>.<D>.service`, and (for non-default replica groups)
293 /// `<role>.<svc>.service.<D>`. Paired with the per-deployment
294 /// `search <D>.<zone> <zone>` resolv.conf domain (see
295 /// [`Self::dns_search_domain`]), a guest's bare `<svc>` expands to
296 /// `<svc>.<D>.<zone>` and `<svc>.service` to `<svc>.service.<D>.<zone>`,
297 /// so both resolve to THIS deployment's instance and NEVER clobber a
298 /// different deployment that happens to share a service name.
299 ///
300 /// 2. **Unscoped / legacy** (always): the bare `<svc>` name plus
301 /// `<svc>.service.local`, `<replica>.<svc>.service.local`, and the role
302 /// form. These preserve the historical compose-style discovery used by
303 /// native containers (no per-deployment search domain) and existing
304 /// deployments. NOTE: the bare `<svc>` is the last-writer-wins,
305 /// cross-deployment-ambiguous key — it is kept for back-compat but the
306 /// deployment-scoped names above are what make discovery correct.
307 #[must_use]
308 fn dns_hostnames(&self, id: &ContainerId) -> Vec<String> {
309 let svc = &self.service_name;
310 let mut names: Vec<String> = Vec::new();
311
312 // --- Deployment-scoped family (correct, no cross-deployment leak) ---
313 if let Some(d) = self.deployment.as_deref() {
314 // bare `<svc>` -> `<svc>.<D>` (resolves via `search <D>.<zone>`)
315 names.push(format!("{svc}.{d}"));
316 // `<svc>.service` -> `<svc>.service.<D>`
317 names.push(format!("{svc}.service.{d}"));
318 // documented example FQDN form `<svc>.<D>.service`
319 names.push(format!("{svc}.{d}.service"));
320 // replica-specific `<replica>.<svc>.service` -> `.<D>`
321 names.push(format!("{}.{svc}.service.{d}", id.replica));
322 // per-role group form for non-default replica groups
323 if id.role != "default" {
324 names.push(format!("{}.{svc}.service.{d}", id.role));
325 }
326 }
327
328 // --- Unscoped / legacy family (compose back-compat) ---
329 // Bare compose service name (e.g. `postgres`); multiple replicas upsert
330 // the same key and the in-memory authority keeps the most recent A.
331 names.push(svc.clone());
332 names.push(format!("{svc}.service.local"));
333 names.push(format!("{}.{svc}.service.local", id.replica));
334 if id.role != "default" {
335 names.push(format!("{}.{svc}.service.local", id.role));
336 }
337
338 names
339 }
340
341 /// Derive the replica group role for a 1-based `replica_idx`.
342 ///
343 /// When `spec.replica_groups` is unset, returns `"default"` (the implicit
344 /// single-group case). Otherwise walks groups in declaration order,
345 /// accumulating each group's `count` until `replica_idx` falls within the
346 /// current group's range, and returns that group's `role`.
347 ///
348 /// Replicas beyond the declared total fall back to `"default"`.
349 #[must_use]
350 pub fn role_for_replica(&self, replica_idx: u32) -> String {
351 let Some(groups) = self.spec.replica_groups.as_ref() else {
352 return "default".to_string();
353 };
354 let mut cumulative = 0u32;
355 for group in groups {
356 cumulative = cumulative.saturating_add(group.count);
357 if replica_idx <= cumulative {
358 return group.role.clone();
359 }
360 }
361 "default".to_string()
362 }
363
364 /// Builder method to add DNS server for service discovery
365 #[must_use]
366 pub fn with_dns(mut self, dns_server: Arc<DnsServer>) -> Self {
367 self.dns_server = Some(dns_server);
368 self
369 }
370
371 /// Set the DNS server for service discovery
372 pub fn set_dns_server(&mut self, dns_server: Arc<DnsServer>) {
373 self.dns_server = Some(dns_server);
374 }
375
376 /// Set the container-injectable overlay resolver IP and apply it to the
377 /// instance's spec.
378 ///
379 /// When `container_dns` is set and the spec is eligible (not host-network,
380 /// no user-supplied `dns`), this pre-populates `spec.dns` with the overlay
381 /// resolver so containers resolve through `<ip>:53` instead of inheriting
382 /// the host's `/etc/resolv.conf`.
383 ///
384 /// Why this exists: on overlay-enabled hosts the netbird `~.`
385 /// systemd-resolved hijack swallows the host resolver, so a container that
386 /// inherits the host resolv.conf cannot resolve anything. The overlay DNS
387 /// server forwards non-overlay queries upstream, so pointing the container
388 /// at it fixes resolution AND gives it service-name discovery.
389 ///
390 /// Port-53 constraint: `resolv.conf` `nameserver` lines (and Docker's
391 /// `--dns`) carry no port — they are always port 53. The injected IP is
392 /// therefore only useful because the daemon binds the overlay resolver on
393 /// `<ip>:53` (see `daemon.rs` Phase 4); the injected value is the bare IP,
394 /// not a `host:port`.
395 ///
396 /// User-supplied `spec.dns` is left untouched: an explicit resolver from
397 /// the deployment spec always wins.
398 pub fn set_container_dns(&mut self, container_dns: IpAddr) {
399 self.container_dns = Some(container_dns);
400 if !self.spec.host_network && self.spec.dns.is_empty() {
401 self.spec.dns = vec![container_dns.to_string()];
402 }
403 }
404
405 /// Set the proxy manager for health-aware load balancing
406 pub fn set_proxy_manager(&mut self, proxy_manager: Arc<ProxyManager>) {
407 self.proxy_manager = Some(proxy_manager);
408 }
409
410 /// Set the shared health states map so health callbacks can bridge state back to `ServiceManager`
411 pub fn set_health_states(&mut self, states: Arc<RwLock<HashMap<String, HealthState>>>) {
412 self.health_states = Some(states);
413 }
414
415 /// Get the last observed image digest (after the most recent successful
416 /// pull). Returns `None` when no pull has happened yet, when the runtime
417 /// does not expose digests, or when no matching `ImageInfo` was found.
418 pub async fn last_pulled_digest(&self) -> Option<String> {
419 self.last_pulled_digest.read().await.clone()
420 }
421
422 /// Pull the service image using the spec's pull policy (literal Docker /
423 /// Kubernetes semantics — no silent auto-upgrade of `IfNotPresent` to
424 /// `Newer` for `:latest` tags) and refresh the cached digest from
425 /// `Runtime::list_images` when the runtime exposes it. Returns the digest
426 /// observed after the pull, when known.
427 ///
428 /// For `Never`, the runtime is still called so it can load the image
429 /// config from the local cache (without any remote round-trip); only the
430 /// remote digest refresh is skipped. Without this call the bundle builder
431 /// has no image entrypoint/cmd and falls back to `/bin/sh`.
432 async fn pull_and_refresh_digest(&self) -> Result<Option<String>> {
433 let image_str = self.spec.image.name.to_string();
434 let policy = self.spec.image.pull_policy;
435
436 self.runtime
437 .pull_image_with_policy(
438 &image_str,
439 policy,
440 None,
441 self.spec.image.source_policy.unwrap_or_default(),
442 )
443 .await
444 .map_err(|e| AgentError::PullFailed {
445 image: self.spec.image.name.to_string(),
446 reason: e.to_string(),
447 })?;
448
449 // Best-effort: try to discover the resolved digest via list_images.
450 // Runtimes that don't support introspection (Unsupported) leave the
451 // cached digest unchanged; drift detection then falls back to "always
452 // recreate on PullPolicy::Always, never recreate on PullPolicy::Newer
453 // when no digests are known".
454 let new_digest = match self.runtime.list_images().await {
455 Ok(images) => images
456 .into_iter()
457 .find(|info| info.reference == image_str)
458 .and_then(|info| info.digest),
459 Err(e) => {
460 tracing::debug!(
461 image = %image_str,
462 error = %e,
463 "list_images unavailable; cannot record post-pull digest"
464 );
465 None
466 }
467 };
468
469 if let Some(ref digest) = new_digest {
470 *self.last_pulled_digest.write().await = Some(digest.clone());
471 }
472
473 Ok(new_digest)
474 }
475
476 /// Scale to the desired number of replicas
477 ///
478 /// This method uses short-lived locks to avoid blocking concurrent operations.
479 /// I/O operations (pull, create, start, stop, remove) are performed without
480 /// holding the containers lock to allow other operations to proceed.
481 ///
482 /// # Errors
483 /// Returns an error if image pull, container creation, or container lifecycle operations fail.
484 #[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
485 pub async fn scale_to(&self, replicas: u32) -> Result<()> {
486 // Phase 1: Determine current state (short read lock)
487 let current_replicas = { self.containers.read().await.len() as u32 }; // Lock released here
488
489 // Phase 1b: Pull image up front so a redeploy on `:latest` (which lands
490 // here with replicas == current_replicas in the steady state) actually
491 // refreshes the cached digest. We skip the call only when scaling
492 // strictly down (no new containers needed). For `Never` the runtime
493 // still needs to load the image config from the local cache so the
494 // bundle builder gets entrypoint/cmd/env — without it the container
495 // falls back to `/bin/sh` and exits instantly. `pull_and_refresh_digest`
496 // itself handles the Never case (no remote round-trip, cache-only).
497 if replicas >= current_replicas {
498 let _ = self.pull_and_refresh_digest().await?;
499 }
500
501 // Phase 2: Scale up - create new containers (no lock held during I/O)
502 //
503 // Compute (role, replica_index) tuples for each new replica. When
504 // `spec.replica_groups` is set, expand groups in declaration order so
505 // each created replica maps to its declared `(role, intra_group_index)`.
506 // Otherwise fall back to the implicit single "default" group. The
507 // `local_node_id` is captured once so every new `ContainerId` carries
508 // the owning node identity for cross-node disambiguation.
509 let local_node_id = self.node_id;
510 if replicas > current_replicas {
511 let replica_specs: Vec<(String, u32)> =
512 if let Some(groups) = self.spec.replica_groups.as_ref() {
513 let mut specs: Vec<(String, u32)> = Vec::new();
514 for group in groups {
515 for idx in 0..group.count {
516 specs.push((group.role.clone(), idx + 1));
517 }
518 }
519 specs
520 .into_iter()
521 .skip(current_replicas as usize)
522 .take((replicas - current_replicas) as usize)
523 .collect()
524 } else {
525 (current_replicas..replicas)
526 .map(|i| ("default".to_string(), i + 1))
527 .collect()
528 };
529
530 for (role, replica_idx) in replica_specs {
531 let id = ContainerId::with_role_and_node(
532 self.service_name.clone(),
533 replica_idx,
534 role,
535 local_node_id,
536 );
537
538 // Create container (no lock needed - I/O operation)
539 //
540 // RouteToPeer must propagate unchanged: the scheduler uses it
541 // to re-place the workload on a capable peer, and wrapping it
542 // in `CreateFailed` would hide the signal and mark the service
543 // dead instead of rescheduling it. All other errors are
544 // normalised to `CreateFailed` for upstream handling.
545 self.runtime
546 .create_container(&id, &self.spec)
547 .await
548 .map_err(|e| match e {
549 AgentError::RouteToPeer { .. } => e,
550 other => AgentError::CreateFailed {
551 id: id.to_string(),
552 reason: other.to_string(),
553 },
554 })?;
555
556 // Run init actions with error policy enforcement (no lock needed)
557 let init_orchestrator = InitOrchestrator::with_error_policy(
558 id.clone(),
559 self.spec.init.clone(),
560 self.spec.errors.clone(),
561 );
562 init_orchestrator.run().await?;
563
564 // Start container (no lock needed - I/O operation)
565 self.runtime
566 .start_container(&id)
567 .await
568 .map_err(|e| AgentError::StartFailed {
569 id: id.to_string(),
570 reason: e.to_string(),
571 })?;
572
573 // Get container PID with retries (may not be immediately available)
574 let mut container_pid = None;
575 for attempt in 1..=5u32 {
576 match self.runtime.get_container_pid(&id).await {
577 Ok(Some(pid)) => {
578 container_pid = Some(pid);
579 break;
580 }
581 Ok(None) if attempt < 5 => {
582 tracing::debug!(container = %id, attempt, "PID not available yet, retrying");
583 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
584 }
585 Ok(None) => {
586 tracing::warn!(container = %id, "Container PID unavailable after 5 attempts");
587 }
588 Err(e) => {
589 tracing::warn!(container = %id, attempt, error = %e, "Failed to get PID");
590 if attempt < 5 {
591 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
592 }
593 }
594 }
595 }
596
597 // Verify the container is still running before attempting
598 // overlay attach. If the init process crashed during start
599 // (bad image, missing libs, failed mount), the PID above is
600 // now dead and every `ip link set ... netns {pid}` will
601 // return a cryptic RTNETLINK error. Surface the real cause
602 // from the container's log tail instead of the cascade.
603 if container_pid.is_some() {
604 let alive = match self.runtime.container_state(&id).await {
605 Ok(
606 ContainerState::Running
607 | ContainerState::Pending
608 | ContainerState::Initializing,
609 ) => true,
610 Ok(state) => {
611 tracing::warn!(
612 container = %id,
613 ?state,
614 "container exited before overlay attach could run"
615 );
616 false
617 }
618 Err(e) => {
619 // State query failed — don't block the attach on
620 // it. The overlay manager's own cleanup-on-error
621 // path now handles the dead-PID case cleanly.
622 tracing::warn!(
623 container = %id,
624 error = %e,
625 "container state query failed before overlay attach, proceeding"
626 );
627 true
628 }
629 };
630 if !alive {
631 let log_tail = self.runtime.container_logs(&id, 40).await.ok().map_or_else(
632 || " <log read failed>".to_string(),
633 |entries| {
634 if entries.is_empty() {
635 " <no log output>".to_string()
636 } else {
637 entries
638 .into_iter()
639 .map(|e| format!(" {}", e.message))
640 .collect::<Vec<_>>()
641 .join("\n")
642 }
643 },
644 );
645 return Err(AgentError::StartFailed {
646 id: id.to_string(),
647 reason: format!("container exited during startup:\n{log_tail}"),
648 });
649 }
650 }
651
652 // Attach to overlay network if manager is available.
653 //
654 // Linux uses the container PID to enter the netns and attach a
655 // veth. Windows has no PID-addressable netns — the HCN namespace
656 // GUID (obtained from `get_container_namespace_id`) is used
657 // instead, and the endpoint's IP has already been populated by
658 // `EndpointAttachment::create_overlay` during container creation.
659 // We simply register that IP with the slice allocator so host
660 // accounting stays in sync.
661 let overlay_ip = if let Some(overlay) = &self.overlay_manager {
662 let overlay_guard = overlay.read().await;
663 #[cfg(target_os = "windows")]
664 let attach_result: Option<std::net::IpAddr> = {
665 // On Windows the overlay attach (HCN endpoint + per-container
666 // namespace creation, via overlayd) already happened inside
667 // `HcsRuntime::create_container`. Here we only need the IP it
668 // assigned so we can register DNS for service discovery.
669 let _ = (container_pid, &overlay_guard); // unused on Windows
670 match self.runtime.get_container_ip(&id).await {
671 Ok(Some(ip)) => Some(ip),
672 Ok(None) => {
673 tracing::debug!(
674 container = %id,
675 "no overlay IP recorded for container (overlay attach skipped at create time)"
676 );
677 None
678 }
679 Err(e) => {
680 tracing::warn!(
681 container = %id,
682 error = %e,
683 "failed to fetch container overlay IP"
684 );
685 None
686 }
687 }
688 };
689 #[cfg(not(target_os = "windows"))]
690 let attach_result: Option<std::net::IpAddr> = {
691 match self.runtime.overlay_attach_kind() {
692 // VM guest (macOS VZ-Linux): no host netns/PID, so
693 // overlayd allocates the overlay identity and we push
694 // it into the guest over vsock, where it brings up its
695 // own kernel WireGuard device.
696 crate::runtime::OverlayAttachKind::InGuestVsock => {
697 let cid = id.to_string();
698 // Per-deployment resolv.conf search domain so the
699 // guest's bare `<svc>`/`<svc>.service` resolves to
700 // THIS deployment (no cross-deployment clobber).
701 let dns_override = overlay_guard
702 .dns_domain()
703 .and_then(|zone| self.dns_search_domain(zone));
704 match overlay_guard
705 .attach_container_guest(
706 &cid,
707 &self.service_name,
708 true,
709 dns_override,
710 )
711 .await
712 {
713 Ok(cfg) => {
714 let ip = cfg.overlay_ip;
715 match self.runtime.push_overlay_config(&id, &cfg).await {
716 Ok(()) => Some(ip),
717 Err(e) => {
718 tracing::warn!(
719 container = %id,
720 error = %e,
721 "failed to push overlay config into guest; rolling back allocation"
722 );
723 // Don't leak the overlayd IP/peer.
724 if let Err(de) =
725 overlay_guard.detach_container_guest(&cid).await
726 {
727 tracing::warn!(
728 container = %id,
729 error = %de,
730 "failed to roll back guest overlay allocation"
731 );
732 }
733 None
734 }
735 }
736 }
737 Err(e) => {
738 tracing::warn!(
739 container = %id,
740 error = %e,
741 "failed to allocate guest overlay config from overlayd"
742 );
743 None
744 }
745 }
746 }
747 // Host-process runtimes (Linux youki): plumb a veth
748 // into the container's netns by PID.
749 _ => {
750 if let Some(pid) = container_pid {
751 // Per-deployment resolv.conf search domain so the
752 // container's bare `<svc>`/`<svc>.service` resolves
753 // to THIS deployment (no cross-deployment clobber).
754 let dns_override = overlay_guard
755 .dns_domain()
756 .and_then(|zone| self.dns_search_domain(zone));
757 match overlay_guard
758 .attach_container(
759 pid,
760 &self.service_name,
761 true,
762 dns_override,
763 )
764 .await
765 {
766 Ok(ip) => Some(ip),
767 Err(e) => {
768 tracing::warn!(
769 container = %id,
770 error = %e,
771 "failed to attach container to overlay network"
772 );
773 None
774 }
775 }
776 } else {
777 // No PID available (e.g. WASM runtime) - skip overlay attachment
778 tracing::debug!(
779 container = %id,
780 "skipping overlay attachment - no PID available"
781 );
782 None
783 }
784 }
785 }
786 };
787
788 if let Some(ip) = attach_result {
789 tracing::info!(
790 container = %id,
791 overlay_ip = %ip,
792 "attached container to overlay network"
793 );
794
795 // Register DNS for service discovery.
796 if let Some(dns) = &self.dns_server {
797 for hostname in self.dns_hostnames(&id) {
798 match dns.add_record(&hostname, ip).await {
799 Ok(()) => tracing::debug!(
800 hostname = %hostname,
801 ip = %ip,
802 "registered service-discovery DNS record"
803 ),
804 Err(e) => tracing::warn!(
805 hostname = %hostname,
806 error = %e,
807 "failed to register service-discovery DNS record"
808 ),
809 }
810 }
811
812 // Register external service domains (vhosts) so a
813 // client resolving `<host>` lands on an
814 // ingress-capable node, whose 80/443 proxy fans out
815 // to this service's overlay-IP backends anywhere in
816 // the mesh. This is ADDITIONAL to the
817 // deployment-scoped service-discovery records above.
818 self.register_external_domains(dns).await;
819 }
820
821 Some(ip)
822 } else {
823 None
824 }
825 } else {
826 None
827 };
828
829 // If overlay failed, try the container runtime's own IP as fallback
830 let effective_ip = if overlay_ip.is_none() {
831 match self.runtime.get_container_ip(&id).await {
832 Ok(Some(ip)) => {
833 tracing::info!(
834 container = %id,
835 ip = %ip,
836 "using runtime container IP for proxy (overlay unavailable)"
837 );
838 Some(ip)
839 }
840 Ok(None) => {
841 tracing::warn!(
842 container = %id,
843 "no container IP available from runtime, proxy routing will be unavailable"
844 );
845 None
846 }
847 Err(e) => {
848 tracing::warn!(
849 container = %id,
850 error = %e,
851 "failed to get container IP from runtime"
852 );
853 None
854 }
855 }
856 } else {
857 overlay_ip
858 };
859
860 tracing::info!(
861 container = %id,
862 service = %self.service_name,
863 overlay_ip = ?overlay_ip,
864 effective_ip = ?effective_ip,
865 "Container IP resolution complete"
866 );
867
868 // Query port override from the runtime.
869 // On macOS sandbox, each container is assigned a unique port since
870 // all processes share the host network (no network namespaces).
871 // The runtime passes the port to the process via the PORT env var.
872 let port_override = match self.runtime.get_container_port_override(&id).await {
873 Ok(Some(port)) => {
874 tracing::info!(
875 container = %id,
876 port = port,
877 "runtime assigned dynamic port override for this container"
878 );
879 Some(port)
880 }
881 Ok(None) => None,
882 Err(e) => {
883 tracing::warn!(
884 container = %id,
885 error = %e,
886 "failed to query port override from runtime, using spec port"
887 );
888 None
889 }
890 };
891
892 // Start health monitoring and store handle (no lock needed during start)
893 let health_monitor_handle = {
894 let mut check = self.spec.health.check.clone();
895
896 // Resolve Tcp { port: 0 } ("use first endpoint") to the actual
897 // port the container is listening on. With mac-sandbox, each
898 // replica gets a unique assigned port via port_override.
899 if let HealthCheck::Tcp { ref mut port } = check {
900 if *port == 0 {
901 *port = port_override.unwrap_or_else(|| {
902 self.spec
903 .endpoints
904 .iter()
905 .find(|ep| {
906 matches!(
907 ep.protocol,
908 Protocol::Http | Protocol::Https | Protocol::Websocket
909 )
910 })
911 .map_or(8080, zlayer_spec::EndpointSpec::target_port)
912 });
913 }
914 }
915
916 let start_grace = self
917 .spec
918 .health
919 .start_grace
920 .unwrap_or(Duration::from_secs(5));
921 let check_timeout = self.spec.health.timeout.unwrap_or(Duration::from_secs(5));
922 let interval = self.spec.health.interval.unwrap_or(Duration::from_secs(10));
923 let retries = self.spec.health.retries;
924
925 let checker = HealthChecker::new(check, effective_ip);
926 let mut monitor = HealthMonitor::new(id.clone(), checker, interval, retries)
927 .with_start_grace(start_grace)
928 .with_check_timeout(check_timeout);
929
930 // Build the optional proxy backend handle. This is only present
931 // when both a proxy manager AND a reachable overlay IP exist; in
932 // degraded-overlay / no-proxy deployments it stays None and the
933 // callback below skips all proxy work while STILL bridging health
934 // state back into ServiceManager.
935 let proxy_backend: Option<(Arc<ProxyManager>, SocketAddr)> =
936 if let (Some(proxy), Some(ip)) = (&self.proxy_manager, effective_ip) {
937 let proxy = Arc::clone(proxy);
938 // Get the container's target port, using the runtime override if
939 // present. On macOS sandbox, port_override gives each replica a
940 // unique port so the proxy can distinguish backends sharing
941 // 127.0.0.1.
942 let port = port_override.unwrap_or_else(|| {
943 self.spec
944 .endpoints
945 .iter()
946 .find(|ep| {
947 matches!(
948 ep.protocol,
949 Protocol::Http | Protocol::Https | Protocol::Websocket
950 )
951 })
952 .map_or(8080, zlayer_spec::EndpointSpec::target_port)
953 });
954
955 let backend_addr = SocketAddr::new(ip, port);
956
957 // Register backend with load balancer so proxy can route to it.
958 // This must happen before the health callback is created, because
959 // update_backend_health only updates *existing* backends.
960 proxy.add_backend(&self.service_name, backend_addr).await;
961
962 // Publish this container's exposed ports on the node
963 // loopback (`127.0.0.1:<endpoint.port>`) so a consumer
964 // sharing the node loopback can reach the service at
965 // `localhost:<port>`. Gated on the spec's policy
966 // (`Auto` publishes only for single-member services).
967 // Uses the SAME runtime-resolved `ip`/`port_override`
968 // as the backend above: on macOS each replica shares
969 // 127.0.0.1 with a unique override; on Linux/VM the
970 // overlay IP carries the declared target port.
971 if self.spec.publish_to_node_loopback() {
972 if let Err(e) = proxy
973 .publish_loopback_for_container(
974 self.deployment.as_deref(),
975 &self.service_name,
976 &self.spec,
977 ip,
978 port_override,
979 )
980 .await
981 {
982 // A host-port conflict means a DIFFERENT
983 // deployment/service already owns this
984 // published port; refuse to cross-wire
985 // (Bug 7) and surface it at deploy time.
986 tracing::error!(
987 service = %self.service_name,
988 error = %e,
989 "Failed to publish container ports on node loopback"
990 );
991 return Err(e);
992 }
993 }
994
995 Some((proxy, backend_addr))
996 } else {
997 None
998 };
999
1000 // The health bridge is ALWAYS attached, independent of proxy/IP
1001 // availability. stabilization::wait_for_stabilization only treats a
1002 // service as ready when health_states[name] == Healthy, so this write
1003 // must happen even when the overlay is degraded and no proxy backend
1004 // exists — otherwise the service stays healthy=false forever and
1005 // stabilization times out.
1006 let health_states_opt = self.health_states.clone();
1007 let svc_name_for_states = self.service_name.clone();
1008 let svc_name_for_proxy = self.service_name.clone();
1009 let svc_name_for_log = self.service_name.clone();
1010
1011 let health_callback: HealthCallback =
1012 Arc::new(move |container_id: ContainerId, is_healthy: bool| {
1013 tracing::info!(
1014 container = %container_id,
1015 service = %svc_name_for_log,
1016 healthy = is_healthy,
1017 has_proxy_backend = proxy_backend.is_some(),
1018 "health status changed"
1019 );
1020
1021 // Always bridge health state back to ServiceManager's
1022 // health_states map (unconditional — no proxy/IP required).
1023 if let Some(ref health_states) = health_states_opt {
1024 let states = Arc::clone(health_states);
1025 let svc = svc_name_for_states.clone();
1026 tokio::spawn(async move {
1027 let state = if is_healthy {
1028 HealthState::Healthy
1029 } else {
1030 HealthState::Unhealthy {
1031 failures: 0,
1032 reason: "health check failed".into(),
1033 }
1034 };
1035 states.write().await.insert(svc, state);
1036 });
1037 }
1038
1039 // Update proxy backend health only when a proxy backend was
1040 // registered (proxy manager + reachable overlay IP present).
1041 if let Some((proxy, backend_addr)) = proxy_backend.clone() {
1042 let svc = svc_name_for_proxy.clone();
1043 tokio::spawn(async move {
1044 proxy
1045 .update_backend_health(&svc, backend_addr, is_healthy)
1046 .await;
1047 });
1048 }
1049 });
1050
1051 monitor = monitor.with_callback(health_callback);
1052
1053 monitor.start()
1054 };
1055
1056 // Update state (short write lock)
1057 {
1058 let mut containers = self.containers.write().await;
1059 containers.insert(
1060 id.clone(),
1061 Container {
1062 id: id.clone(),
1063 image: self.spec.image.name.to_string(),
1064 state: ContainerState::Running,
1065 pid: None,
1066 task: None,
1067 overlay_ip: effective_ip,
1068 health_monitor: Some(health_monitor_handle),
1069 port_override,
1070 },
1071 );
1072 } // Lock released here
1073 }
1074 }
1075
1076 // Phase 3: Scale down - remove containers (short write lock per removal)
1077 //
1078 // Containers were created with `with_role_and_node(role, local_node_id)`
1079 // on scale-up, so we must reconstruct the same identity on scale-down
1080 // — the role is derived from `replica_groups` via `role_for_replica`
1081 // and the node id is the local cluster node. Mismatched ids would miss
1082 // the live entry in `self.containers` and leak the container.
1083 if replicas < current_replicas {
1084 for i in replicas..current_replicas {
1085 let replica_idx = i + 1;
1086 let id = ContainerId::with_role_and_node(
1087 self.service_name.clone(),
1088 replica_idx,
1089 self.role_for_replica(replica_idx),
1090 local_node_id,
1091 );
1092
1093 // Remove from state first and get the container to abort health monitor (short write lock)
1094 let removed_container = {
1095 let mut containers = self.containers.write().await;
1096 containers.remove(&id)
1097 }; // Lock released here
1098
1099 // Then perform cleanup (no lock held - I/O operations)
1100 if let Some(container) = removed_container {
1101 // Abort the health monitor task if it exists
1102 if let Some(handle) = container.health_monitor {
1103 handle.abort();
1104 }
1105
1106 // Unpublish this container's node-loopback ports (mirror of
1107 // the publish in the start path above). Recomputes the same
1108 // backend from the container's stored runtime-resolved IP and
1109 // port override; the last replica's removal frees the
1110 // loopback listener. Gated identically to publish.
1111 if self.spec.publish_to_node_loopback() {
1112 if let (Some(proxy), Some(ip)) = (&self.proxy_manager, container.overlay_ip)
1113 {
1114 proxy
1115 .unpublish_loopback_for_container(
1116 &self.spec,
1117 ip,
1118 container.port_override,
1119 )
1120 .await;
1121 }
1122 }
1123
1124 // Remove DNS records for this container
1125 if let Some(dns) = &self.dns_server {
1126 // Remove replica-specific DNS entry
1127 let replica_hostname =
1128 format!("{}.{}.service.local", id.replica, self.service_name);
1129 if let Err(e) = dns.remove_record(&replica_hostname).await {
1130 tracing::warn!(
1131 hostname = %replica_hostname,
1132 error = %e,
1133 "failed to remove replica DNS record"
1134 );
1135 } else {
1136 tracing::debug!(
1137 hostname = %replica_hostname,
1138 "removed replica DNS record"
1139 );
1140 }
1141
1142 // Remove per-role DNS entry if this was a non-default group.
1143 // Note: this is best-effort and removes the record even if
1144 // other replicas in the same role still need it — the DNS
1145 // server's add/remove API is single-record so we can't keep
1146 // it alive for siblings. P2.3-bis (round-robin per-role)
1147 // can fix this later via a per-role refcount; for now the
1148 // service-level hostname keeps cluster-internal clients
1149 // working even when the role-specific record briefly
1150 // disappears.
1151 if id.role != "default" {
1152 let role_hostname =
1153 format!("{}.{}.service.local", id.role, self.service_name);
1154 if let Err(e) = dns.remove_record(&role_hostname).await {
1155 tracing::warn!(
1156 hostname = %role_hostname,
1157 error = %e,
1158 "failed to remove role DNS record"
1159 );
1160 } else {
1161 tracing::debug!(
1162 hostname = %role_hostname,
1163 "removed role DNS record"
1164 );
1165 }
1166 }
1167
1168 // Note: We don't remove the service-level hostname here because
1169 // other replicas may still be using it. The service-level record
1170 // should be cleaned up when the entire service is removed.
1171 }
1172
1173 // Detach from overlay network if manager available.
1174 //
1175 // Done BEFORE stop_container because:
1176 // - The container init process must still be in
1177 // /proc to look up its PID via `get_container_pid`.
1178 // - `OverlayManager::detach_container` deletes host-side
1179 // veth interfaces by name (`veth-<pid>-*`) and
1180 // releases the allocated overlay IPs back to the
1181 // per-node slice. Without this the IPs leak across
1182 // container churn and the slice exhausts.
1183 //
1184 // Best-effort: failures are logged but never abort the
1185 // scale-down. The periodic orphan sweep
1186 // (`start_periodic_orphan_sweep`) catches anything we
1187 // missed.
1188 if let Some(overlay) = &self.overlay_manager {
1189 // VM guests have no host veth/PID — release the overlayd
1190 // allocation (IP + registered mesh peer) by container id
1191 // instead of by PID.
1192 if self.runtime.overlay_attach_kind()
1193 == crate::runtime::OverlayAttachKind::InGuestVsock
1194 {
1195 let overlay_guard = overlay.read().await;
1196 if let Err(e) =
1197 overlay_guard.detach_container_guest(&id.to_string()).await
1198 {
1199 tracing::warn!(
1200 container = %id,
1201 error = %e,
1202 "overlay detach_container_guest failed; relying on orphan sweep"
1203 );
1204 }
1205 } else {
1206 match self.runtime.get_container_pid(&id).await {
1207 Ok(Some(pid)) => {
1208 let overlay_guard = overlay.read().await;
1209 if let Err(e) = overlay_guard.detach_container(pid).await {
1210 tracing::warn!(
1211 container = %id,
1212 pid,
1213 error = %e,
1214 "overlay detach_container failed; relying on orphan sweep"
1215 );
1216 }
1217 }
1218 Ok(None) => {
1219 tracing::debug!(
1220 container = %id,
1221 "no PID available for overlay detach (already exited or non-Linux runtime)"
1222 );
1223 }
1224 Err(e) => {
1225 tracing::warn!(
1226 container = %id,
1227 error = %e,
1228 "failed to query container PID for overlay detach"
1229 );
1230 }
1231 }
1232 }
1233 }
1234
1235 // Stop container
1236 self.runtime
1237 .stop_container(&id, Duration::from_secs(30))
1238 .await?;
1239
1240 // Sync volumes to S3 before removal (no-op if not configured)
1241 if let Err(e) = self.runtime.sync_container_volumes(&id).await {
1242 tracing::warn!(
1243 container = %id,
1244 error = %e,
1245 "failed to sync volumes before removal"
1246 );
1247 }
1248
1249 // Remove container
1250 self.runtime.remove_container(&id).await?;
1251 }
1252 }
1253 }
1254
1255 Ok(())
1256 }
1257
1258 /// Get current number of replicas
1259 pub async fn replica_count(&self) -> usize {
1260 self.containers.read().await.len()
1261 }
1262
1263 /// Get all container IDs
1264 pub async fn container_ids(&self) -> Vec<ContainerId> {
1265 self.containers.read().await.keys().cloned().collect()
1266 }
1267
1268 /// Get per-container info (id, image, state, pid, overlay IP) for every
1269 /// live container in this instance.
1270 ///
1271 /// Surfaces the REAL image reference each container was created from and its
1272 /// REAL lifecycle state (lowercased via [`ContainerState::as_str`]) so the
1273 /// API/`ps` no longer reports a hardcoded `"running"` with no image.
1274 pub async fn container_infos(&self) -> Vec<ContainerInfo> {
1275 self.containers
1276 .read()
1277 .await
1278 .values()
1279 .map(|c| ContainerInfo {
1280 id: c.id.clone(),
1281 image: c.image.clone(),
1282 state: c.state.as_str().to_string(),
1283 pid: c.pid,
1284 overlay_ip: c.overlay_ip.map(|ip| ip.to_string()),
1285 })
1286 .collect()
1287 }
1288
1289 /// Get read access to the containers map
1290 ///
1291 /// This allows callers to access container overlay IPs and other metadata
1292 /// without copying the entire map.
1293 pub fn containers(
1294 &self,
1295 ) -> &tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>> {
1296 &self.containers
1297 }
1298
1299 /// Check if this service instance has an overlay manager configured
1300 pub fn has_overlay_manager(&self) -> bool {
1301 self.overlay_manager.is_some()
1302 }
1303
1304 /// Check if this service instance has a proxy manager configured
1305 pub fn has_proxy_manager(&self) -> bool {
1306 self.proxy_manager.is_some()
1307 }
1308
1309 /// Get the proxy manager for this instance, if configured.
1310 pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
1311 self.proxy_manager.as_ref()
1312 }
1313
1314 /// Check if this service instance has a DNS server configured
1315 pub fn has_dns_server(&self) -> bool {
1316 self.dns_server.is_some()
1317 }
1318}
1319
1320/// Per-container summary surfaced to callers (API / `ps`).
1321///
1322/// Carries the REAL image reference and lifecycle state of a single live
1323/// container, replacing the previous id-only view that forced the API to
1324/// fabricate a hardcoded `"running"` state with no image.
1325#[derive(Debug, Clone)]
1326pub struct ContainerInfo {
1327 /// Container identity.
1328 pub id: ContainerId,
1329 /// Image reference the container was created from (canonical form).
1330 pub image: String,
1331 /// Lowercased lifecycle state (e.g. `"running"`, `"exited"`).
1332 pub state: String,
1333 /// Process ID, when the container is running.
1334 pub pid: Option<u32>,
1335 /// Overlay IP rendered as a string, when assigned.
1336 pub overlay_ip: Option<String>,
1337}
1338
/// A live deployment container enriched for Docker-compat `ps` rows and for
/// name resolution. Produced by [`ServiceManager::list_container_views`].
///
/// Every field is an owned value, so a view is a point-in-time snapshot and
/// does not track later changes to the container.
#[derive(Debug, Clone)]
pub struct DeploymentContainerView {
    /// Deployment (compose project) name, when known.
    pub deployment: Option<String>,
    /// Service name within the deployment.
    pub service: String,
    /// Concrete container identity.
    pub container_id: ContainerId,
    /// Compose `container_name:` (the user-facing Docker name), when set.
    pub container_name: Option<String>,
    /// Image reference the container was created from.
    pub image: String,
    /// Lowercased lifecycle state (e.g. `"running"`).
    pub state: String,
    /// Process id when running.
    pub pid: Option<u32>,
    /// The service's published port mappings. These are per-service, so
    /// replicas of the same service presumably carry identical lists (TODO
    /// confirm against `list_container_views`).
    pub ports: Vec<zlayer_spec::PortMapping>,
}
1360
/// Service manager for multiple services
///
/// Holds the registered [`ServiceInstance`]s plus the optional subsystems it
/// coordinates (overlay, proxy, DNS, jobs, cron, supervisor, cluster).
/// Construct it via [`ServiceManager::builder`].
pub struct ServiceManager {
    /// Container runtime backing this manager.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Registered service instances, presumably keyed by service name.
    services: tokio::sync::RwLock<std::collections::HashMap<String, ServiceInstance>>,
    /// Bounds how many scaling operations run concurrently. Every constructor
    /// visible in this file allocates 10 permits.
    scale_semaphore: Arc<Semaphore>,
    /// Overlay network manager for container networking
    overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
    /// Stream registry for L4 proxy route registration (TCP/UDP)
    stream_registry: Option<Arc<StreamRegistry>>,
    /// Proxy manager for health-aware load balancing (hyper-based proxy)
    proxy_manager: Option<Arc<ProxyManager>>,
    /// DNS server for service discovery
    dns_server: Option<Arc<DnsServer>>,
    /// Container-injectable overlay resolver IP. When set, new service
    /// instances inject `<ip>` into their `spec.dns` so containers resolve
    /// through the overlay DNS server (bound on `<ip>:53`) rather than the
    /// hijacked host resolv.conf.
    container_dns: Option<IpAddr>,
    /// Deployment name (used for generating hostnames)
    deployment_name: Option<String>,
    /// Health states for dependency condition checking. Shared by `Arc` so
    /// that [`ServiceManager::health_states`] hands out handles to the same map.
    health_states: Arc<RwLock<HashMap<String, HealthState>>>,
    /// Job executor for run-to-completion workloads
    job_executor: Option<Arc<JobExecutor>>,
    /// Cron scheduler for time-based job triggers
    cron_scheduler: Option<Arc<CronScheduler>>,
    /// Container supervisor for crash/panic policy enforcement
    container_supervisor: Option<Arc<ContainerSupervisor>>,
    /// Cluster membership + dispatch handle. When `None`, scale operations
    /// run purely local (single-node mode). When `Some`, `scale_service`
    /// routes through the cluster (leader dispatches to peers; followers
    /// forward to the leader).
    cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
    /// Whether THIS node holds the standing HTTP/HTTPS ingress (mirrors
    /// `NodeConfig.ingress`). Threaded onto each `ServiceInstance` so external
    /// service domains resolve to this node's overlay IP when it is the funnel.
    /// Defaults to `false`; set by the daemon from `NodeConfig.ingress`.
    ingress: bool,
}
1400
1401// ---------------------------------------------------------------------------
1402// ServiceManagerBuilder
1403// ---------------------------------------------------------------------------
1404
1405/// Builder for constructing a [`ServiceManager`] with optional subsystems.
1406///
1407/// Prefer using `ServiceManager::builder(runtime)` to start building.
1408///
1409/// # Example
1410///
1411/// ```ignore
1412/// let manager = ServiceManager::builder(runtime)
1413/// .overlay_manager(om)
1414/// .proxy_manager(proxy)
1415/// .deployment_name("prod")
1416/// .build();
1417/// ```
1418pub struct ServiceManagerBuilder {
1419 runtime: Arc<dyn Runtime + Send + Sync>,
1420 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
1421 proxy_manager: Option<Arc<ProxyManager>>,
1422 stream_registry: Option<Arc<StreamRegistry>>,
1423 dns_server: Option<Arc<DnsServer>>,
1424 container_dns: Option<IpAddr>,
1425 deployment_name: Option<String>,
1426 job_executor: Option<Arc<JobExecutor>>,
1427 cron_scheduler: Option<Arc<CronScheduler>>,
1428 container_supervisor: Option<Arc<ContainerSupervisor>>,
1429 cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
1430}
1431
1432impl ServiceManagerBuilder {
1433 /// Create a new builder with the required runtime.
1434 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
1435 Self {
1436 runtime,
1437 overlay_manager: None,
1438 proxy_manager: None,
1439 stream_registry: None,
1440 dns_server: None,
1441 container_dns: None,
1442 deployment_name: None,
1443 job_executor: None,
1444 cron_scheduler: None,
1445 container_supervisor: None,
1446 cluster: None,
1447 }
1448 }
1449
1450 /// Set the overlay network manager for container networking.
1451 #[must_use]
1452 pub fn overlay_manager(mut self, om: Arc<RwLock<OverlayManager>>) -> Self {
1453 self.overlay_manager = Some(om);
1454 self
1455 }
1456
1457 /// Set the proxy manager for health-aware load balancing.
1458 #[must_use]
1459 pub fn proxy_manager(mut self, pm: Arc<ProxyManager>) -> Self {
1460 self.proxy_manager = Some(pm);
1461 self
1462 }
1463
1464 /// Set the stream registry for TCP/UDP L4 proxy route registration.
1465 #[must_use]
1466 pub fn stream_registry(mut self, sr: Arc<StreamRegistry>) -> Self {
1467 self.stream_registry = Some(sr);
1468 self
1469 }
1470
1471 /// Set the DNS server for service discovery.
1472 #[must_use]
1473 pub fn dns_server(mut self, dns: Arc<DnsServer>) -> Self {
1474 self.dns_server = Some(dns);
1475 self
1476 }
1477
1478 /// Set the container-injectable overlay resolver IP.
1479 ///
1480 /// The daemon passes the IP it bound the overlay DNS server on at port 53
1481 /// (see `daemon.rs` Phase 4). New service instances inject it into
1482 /// `spec.dns` so containers resolve through the overlay instead of the
1483 /// hijacked host resolv.conf. The port is implicitly 53 (resolv.conf has no
1484 /// port syntax), which is why only the bare IP is threaded here.
1485 #[must_use]
1486 pub fn container_dns(mut self, ip: IpAddr) -> Self {
1487 self.container_dns = Some(ip);
1488 self
1489 }
1490
1491 /// Set the deployment name (used for hostname generation).
1492 #[must_use]
1493 pub fn deployment_name(mut self, name: impl Into<String>) -> Self {
1494 self.deployment_name = Some(name.into());
1495 self
1496 }
1497
1498 /// Set the job executor for run-to-completion workloads.
1499 #[must_use]
1500 pub fn job_executor(mut self, je: Arc<JobExecutor>) -> Self {
1501 self.job_executor = Some(je);
1502 self
1503 }
1504
1505 /// Set the cron scheduler for time-based job triggers.
1506 #[must_use]
1507 pub fn cron_scheduler(mut self, cs: Arc<CronScheduler>) -> Self {
1508 self.cron_scheduler = Some(cs);
1509 self
1510 }
1511
1512 /// Set the container supervisor for crash/panic policy enforcement.
1513 #[must_use]
1514 pub fn container_supervisor(mut self, cs: Arc<ContainerSupervisor>) -> Self {
1515 self.container_supervisor = Some(cs);
1516 self
1517 }
1518
1519 /// Set the cluster membership + dispatch handle. When set,
1520 /// [`ServiceManager::scale_service`] will route through the cluster
1521 /// (leader dispatches to peers; followers forward to the leader).
1522 /// When unset (the default), scale operations remain local-only.
1523 #[must_use]
1524 pub fn cluster(mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) -> Self {
1525 self.cluster = Some(cluster);
1526 self
1527 }
1528
1529 /// Consume the builder and produce a fully-wired [`ServiceManager`].
1530 ///
1531 /// Logs warnings for missing recommended subsystems (proxy,
1532 /// `stream_registry`, `container_supervisor`, `deployment_name`).
1533 pub fn build(self) -> ServiceManager {
1534 if self.proxy_manager.is_none() {
1535 tracing::warn!("ServiceManager built without proxy_manager");
1536 }
1537 if self.stream_registry.is_none() {
1538 tracing::warn!("ServiceManager built without stream_registry");
1539 }
1540 if self.container_supervisor.is_none() {
1541 tracing::warn!("ServiceManager built without container_supervisor");
1542 }
1543 if self.deployment_name.is_none() {
1544 tracing::warn!("ServiceManager built without deployment_name");
1545 }
1546
1547 ServiceManager {
1548 runtime: self.runtime,
1549 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1550 scale_semaphore: Arc::new(Semaphore::new(10)),
1551 overlay_manager: self.overlay_manager,
1552 stream_registry: self.stream_registry,
1553 proxy_manager: self.proxy_manager,
1554 dns_server: self.dns_server,
1555 container_dns: self.container_dns,
1556 deployment_name: self.deployment_name,
1557 health_states: Arc::new(RwLock::new(HashMap::new())),
1558 job_executor: self.job_executor,
1559 cron_scheduler: self.cron_scheduler,
1560 container_supervisor: self.container_supervisor,
1561 cluster: self.cluster,
1562 ingress: false,
1563 }
1564 }
1565}
1566
1567impl ServiceManager {
1568 /// Create a [`ServiceManagerBuilder`] for constructing a `ServiceManager`.
1569 ///
1570 /// This is the preferred way to construct a `ServiceManager` since v0.2.0.
1571 ///
1572 /// # Example
1573 ///
1574 /// ```ignore
1575 /// let manager = ServiceManager::builder(runtime)
1576 /// .overlay_manager(om)
1577 /// .proxy_manager(proxy)
1578 /// .build();
1579 /// ```
1580 pub fn builder(runtime: Arc<dyn Runtime + Send + Sync>) -> ServiceManagerBuilder {
1581 ServiceManagerBuilder::new(runtime)
1582 }
1583
1584 /// Create a new service manager
1585 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1586 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
1587 Self {
1588 runtime,
1589 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1590 scale_semaphore: Arc::new(Semaphore::new(10)), // Max 10 concurrent scaling operations
1591 overlay_manager: None,
1592 stream_registry: None,
1593 proxy_manager: None,
1594 dns_server: None,
1595 container_dns: None,
1596 deployment_name: None,
1597 health_states: Arc::new(RwLock::new(HashMap::new())),
1598 job_executor: None,
1599 cron_scheduler: None,
1600 container_supervisor: None,
1601 cluster: None,
1602 ingress: false,
1603 }
1604 }
1605
1606 /// Create a service manager with overlay network support
1607 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1608 pub fn with_overlay(
1609 runtime: Arc<dyn Runtime + Send + Sync>,
1610 overlay_manager: Arc<RwLock<OverlayManager>>,
1611 ) -> Self {
1612 Self {
1613 runtime,
1614 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1615 scale_semaphore: Arc::new(Semaphore::new(10)),
1616 overlay_manager: Some(overlay_manager),
1617 stream_registry: None,
1618 proxy_manager: None,
1619 dns_server: None,
1620 container_dns: None,
1621 deployment_name: None,
1622 health_states: Arc::new(RwLock::new(HashMap::new())),
1623 job_executor: None,
1624 cron_scheduler: None,
1625 container_supervisor: None,
1626 cluster: None,
1627 ingress: false,
1628 }
1629 }
1630
1631 /// Create a fully-configured service manager with overlay and proxy support
1632 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1633 pub fn with_full_config(
1634 runtime: Arc<dyn Runtime + Send + Sync>,
1635 overlay_manager: Arc<RwLock<OverlayManager>>,
1636 deployment_name: String,
1637 ) -> Self {
1638 Self {
1639 runtime,
1640 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1641 scale_semaphore: Arc::new(Semaphore::new(10)),
1642 overlay_manager: Some(overlay_manager),
1643 stream_registry: None,
1644 proxy_manager: None,
1645 dns_server: None,
1646 container_dns: None,
1647 deployment_name: Some(deployment_name),
1648 health_states: Arc::new(RwLock::new(HashMap::new())),
1649 job_executor: None,
1650 cron_scheduler: None,
1651 container_supervisor: None,
1652 cluster: None,
1653 ingress: false,
1654 }
1655 }
1656
1657 /// Get the health states map for external monitoring
1658 pub fn health_states(&self) -> Arc<RwLock<HashMap<String, HealthState>>> {
1659 Arc::clone(&self.health_states)
1660 }
1661
1662 /// Update health state for a service
1663 pub async fn update_health_state(&self, service_name: &str, state: HealthState) {
1664 let mut states = self.health_states.write().await;
1665 states.insert(service_name.to_string(), state);
1666 }
1667
1668 /// Set the deployment name (used for generating hostnames)
1669 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1670 pub fn set_deployment_name(&mut self, name: String) {
1671 self.deployment_name = Some(name);
1672 }
1673
1674 /// Set the stream registry for L4 proxy integration (TCP/UDP)
1675 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1676 pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
1677 self.stream_registry = Some(registry);
1678 }
1679
1680 /// Builder pattern: add stream registry for L4 proxy integration
1681 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1682 #[must_use]
1683 pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
1684 self.stream_registry = Some(registry);
1685 self
1686 }
1687
1688 /// Get the stream registry (if configured)
1689 pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
1690 self.stream_registry.as_ref()
1691 }
1692
1693 /// Set the overlay manager for container networking
1694 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1695 pub fn set_overlay_manager(&mut self, manager: Arc<RwLock<OverlayManager>>) {
1696 self.overlay_manager = Some(manager);
1697 }
1698
1699 /// Set the proxy manager for health-aware load balancing
1700 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1701 pub fn set_proxy_manager(&mut self, proxy: Arc<ProxyManager>) {
1702 self.proxy_manager = Some(proxy);
1703 }
1704
1705 /// Builder pattern: add proxy manager for health-aware load balancing
1706 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1707 #[must_use]
1708 pub fn with_proxy_manager(mut self, proxy: Arc<ProxyManager>) -> Self {
1709 self.proxy_manager = Some(proxy);
1710 self
1711 }
1712
1713 /// Get the proxy manager (if configured)
1714 pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
1715 self.proxy_manager.as_ref()
1716 }
1717
1718 /// Set the DNS server for service discovery
1719 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1720 pub fn set_dns_server(&mut self, dns: Arc<DnsServer>) {
1721 self.dns_server = Some(dns);
1722 }
1723
1724 /// Builder pattern: add DNS server for service discovery
1725 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1726 #[must_use]
1727 pub fn with_dns_server(mut self, dns: Arc<DnsServer>) -> Self {
1728 self.dns_server = Some(dns);
1729 self
1730 }
1731
1732 /// Get the DNS server (if configured)
1733 pub fn dns_server(&self) -> Option<&Arc<DnsServer>> {
1734 self.dns_server.as_ref()
1735 }
1736
1737 /// Set the job executor for run-to-completion workloads
1738 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1739 pub fn set_job_executor(&mut self, executor: Arc<JobExecutor>) {
1740 self.job_executor = Some(executor);
1741 }
1742
1743 /// Set the cron scheduler for time-based job triggers
1744 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1745 pub fn set_cron_scheduler(&mut self, scheduler: Arc<CronScheduler>) {
1746 self.cron_scheduler = Some(scheduler);
1747 }
1748
1749 /// Builder pattern: add job executor
1750 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1751 #[must_use]
1752 pub fn with_job_executor(mut self, executor: Arc<JobExecutor>) -> Self {
1753 self.job_executor = Some(executor);
1754 self
1755 }
1756
1757 /// Builder pattern: add cron scheduler
1758 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1759 #[must_use]
1760 pub fn with_cron_scheduler(mut self, scheduler: Arc<CronScheduler>) -> Self {
1761 self.cron_scheduler = Some(scheduler);
1762 self
1763 }
1764
1765 /// Set the cluster handle for cluster-aware scaling.
1766 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1767 pub fn set_cluster(&mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) {
1768 self.cluster = Some(cluster);
1769 }
1770
1771 /// Builder pattern: add a cluster handle for cluster-aware scaling.
1772 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1773 #[must_use]
1774 pub fn with_cluster(mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) -> Self {
1775 self.cluster = Some(cluster);
1776 self
1777 }
1778
1779 /// Get the cluster handle (if configured).
1780 pub fn cluster(&self) -> Option<&Arc<dyn zlayer_scheduler::cluster::Cluster>> {
1781 self.cluster.as_ref()
1782 }
1783
1784 /// Set whether THIS node holds the standing HTTP/HTTPS ingress (mirrors
1785 /// `NodeConfig.ingress`). Threaded onto each new `ServiceInstance` so its
1786 /// external service domains resolve to this node's overlay IP when it is
1787 /// the funnel. Defaults to `false`.
1788 pub fn set_ingress(&mut self, enabled: bool) {
1789 self.ingress = enabled;
1790 }
1791
1792 /// Whether THIS node holds the standing HTTP/HTTPS ingress.
1793 #[must_use]
1794 pub fn ingress(&self) -> bool {
1795 self.ingress
1796 }
1797
1798 /// Get the job executor (if configured)
1799 pub fn job_executor(&self) -> Option<&Arc<JobExecutor>> {
1800 self.job_executor.as_ref()
1801 }
1802
1803 /// Get the cron scheduler (if configured)
1804 pub fn cron_scheduler(&self) -> Option<&Arc<CronScheduler>> {
1805 self.cron_scheduler.as_ref()
1806 }
1807
1808 /// Set the container supervisor for crash/panic policy enforcement
1809 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1810 pub fn set_container_supervisor(&mut self, supervisor: Arc<ContainerSupervisor>) {
1811 self.container_supervisor = Some(supervisor);
1812 }
1813
1814 /// Builder pattern: add container supervisor
1815 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
1816 #[must_use]
1817 pub fn with_container_supervisor(mut self, supervisor: Arc<ContainerSupervisor>) -> Self {
1818 self.container_supervisor = Some(supervisor);
1819 self
1820 }
1821
1822 /// Get the container supervisor (if configured)
1823 pub fn container_supervisor(&self) -> Option<&Arc<ContainerSupervisor>> {
1824 self.container_supervisor.as_ref()
1825 }
1826
1827 /// Start the container supervisor background task
1828 ///
1829 /// This spawns a background task that monitors containers for crashes
1830 /// and enforces the `on_panic` error policy.
1831 ///
1832 /// # Errors
1833 /// Returns an error if no container supervisor is configured.
1834 ///
1835 /// # Returns
1836 /// A `JoinHandle` for the supervisor task.
1837 pub fn start_container_supervisor(&self) -> Result<tokio::task::JoinHandle<()>> {
1838 let supervisor = self.container_supervisor.as_ref().ok_or_else(|| {
1839 AgentError::Configuration("Container supervisor not configured".to_string())
1840 })?;
1841
1842 let supervisor = Arc::clone(supervisor);
1843 Ok(tokio::spawn(async move {
1844 supervisor.run_loop().await;
1845 }))
1846 }
1847
1848 /// Shutdown the container supervisor
1849 pub fn shutdown_container_supervisor(&self) {
1850 if let Some(supervisor) = &self.container_supervisor {
1851 supervisor.shutdown();
1852 }
1853 }
1854
1855 /// Get the supervised state of a container
1856 pub async fn get_container_supervised_state(
1857 &self,
1858 container_id: &ContainerId,
1859 ) -> Option<SupervisedState> {
1860 if let Some(supervisor) = &self.container_supervisor {
1861 supervisor.get_state(container_id).await
1862 } else {
1863 None
1864 }
1865 }
1866
1867 /// Get supervisor events receiver
1868 ///
1869 /// Note: This can only be called once; the receiver is moved to the caller.
1870 pub async fn take_supervisor_events(
1871 &self,
1872 ) -> Option<tokio::sync::mpsc::Receiver<SupervisorEvent>> {
1873 if let Some(supervisor) = &self.container_supervisor {
1874 supervisor.take_event_receiver().await
1875 } else {
1876 None
1877 }
1878 }
1879
1880 // ==================== Dependency Orchestration ====================
1881
1882 /// Deploy multiple services respecting their dependency order
1883 ///
1884 /// This method:
1885 /// 1. Builds a dependency graph from the services
1886 /// 2. Validates no cycles exist
1887 /// 3. Computes topological order (services with no deps first)
1888 /// 4. For each service in order, waits for dependencies then starts the service
1889 ///
1890 /// # Arguments
1891 /// * `services` - Map of service name to service specification
1892 ///
1893 /// # Errors
1894 /// - Returns `AgentError::InvalidSpec` if there are cyclic dependencies
1895 /// - Returns `AgentError::DependencyTimeout` if a dependency times out with `on_timeout`: fail
1896 pub async fn deploy_with_dependencies(
1897 &self,
1898 services: HashMap<String, ServiceSpec>,
1899 ) -> Result<()> {
1900 if services.is_empty() {
1901 return Ok(());
1902 }
1903
1904 // Build dependency graph
1905 let graph = DependencyGraph::build(&services)?;
1906
1907 tracing::info!(
1908 service_count = services.len(),
1909 "Starting deployment with dependency ordering"
1910 );
1911
1912 // Get startup order
1913 let order = graph.startup_order();
1914 tracing::debug!(order = ?order, "Computed startup order");
1915
1916 // Start services in dependency order
1917 for service_name in order {
1918 let service_spec = services
1919 .get(service_name)
1920 .ok_or_else(|| AgentError::Internal(format!("Service {service_name} not found")))?;
1921
1922 // Wait for dependencies first
1923 if !service_spec.depends.is_empty() {
1924 tracing::info!(
1925 service = %service_name,
1926 dependency_count = service_spec.depends.len(),
1927 "Waiting for dependencies"
1928 );
1929 self.wait_for_dependencies(service_name, &service_spec.depends)
1930 .await?;
1931 }
1932
1933 // Register and start service
1934 tracing::info!(service = %service_name, "Starting service");
1935 Box::pin(self.upsert_service(service_name.clone(), service_spec.clone())).await?;
1936
1937 // Get the desired replica count from scale config
1938 let replicas = match &service_spec.scale {
1939 zlayer_spec::ScaleSpec::Fixed { replicas } => *replicas,
1940 zlayer_spec::ScaleSpec::Adaptive { min, .. } => *min, // Start with min replicas
1941 zlayer_spec::ScaleSpec::Manual => 1, // Default to 1 for manual scaling
1942 };
1943 self.scale_service(service_name, replicas).await?;
1944
1945 // Mark service as started in health states (Unknown until health check runs)
1946 self.update_health_state(service_name, HealthState::Unknown)
1947 .await;
1948
1949 tracing::info!(
1950 service = %service_name,
1951 replicas = replicas,
1952 "Service started"
1953 );
1954 }
1955
1956 tracing::info!(service_count = services.len(), "Deployment complete");
1957
1958 Ok(())
1959 }
1960
1961 /// Wait for all dependencies of a service to be satisfied
1962 ///
1963 /// # Arguments
1964 /// * `service` - Name of the service waiting for dependencies
1965 /// * `deps` - Slice of dependency specifications
1966 ///
1967 /// # Errors
1968 /// Returns `AgentError::DependencyTimeout` if any dependency with `on_timeout`: fail times out
1969 async fn wait_for_dependencies(&self, service: &str, deps: &[DependsSpec]) -> Result<()> {
1970 let condition_checker = DependencyConditionChecker::new(
1971 Arc::clone(&self.runtime),
1972 Arc::clone(&self.health_states),
1973 None,
1974 );
1975
1976 let waiter = DependencyWaiter::new(condition_checker);
1977 let results = waiter.wait_for_all(deps).await?;
1978
1979 // Check results for failures
1980 for result in results {
1981 match result {
1982 WaitResult::TimedOutFail {
1983 service: dep_service,
1984 condition,
1985 timeout,
1986 } => {
1987 return Err(AgentError::DependencyTimeout {
1988 service: service.to_string(),
1989 dependency: dep_service,
1990 condition: format!("{condition:?}"),
1991 timeout,
1992 });
1993 }
1994 WaitResult::TimedOutWarn {
1995 service: dep_service,
1996 condition,
1997 } => {
1998 tracing::warn!(
1999 service = %service,
2000 dependency = %dep_service,
2001 condition = ?condition,
2002 "Dependency timed out but continuing"
2003 );
2004 }
2005 WaitResult::TimedOutContinue | WaitResult::Satisfied => {
2006 // Continue silently
2007 }
2008 }
2009 }
2010
2011 Ok(())
2012 }
2013
2014 /// Check if all dependencies for a service are currently satisfied
2015 ///
2016 /// This is a one-shot check (no waiting). Useful for pre-flight validation.
2017 ///
2018 /// # Errors
2019 /// Returns an error if a dependency check fails unexpectedly.
2020 pub async fn check_dependencies(&self, deps: &[DependsSpec]) -> Result<bool> {
2021 let condition_checker = DependencyConditionChecker::new(
2022 Arc::clone(&self.runtime),
2023 Arc::clone(&self.health_states),
2024 None,
2025 );
2026
2027 for dep in deps {
2028 if !condition_checker.check(dep).await? {
2029 return Ok(false);
2030 }
2031 }
2032
2033 Ok(true)
2034 }
2035
2036 /// Add or update a workload (service, job, or cron)
2037 ///
2038 /// This method handles different resource types appropriately:
2039 /// - **Service**: Traditional long-running containers with scaling and health checks
2040 /// - **Job**: Run-to-completion workloads triggered on-demand (stores spec for later)
2041 /// - **Cron**: Scheduled run-to-completion workloads (registers with cron scheduler)
2042 ///
2043 /// # Errors
2044 /// Returns an error if service creation, scaling, or cron registration fails.
2045 #[allow(clippy::too_many_lines)]
2046 pub async fn upsert_service(&self, name: String, spec: ServiceSpec) -> Result<()> {
2047 match spec.rtype {
2048 ResourceType::Service => {
2049 // Long-running service: create/update instance
2050 let mut services = self.services.write().await;
2051
2052 if let Some(instance) = services.get_mut(&name) {
2053 // Update existing service. We need to:
2054 // 1. Update the in-memory spec (so future scale-ups use the new image).
2055 // 2. Recreate the local replicas when the image actually changed —
2056 // either a different image *reference* (e.g. tag bump
2057 // 1.28 -> 1.29), which is a new image regardless of pull
2058 // policy, or, under Always/Newer, observed *digest* drift on
2059 // the same reference.
2060 // The recreate is LOCAL (`scale_service_local`): `upsert_service`
2061 // runs on whichever node owns the replicas (the leader for its
2062 // own share, each worker via the `/internal/scale` handler). Using
2063 // the cluster-routed `scale_service` here would bounce a worker's
2064 // recreate back to the leader and re-enter dispatch. Cluster-wide
2065 // distribution is the caller's job (orchestrate_deployment + the
2066 // scale dispatch that carries this spec to every node).
2067 let image_changed = instance.spec.image.name != spec.image.name;
2068 instance.spec = spec.clone();
2069 if let Some(dns) = &self.dns_server {
2070 instance.set_dns_server(Arc::clone(dns));
2071 }
2072 // Re-apply overlay resolver injection: the spec was just
2073 // replaced wholesale, so any prior injection on the old
2074 // spec is gone. Honors host_network / user-supplied dns.
2075 if let Some(ip) = self.container_dns {
2076 instance.set_container_dns(ip);
2077 }
2078
2079 let effective = spec.image.pull_policy;
2080 let old_digest = instance.last_pulled_digest().await;
2081 let current_replicas =
2082 u32::try_from(instance.replica_count().await).unwrap_or(u32::MAX);
2083 drop(services); // Release write lock before pull / scale (which take their own locks).
2084
2085 // A changed image reference always recreates. Same-reference
2086 // refreshes are governed by pull policy + digest drift.
2087 let mut should_recreate = image_changed;
2088 let mut new_digest = old_digest.clone();
2089
2090 match effective {
2091 PullPolicy::Never | PullPolicy::IfNotPresent => {
2092 // No proactive pull. If the reference changed we still
2093 // recreate below; the scale-up path pulls the (absent) new
2094 // image per IfNotPresent. A same-reference redeploy under
2095 // these policies is a genuine no-op.
2096 tracing::debug!(
2097 service = %name,
2098 policy = ?effective,
2099 image_changed,
2100 "re-deploy under no-refresh pull policy"
2101 );
2102 }
2103 PullPolicy::Always | PullPolicy::Newer => {
2104 // Pull (this updates the cached digest as a side-effect).
2105 // We need a read guard to keep the instance alive while
2106 // calling its &self method.
2107 let services_ro = self.services.read().await;
2108 new_digest = if let Some(inst) = services_ro.get(&name) {
2109 inst.pull_and_refresh_digest().await?
2110 } else {
2111 // The service vanished between our write-lock release
2112 // and read-lock acquisition (race with remove_service).
2113 // Treat this as a no-op; the caller will see the removal.
2114 tracing::warn!(
2115 service = %name,
2116 "service removed during upsert; skipping drift recreate"
2117 );
2118 return Ok(());
2119 };
2120 drop(services_ro);
2121
2122 // Always forces a recreate. Newer recreates on digest
2123 // drift. When digests are unknown (runtime doesn't expose
2124 // them), we can't observe drift safely under Newer, so the
2125 // reference check above is the only trigger.
2126 should_recreate = should_recreate
2127 || match effective {
2128 PullPolicy::Always => true,
2129 PullPolicy::Newer => match (&old_digest, &new_digest) {
2130 (Some(old), Some(new)) => old != new,
2131 _ => false,
2132 },
2133 _ => false,
2134 };
2135 }
2136 }
2137
2138 if should_recreate && current_replicas > 0 {
2139 tracing::info!(
2140 service = %name,
2141 policy = ?effective,
2142 image_changed,
2143 old_digest = ?old_digest,
2144 new_digest = ?new_digest,
2145 replicas = current_replicas,
2146 "image changed; performing local rolling recreate"
2147 );
2148 self.scale_service_local(&name, 0).await?;
2149 self.scale_service_local(&name, current_replicas).await?;
2150 tracing::info!(
2151 service = %name,
2152 new_digest = ?new_digest,
2153 "service recreated with refreshed image"
2154 );
2155 } else {
2156 tracing::debug!(
2157 service = %name,
2158 policy = ?effective,
2159 old_digest = ?old_digest,
2160 new_digest = ?new_digest,
2161 "service up to date; no recreate required"
2162 );
2163 }
2164 return Ok(());
2165 }
2166 // Create new service with proxy manager for health-aware load balancing
2167 let overlay = self.overlay_manager.as_ref().map(Arc::clone);
2168 let mut instance = if let Some(proxy) = &self.proxy_manager {
2169 ServiceInstance::with_proxy(
2170 name.clone(),
2171 spec,
2172 self.runtime.clone(),
2173 overlay,
2174 Arc::clone(proxy),
2175 )
2176 } else {
2177 ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
2178 };
2179 // Thread the local cluster node id so new `ContainerId`s carry
2180 // owning-node identity. Defaults to `0` in single-node mode.
2181 instance.set_node_id(self.cluster.as_ref().map_or(0, |c| c.node_id()));
2182 // Thread ingress capability + cluster handle so external service
2183 // domains resolve to an ingress-capable node's overlay IP.
2184 instance.set_ingress_enabled(self.ingress);
2185 if let Some(cluster) = &self.cluster {
2186 instance.set_cluster(Arc::clone(cluster));
2187 }
2188 // Set DNS server if configured
2189 if let Some(dns) = &self.dns_server {
2190 instance.set_dns_server(Arc::clone(dns));
2191 }
2192 // Inject the overlay resolver into the spec so containers use it
2193 // instead of the hijacked host resolv.conf (no-op for
2194 // host_network / user-supplied dns).
2195 if let Some(ip) = self.container_dns {
2196 instance.set_container_dns(ip);
2197 }
2198 // Wire shared health states so callbacks bridge back to ServiceManager
2199 instance.set_health_states(Arc::clone(&self.health_states));
2200 // Register HTTP routes via proxy manager
2201 if let Some(proxy) = &self.proxy_manager {
2202 proxy.add_service(&name, &instance.spec).await;
2203 }
2204 // Register TCP/UDP endpoints in stream registry
2205 if let Some(stream_registry) = &self.stream_registry {
2206 for endpoint in &instance.spec.endpoints {
2207 let svc = StreamService::new(
2208 name.clone(),
2209 Vec::new(), // No backends yet; added on scale-up
2210 );
2211 match endpoint.protocol {
2212 Protocol::Tcp => {
2213 stream_registry.register_tcp(endpoint.port, svc);
2214 tracing::debug!(
2215 service = %name,
2216 port = endpoint.port,
2217 "Registered TCP stream route"
2218 );
2219 }
2220 Protocol::Udp => {
2221 stream_registry.register_udp(endpoint.port, svc);
2222 tracing::debug!(
2223 service = %name,
2224 port = endpoint.port,
2225 "Registered UDP stream route"
2226 );
2227 }
2228 _ => {} // HTTP routes handled by proxy manager
2229 }
2230 }
2231 }
2232 services.insert(name, instance);
2233 }
2234 ResourceType::Job => {
2235 // Job: Just store the spec for later triggering
2236 // Jobs don't start containers immediately; they're triggered on-demand
2237 if let Some(executor) = &self.job_executor {
2238 executor.register_job(&name, spec).await;
2239 tracing::info!(job = %name, "Registered job spec");
2240 } else {
2241 tracing::warn!(
2242 job = %name,
2243 "Job executor not configured, storing as service for reference"
2244 );
2245 // Fallback: store as service instance for reference
2246 let mut services = self.services.write().await;
2247 let overlay = self.overlay_manager.as_ref().map(Arc::clone);
2248 let mut instance = if let Some(proxy) = &self.proxy_manager {
2249 ServiceInstance::with_proxy(
2250 name.clone(),
2251 spec,
2252 self.runtime.clone(),
2253 overlay,
2254 Arc::clone(proxy),
2255 )
2256 } else {
2257 ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
2258 };
2259 // Thread the local cluster node id (same as the Service
2260 // branch above) so the fallback-as-service Job entry also
2261 // carries owning-node identity.
2262 instance.set_node_id(self.cluster.as_ref().map_or(0, |c| c.node_id()));
2263 // Thread ingress capability + cluster handle (same as the
2264 // Service branch).
2265 instance.set_ingress_enabled(self.ingress);
2266 if let Some(cluster) = &self.cluster {
2267 instance.set_cluster(Arc::clone(cluster));
2268 }
2269 // Set DNS server if configured
2270 if let Some(dns) = &self.dns_server {
2271 instance.set_dns_server(Arc::clone(dns));
2272 }
2273 // Inject the overlay resolver (no-op for host_network /
2274 // user-supplied dns).
2275 if let Some(ip) = self.container_dns {
2276 instance.set_container_dns(ip);
2277 }
2278 services.insert(name, instance);
2279 }
2280 }
2281 ResourceType::Cron => {
2282 // Cron: Register with the cron scheduler
2283 if let Some(scheduler) = &self.cron_scheduler {
2284 scheduler.register(&name, &spec).await?;
2285 tracing::info!(cron = %name, "Registered cron job with scheduler");
2286 } else {
2287 return Err(AgentError::Configuration(format!(
2288 "Cron scheduler not configured for cron job '{name}'"
2289 )));
2290 }
2291 }
2292 }
2293
2294 Ok(())
2295 }
2296
2297 /// Update backend addresses via `ProxyManager` after scaling, applying
2298 /// per-endpoint `target_role` filtering.
2299 ///
2300 /// For each L7 endpoint of the service, this collects the subset of
2301 /// containers whose `ContainerId.role` matches `endpoint.target_role`
2302 /// (or all containers when `target_role` is `None`) and updates the
2303 /// proxy's backend pool for that specific endpoint via
2304 /// [`ProxyManager::update_endpoint_backends`].
2305 async fn update_proxy_backends(&self, instance: &ServiceInstance) {
2306 let Some(proxy) = &self.proxy_manager else {
2307 return;
2308 };
2309 for endpoint in &instance.spec.endpoints {
2310 // Only L7 endpoints flow through the proxy (HTTP/HTTPS/WS).
2311 if !matches!(
2312 endpoint.protocol,
2313 Protocol::Http | Protocol::Https | Protocol::Websocket
2314 ) {
2315 continue;
2316 }
2317 let addrs = self.collect_endpoint_backends(instance, endpoint).await;
2318 proxy
2319 .update_endpoint_backends(&instance.service_name, &endpoint.name, addrs)
2320 .await;
2321 }
2322 }
2323
2324 /// Update backend addresses in the `StreamRegistry` for TCP/UDP endpoints after scaling
2325 ///
2326 /// For containers with a port override (macOS sandbox), the addresses already
2327 /// carry the runtime-assigned port. In that case, the container listens on the
2328 /// override port for all traffic, so we use the address port directly. For
2329 /// containers without a port override (Linux, VMs), we reconstruct addresses
2330 /// using the endpoint's declared port, since each container has its own IP
2331 /// and can bind any port independently.
2332 async fn update_stream_backends(&self, instance: &ServiceInstance) {
2333 let Some(stream_registry) = &self.stream_registry else {
2334 return;
2335 };
2336
2337 for endpoint in &instance.spec.endpoints {
2338 match endpoint.protocol {
2339 Protocol::Tcp => {
2340 let tcp_backends = self.collect_endpoint_backends(instance, endpoint).await;
2341 let backend_count = tcp_backends.len();
2342 stream_registry.update_tcp_backends(endpoint.port, tcp_backends);
2343 tracing::debug!(
2344 endpoint = %endpoint.name,
2345 port = endpoint.port,
2346 backend_count = backend_count,
2347 target_role = ?endpoint.target_role,
2348 "Updated TCP stream backends"
2349 );
2350 }
2351 Protocol::Udp => {
2352 let udp_backends = self.collect_endpoint_backends(instance, endpoint).await;
2353 let backend_count = udp_backends.len();
2354 stream_registry.update_udp_backends(endpoint.port, udp_backends);
2355 tracing::debug!(
2356 endpoint = %endpoint.name,
2357 port = endpoint.port,
2358 backend_count = backend_count,
2359 target_role = ?endpoint.target_role,
2360 "Updated UDP stream backends"
2361 );
2362 }
2363 _ => {} // HTTP endpoints handled by update_proxy_backends
2364 }
2365 }
2366 }
2367
2368 /// Scale a service. Cluster-aware: if this node has a `Cluster` handle
2369 /// and we're not the leader, forward to the leader; if leader, compute
2370 /// affinity-aware placement and dispatch each node its share via
2371 /// `dispatch_scale_distributed`; else (single-node) just scale locally.
2372 ///
2373 /// # Errors
2374 /// Returns an error if scaling fails on any participating node.
2375 #[allow(clippy::cast_possible_truncation)]
2376 pub async fn scale_service(&self, name: &str, replicas: u32) -> Result<()> {
2377 use zlayer_scheduler::cluster::InternalScaleRequest;
2378
2379 tracing::info!(
2380 target: "zlayer::scale_distribute",
2381 service = name,
2382 replicas,
2383 has_cluster = self.cluster.is_some(),
2384 "scale_service ENTER"
2385 );
2386
2387 // Attach the current spec so every receiving node can register/update
2388 // the service before scaling. This is what propagates an image change
2389 // to worker containers and lets a fresh worker run a replica it has
2390 // never seen. `None` if the service isn't registered locally (the
2391 // receiver then falls back to its own cached spec).
2392 let spec = self
2393 .services
2394 .read()
2395 .await
2396 .get(name)
2397 .map(|inst| inst.spec.clone());
2398 let build_req = |replicas: u32| {
2399 let req = InternalScaleRequest::new(name, replicas);
2400 match spec.clone() {
2401 Some(s) => req.with_spec(s),
2402 None => req,
2403 }
2404 };
2405
2406 if let Some(cluster) = &self.cluster {
2407 let is_leader = cluster.is_leader().await;
2408 tracing::info!(
2409 target: "zlayer::scale_distribute",
2410 service = name,
2411 replicas,
2412 is_leader,
2413 spec_affinity = ?spec.as_ref().and_then(|s| s.affinity.clone()),
2414 "scale_service: cluster path"
2415 );
2416 if !is_leader {
2417 // Follower: forward to the leader and let it dispatch.
2418 return cluster
2419 .forward_scale(build_req(replicas))
2420 .await
2421 .map_err(|e| AgentError::CreateFailed {
2422 id: name.to_string(),
2423 reason: format!("cluster forward: {e}"),
2424 });
2425 }
2426
2427 // Leader path. Compute affinity-aware placement across the Ready
2428 // node set and dispatch each node its share. `dispatch_scale_distributed`
2429 // reuses the same placement machinery as one-off container placement
2430 // (`cluster_nodes_to_node_states` + `place_service_replicas`), honoring
2431 // `ServiceSpec.affinity` (`spread`/`pack`/`pin`). The leader's own
2432 // share short-circuits to a local call (no localhost HTTP round-trip),
2433 // and the attached spec lets fresh workers register the service before
2434 // scaling. Single-node clusters fall through the default impl, which
2435 // dispatches everything to this node (unchanged behavior).
2436 return cluster
2437 .dispatch_scale_distributed(build_req(replicas))
2438 .await
2439 .map_err(|e| AgentError::CreateFailed {
2440 id: name.to_string(),
2441 reason: format!("cluster dispatch: {e}"),
2442 });
2443 }
2444
2445 // No cluster handle — single-node mode.
2446 self.scale_service_local(name, replicas).await
2447 }
2448
2449 /// Local (single-node) scale: directly creates/destroys containers on
2450 /// this node only. Called by:
2451 /// - `scale_service` in single-node mode (when `self.cluster` is None).
2452 /// - The `/api/v1/internal/scale` handler (which the leader's
2453 /// `Cluster::dispatch_scale` HTTP-POSTs to, bottoming out the
2454 /// recursive loop on each receiving node).
2455 /// - The cluster impls' `local_dispatch` closure (for the leader's own
2456 /// share — short-circuited to avoid a localhost round-trip).
2457 ///
2458 /// # Errors
2459 /// Returns an error if the service is not found or scaling fails.
2460 #[allow(clippy::cast_possible_truncation)]
2461 pub async fn scale_service_local(&self, name: &str, replicas: u32) -> Result<()> {
2462 tracing::info!(
2463 target: "zlayer::scale_distribute",
2464 service = name,
2465 replicas,
2466 "scale_service_local ENTER"
2467 );
2468 let _permit = self.scale_semaphore.acquire().await;
2469
2470 let services = self.services.read().await;
2471 let Some(instance) = services.get(name) else {
2472 // Draining a service this node never hosted is a no-op (e.g. the
2473 // leader fans out `count=0` to a node to drain it during a
2474 // scale-down, but that node never ran the service).
2475 if replicas == 0 {
2476 return Ok(());
2477 }
2478 return Err(AgentError::NotFound {
2479 container: name.to_string(),
2480 reason: "service not found".to_string(),
2481 });
2482 };
2483
2484 // Get current replica count before scaling
2485 let current_replicas = instance.replica_count().await as u32;
2486
2487 // Perform the scaling operation
2488 instance.scale_to(replicas).await?;
2489
2490 // After scaling, update proxy and stream backends for each endpoint.
2491 // Per-endpoint collection (rather than a single service-wide list)
2492 // is what makes `EndpointSpec.target_role` filtering possible:
2493 // each endpoint receives only the containers whose
2494 // `ContainerId.role` matches its declared role.
2495 if self.proxy_manager.is_some() {
2496 self.update_proxy_backends(instance).await;
2497 }
2498 if self.stream_registry.is_some() {
2499 self.update_stream_backends(instance).await;
2500 }
2501
2502 // Register new containers with supervisor for crash monitoring.
2503 //
2504 // Container ids here must match what `ServiceInstance::scale_to`
2505 // constructed — same role (derived from `replica_groups`) and same
2506 // local node id. Otherwise supervise/unsupervise miss the live entry
2507 // and crash-restart bookkeeping leaks across scale events.
2508 let local_node_id = self.cluster.as_ref().map_or(0, |c| c.node_id());
2509 if let Some(supervisor) = &self.container_supervisor {
2510 // For scale-up, register new containers
2511 if replicas > current_replicas {
2512 for i in current_replicas..replicas {
2513 let replica_idx = i + 1;
2514 let container_id = ContainerId::with_role_and_node(
2515 name.to_string(),
2516 replica_idx,
2517 instance.role_for_replica(replica_idx),
2518 local_node_id,
2519 );
2520 supervisor.supervise(&container_id, &instance.spec).await;
2521 }
2522 }
2523 // For scale-down, unregister removed containers
2524 if replicas < current_replicas {
2525 for i in replicas..current_replicas {
2526 let replica_idx = i + 1;
2527 let container_id = ContainerId::with_role_and_node(
2528 name.to_string(),
2529 replica_idx,
2530 instance.role_for_replica(replica_idx),
2531 local_node_id,
2532 );
2533 supervisor.unsupervise(&container_id).await;
2534 }
2535 }
2536 }
2537
2538 Ok(())
2539 }
2540
2541 /// Collect backend addresses for a single endpoint of a service.
2542 ///
2543 /// This queries the service instance's containers for their overlay
2544 /// network IP addresses and constructs backend addresses using the
2545 /// endpoint's container target port.
2546 ///
2547 /// Containers are filtered by `endpoint.target_role`:
2548 /// - `None` (default): all containers of the service are eligible
2549 /// (legacy behavior).
2550 /// - `Some(role)`: only containers whose `ContainerId.role` equals
2551 /// `role` are included. Implements
2552 /// [`zlayer_spec::EndpointSpec::target_role`].
2553 ///
2554 /// If a container has a `port_override` (e.g., macOS sandbox where all
2555 /// containers share the host network), that port is used instead of
2556 /// the spec-declared endpoint port. This allows multiple replicas on
2557 /// the same IP (`127.0.0.1`) to be distinguished by port.
2558 async fn collect_endpoint_backends(
2559 &self,
2560 instance: &ServiceInstance,
2561 endpoint: &zlayer_spec::EndpointSpec,
2562 ) -> Vec<SocketAddr> {
2563 let mut addrs = Vec::new();
2564 let endpoint_port = endpoint.target_port();
2565 let containers = instance.containers().read().await;
2566
2567 for (container_id, container) in containers.iter() {
2568 // target_role filter: skip containers whose role doesn't match.
2569 if let Some(required_role) = endpoint.target_role.as_ref() {
2570 if container_id.role != *required_role {
2571 continue;
2572 }
2573 }
2574 let Some(ip) = container.overlay_ip else {
2575 continue;
2576 };
2577 // Use the runtime-assigned port override if present (macOS
2578 // sandbox), otherwise fall back to the endpoint's declared
2579 // target port.
2580 let port = container.port_override.unwrap_or(endpoint_port);
2581 addrs.push(SocketAddr::new(ip, port));
2582 }
2583
2584 // If we expected backends but found none, log a hint so operators
2585 // can debug. Distinguish "no containers" from "role filter
2586 // excluded everything" from "no overlay IPs".
2587 if addrs.is_empty() && !containers.is_empty() {
2588 tracing::warn!(
2589 service = %instance.service_name,
2590 endpoint = %endpoint.name,
2591 target_role = ?endpoint.target_role,
2592 container_count = containers.len(),
2593 "no backends collected for endpoint - either no matching role, no overlay IPs, or filtering excluded all"
2594 );
2595 }
2596
2597 addrs
2598 }
2599
2600 /// Get service replica count
2601 ///
2602 /// # Errors
2603 /// Returns an error if the service is not found.
2604 pub async fn service_replica_count(&self, name: &str) -> Result<usize> {
2605 let services = self.services.read().await;
2606 let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
2607 container: name.to_string(),
2608 reason: "service not found".to_string(),
2609 })?;
2610
2611 Ok(instance.replica_count().await)
2612 }
2613
    /// Remove a workload (service, job, or cron) by name and tear down the
    /// wiring this manager holds for it.
    ///
    /// Steps, in order (all best-effort):
    /// - **Cron / Job**: unregister `name` from the cron scheduler and the job
    ///   executor, when those are configured. Both are attempted whatever the
    ///   workload's actual type is.
    /// - **Stream routes**: if `name` is a registered service and a stream
    ///   registry is configured, unregister the TCP/UDP route for each of its
    ///   TCP/UDP endpoints.
    /// - **Node loopback**: when the spec opts into node-loopback publishing and
    ///   the instance has a proxy manager, unpublish every replica that has an
    ///   overlay IP.
    /// - **Supervisor**: stop supervising each of the service's containers.
    /// - **DNS**: remove the service's DNS records. This has to run while the
    ///   service is still in the map, because the replica records are
    ///   enumerated from it.
    /// - **Service map**: finally drop the service entry (if any).
    ///
    /// NOTE(review): no container stop / scale-down call is visible in this
    /// method. Confirm that callers scale the service to zero beforehand;
    /// otherwise its replicas may keep running untracked.
    ///
    /// # Errors
    /// Currently never fails. Every step above is best-effort and the method
    /// always returns `Ok(())`; the `Result` is kept for API compatibility.
    pub async fn remove_service(&self, name: &str) -> Result<()> {
        // Try to unregister from cron scheduler first
        if let Some(scheduler) = &self.cron_scheduler {
            scheduler.unregister(name).await;
        }

        // Try to unregister from job executor
        if let Some(executor) = &self.job_executor {
            executor.unregister_job(name).await;
        }

        // Unregister stream routes (TCP/UDP) from the stream registry.
        // The `unregister_*` results are deliberately ignored (best-effort);
        // the debug log below fires whether or not a route was present.
        if let Some(stream_registry) = &self.stream_registry {
            // Need to get the service spec to know which ports to unregister
            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                for endpoint in &instance.spec.endpoints {
                    match endpoint.protocol {
                        Protocol::Tcp => {
                            let _ = stream_registry.unregister_tcp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered TCP stream route"
                            );
                        }
                        Protocol::Udp => {
                            let _ = stream_registry.unregister_udp(endpoint.port);
                            tracing::debug!(
                                service = %name,
                                port = endpoint.port,
                                "Unregistered UDP stream route"
                            );
                        }
                        // Only TCP/UDP endpoints live in the stream registry; other
                        // protocols are skipped here. NOTE(review): no HTTP route
                        // cleanup is visible in this method.
                        _ => {}
                    }
                }
            }
            drop(services); // Release read lock
        }

        // Unpublish node-loopback ports for every live replica of this
        // service so the loopback listeners are freed (mirror of the
        // per-replica unpublish in `ServiceInstance::scale_to`). Gated on the
        // spec's policy; recomputes each backend from the container's stored
        // runtime-resolved IP and port override. Replicas without an overlay
        // IP are skipped.
        {
            let services = self.services.read().await;
            if let Some(instance) = services.get(name) {
                if instance.spec.publish_to_node_loopback() {
                    if let Some(proxy) = instance.proxy_manager() {
                        let containers = instance.containers().read().await;
                        for container in containers.values() {
                            if let Some(ip) = container.overlay_ip {
                                proxy
                                    .unpublish_loopback_for_container(
                                        &instance.spec,
                                        ip,
                                        container.port_override,
                                    )
                                    .await;
                            }
                        }
                    }
                }
            }
            drop(services); // Release read lock
        }

        // Unregister containers from the supervisor (the container IDs are
        // fetched without holding our read lock across the unsupervise calls).
        if let Some(supervisor) = &self.container_supervisor {
            let containers = self.get_service_containers(name).await;
            for container_id in containers {
                supervisor.unsupervise(&container_id).await;
            }
            tracing::debug!(service = %name, "Unregistered containers from supervisor");
        }

        // Clean up DNS records for the service (bare name + FQDNs). Must
        // precede the map removal below: replica records are enumerated from
        // the still-registered instance.
        self.cleanup_service_dns(name).await;

        // Remove from services map (may or may not exist depending on rtype)
        let mut services = self.services.write().await;
        if services.remove(name).is_some() {
            tracing::debug!(service = %name, "Removed service from manager");
        }

        Ok(())
    }
2712
2713 /// Remove every DNS record this service registered on attach: the bare
2714 /// compose service name (`{service}`), the service-level FQDN
2715 /// (`{service}.service.local`), and each replica's FQDN
2716 /// (`{replica}.{service}.service.local`). Best-effort; failures are logged.
2717 async fn cleanup_service_dns(&self, name: &str) {
2718 let Some(dns) = &self.dns_server else {
2719 return;
2720 };
2721
2722 // Bare compose service-name record (compose discovery).
2723 if let Err(e) = dns.remove_record(name).await {
2724 tracing::warn!(
2725 hostname = %name,
2726 error = %e,
2727 "failed to remove bare service-name DNS record"
2728 );
2729 }
2730
2731 // Service-level FQDN.
2732 let service_hostname = format!("{name}.service.local");
2733 if let Err(e) = dns.remove_record(&service_hostname).await {
2734 tracing::warn!(
2735 hostname = %service_hostname,
2736 error = %e,
2737 "failed to remove service DNS record"
2738 );
2739 } else {
2740 tracing::debug!(hostname = %service_hostname, "removed service DNS record");
2741 }
2742
2743 // Any remaining replica-specific FQDNs.
2744 let services = self.services.read().await;
2745 if let Some(instance) = services.get(name) {
2746 let containers = instance.containers().read().await;
2747 for (id, _) in containers.iter() {
2748 let replica_hostname = format!("{}.{}.service.local", id.replica, name);
2749 if let Err(e) = dns.remove_record(&replica_hostname).await {
2750 tracing::warn!(
2751 hostname = %replica_hostname,
2752 error = %e,
2753 "failed to remove replica DNS record during service removal"
2754 );
2755 }
2756 }
2757 }
2758 }
2759
2760 /// Introspect service infrastructure wiring.
2761 /// Returns (`has_overlay`, `has_proxy`, `has_dns`), or None if service not found.
2762 pub async fn service_infrastructure(&self, name: &str) -> Option<(bool, bool, bool)> {
2763 let services = self.services.read().await;
2764 services.get(name).map(|i| {
2765 (
2766 i.has_overlay_manager(),
2767 i.has_proxy_manager(),
2768 i.has_dns_server(),
2769 )
2770 })
2771 }
2772
2773 /// List all services
2774 pub async fn list_services(&self) -> Vec<String> {
2775 self.services.read().await.keys().cloned().collect()
2776 }
2777
2778 /// Get logs for a service, aggregated from all container replicas.
2779 ///
2780 /// # Arguments
2781 /// * `service_name` - Name of the service to fetch logs for
2782 /// * `tail` - Number of lines to return per container (0 = all)
2783 /// * `instance` - Optional specific instance (container ID suffix like "1", "2")
2784 ///
2785 /// # Errors
2786 /// Returns an error if the service or instance is not found.
2787 ///
2788 /// # Returns
2789 /// Structured log entries from all (or specific) container replicas. Each
2790 /// entry has its `service` and `deployment` fields populated when available.
2791 pub async fn get_service_logs(
2792 &self,
2793 service_name: &str,
2794 tail: usize,
2795 instance: Option<&str>,
2796 ) -> Result<Vec<LogEntry>> {
2797 let container_ids = self.get_service_containers(service_name).await;
2798
2799 if container_ids.is_empty() {
2800 return Err(AgentError::NotFound {
2801 container: service_name.to_string(),
2802 reason: "no containers found for service".to_string(),
2803 });
2804 }
2805
2806 // If a specific instance is requested, filter to just that one
2807 let target_ids: Vec<&ContainerId> = if let Some(inst) = instance {
2808 if let Ok(replica_num) = inst.parse::<u32>() {
2809 container_ids
2810 .iter()
2811 .filter(|id| id.replica == replica_num)
2812 .collect()
2813 } else {
2814 // Try matching by full container ID string suffix
2815 container_ids
2816 .iter()
2817 .filter(|id| id.to_string().contains(inst))
2818 .collect()
2819 }
2820 } else {
2821 container_ids.iter().collect()
2822 };
2823
2824 if target_ids.is_empty() {
2825 return Err(AgentError::NotFound {
2826 container: format!("{}/{}", service_name, instance.unwrap_or("?")),
2827 reason: "instance not found".to_string(),
2828 });
2829 }
2830
2831 let mut all_entries: Vec<LogEntry> = Vec::new();
2832
2833 for id in &target_ids {
2834 match self.runtime.container_logs(id, tail).await {
2835 Ok(mut entries) => {
2836 // Populate service and deployment metadata on each entry
2837 for entry in &mut entries {
2838 if entry.service.is_none() {
2839 entry.service = Some(service_name.to_string());
2840 }
2841 if entry.deployment.is_none() {
2842 entry.deployment.clone_from(&self.deployment_name);
2843 }
2844 }
2845 all_entries.extend(entries);
2846 }
2847 Err(e) => {
2848 tracing::warn!(
2849 service = service_name,
2850 container = %id,
2851 error = %e,
2852 "Failed to read container logs"
2853 );
2854 }
2855 }
2856 }
2857
2858 Ok(all_entries)
2859 }
2860
2861 /// Get all container IDs for a specific service
2862 ///
2863 /// Returns an empty vector if the service doesn't exist.
2864 ///
2865 /// # Arguments
2866 /// * `service_name` - Name of the service to query
2867 ///
2868 /// # Returns
2869 /// Vector of `ContainerIds` for all replicas of the service
2870 pub async fn get_service_containers(&self, service_name: &str) -> Vec<ContainerId> {
2871 let services = self.services.read().await;
2872 if let Some(instance) = services.get(service_name) {
2873 instance.container_ids().await
2874 } else {
2875 Vec::new()
2876 }
2877 }
2878
2879 /// Get per-container info (id, image, real state, pid, overlay IP) for a
2880 /// specific service.
2881 ///
2882 /// Unlike [`get_service_containers`](Self::get_service_containers) (which
2883 /// returns ids only), this surfaces the REAL image reference and lifecycle
2884 /// state of each live container so the API/`ps` can report them accurately.
2885 ///
2886 /// Returns an empty vector if the service doesn't exist.
2887 pub async fn get_service_container_infos(&self, service_name: &str) -> Vec<ContainerInfo> {
2888 let services = self.services.read().await;
2889 if let Some(instance) = services.get(service_name) {
2890 instance.container_infos().await
2891 } else {
2892 Vec::new()
2893 }
2894 }
2895
2896 /// This node's **local** view of `service` (running replica count, health,
2897 /// containers), used for cluster-wide aggregation. Served by the internal
2898 /// `/api/v1/internal/services/{svc}/state` endpoint and used as the local
2899 /// part of [`Self::cluster_service_states`].
2900 pub async fn local_service_state(
2901 &self,
2902 service: &str,
2903 ) -> zlayer_types::cluster::NodeServiceState {
2904 use zlayer_types::cluster::{ClusterContainerSummary, NodeServiceState};
2905 let node_id = self.cluster.as_ref().map_or(0, |c| c.node_id());
2906 let infos = self.get_service_container_infos(service).await;
2907 #[allow(clippy::cast_possible_truncation)]
2908 let running = infos
2909 .iter()
2910 .filter(|i| i.state.eq_ignore_ascii_case("running"))
2911 .count() as u32;
2912 // A node running 0 replicas is trivially healthy (it can't drag the
2913 // cluster-wide aggregate). Otherwise require a Healthy health state.
2914 let healthy = if running == 0 {
2915 true
2916 } else {
2917 let states = self.health_states();
2918 let guard = states.read().await;
2919 matches!(guard.get(service), Some(HealthState::Healthy))
2920 };
2921 let containers = infos
2922 .into_iter()
2923 .map(|i| ClusterContainerSummary {
2924 node_id,
2925 id: i.id.to_string(),
2926 service: i.id.service.clone(),
2927 replica: i.id.replica,
2928 image: i.image,
2929 state: i.state,
2930 pid: i.pid,
2931 overlay_ip: i.overlay_ip,
2932 })
2933 .collect();
2934 NodeServiceState {
2935 node_id,
2936 running,
2937 healthy,
2938 containers,
2939 }
2940 }
2941
2942 /// Cluster-wide per-node states for `service`: this node's local view plus
2943 /// every other node's (fetched via the cluster handle's
2944 /// `fetch_remote_service_states`). When not clustered, returns just the
2945 /// local view. This is the source of truth for distributed-service replica
2946 /// counts, health, and the `ps` container listing on the leader.
2947 pub async fn cluster_service_states(
2948 &self,
2949 service: &str,
2950 ) -> Vec<zlayer_types::cluster::NodeServiceState> {
2951 let mut states = vec![self.local_service_state(service).await];
2952 if let Some(cluster) = &self.cluster {
2953 states.extend(cluster.fetch_remote_service_states(service).await);
2954 }
2955 states
2956 }
2957
2958 /// Execute a command inside a running container for a service
2959 ///
2960 /// Picks a specific replica if provided, otherwise uses the first available container.
2961 ///
2962 /// # Arguments
2963 /// * `service_name` - Name of the service
2964 /// * `replica` - Optional replica number to target
2965 /// * `cmd` - Command and arguments to execute
2966 ///
2967 /// # Errors
2968 /// Returns an error if the service or replica is not found, or if exec fails.
2969 ///
2970 /// # Panics
2971 /// Panics if no replica is specified and the container list is unexpectedly empty
2972 /// after the emptiness check (should not happen in practice).
2973 ///
2974 /// # Returns
2975 /// Tuple of (`exit_code`, stdout, stderr)
2976 pub async fn exec_in_container(
2977 &self,
2978 service_name: &str,
2979 replica: Option<u32>,
2980 cmd: &[String],
2981 ) -> Result<(i32, String, String)> {
2982 let container_ids = self.get_service_containers(service_name).await;
2983
2984 if container_ids.is_empty() {
2985 return Err(AgentError::NotFound {
2986 container: service_name.to_string(),
2987 reason: "no containers found for service".to_string(),
2988 });
2989 }
2990
2991 // Pick the target container
2992 let target = if let Some(rep) = replica {
2993 container_ids
2994 .into_iter()
2995 .find(|cid| cid.replica == rep)
2996 .ok_or_else(|| AgentError::NotFound {
2997 container: format!("{service_name}-rep-{rep}"),
2998 reason: format!("replica {rep} not found for service"),
2999 })?
3000 } else {
3001 // Use the first container (lowest replica number)
3002 container_ids.into_iter().next().unwrap()
3003 };
3004
3005 self.runtime.exec(&target, cmd).await
3006 }
3007
3008 /// List every live container across all services, enriched with the data a
3009 /// Docker `ps` row needs and the data the Docker-name resolver needs.
3010 ///
3011 /// For each running container this surfaces the deployment name, the service
3012 /// name, the concrete [`ContainerId`], the compose `container_name:` label
3013 /// (when set, the user-facing Docker name), the real image, the lifecycle
3014 /// state, and the service's published port mappings. Used by the unified
3015 /// container-name resolver and by `docker ps` so compose deployments show up
3016 /// and resolve by their `container_name`.
3017 pub async fn list_container_views(&self) -> Vec<DeploymentContainerView> {
3018 let deployment = self.deployment_name.clone();
3019 let services = self.services.read().await;
3020 let mut out = Vec::new();
3021 for (service_name, instance) in services.iter() {
3022 let container_name = instance
3023 .spec
3024 .labels
3025 .get("com.docker.compose.container_name")
3026 .cloned();
3027 let ports = instance.spec.port_mappings.clone();
3028 for info in instance.container_infos().await {
3029 out.push(DeploymentContainerView {
3030 deployment: deployment.clone(),
3031 service: service_name.clone(),
3032 container_id: info.id,
3033 container_name: container_name.clone(),
3034 image: info.image,
3035 state: info.state,
3036 pid: info.pid,
3037 ports: ports.clone(),
3038 });
3039 }
3040 }
3041 out
3042 }
3043
3044 /// Resolve a Docker-style container name/id to a live deployment
3045 /// [`ContainerId`].
3046 ///
3047 /// Matching precedence (first hit wins):
3048 /// 1. The compose `container_name:` label (e.g. `forgejo-e2e`).
3049 /// 2. The conventional compose names `{deployment}-{service}-{replica}` and
3050 /// `{deployment}_{service}_{replica}` (replica is 1-based, mirroring
3051 /// Docker Compose; `ContainerId.replica` is 0-based so we add 1).
3052 /// 3. The bare service name (`{service}`), targeting its first replica.
3053 /// 4. The [`ContainerId`] `Display` form.
3054 ///
3055 /// Returns `None` when nothing matches a *running* container.
3056 pub async fn resolve_container_name(&self, name: &str) -> Option<ContainerId> {
3057 let views = self.list_container_views().await;
3058 // 1. explicit container_name label.
3059 if let Some(v) = views
3060 .iter()
3061 .find(|v| v.container_name.as_deref() == Some(name))
3062 {
3063 return Some(v.container_id.clone());
3064 }
3065 // 2 & 3. conventional names + bare service name.
3066 for v in &views {
3067 let dep = v.deployment.as_deref().unwrap_or("");
3068 let svc = &v.service;
3069 let rep1 = v.container_id.replica + 1;
3070 let candidates = [
3071 format!("{dep}-{svc}-{rep1}"),
3072 format!("{dep}_{svc}_{rep1}"),
3073 svc.clone(),
3074 ];
3075 if candidates.iter().any(|c| c == name) {
3076 return Some(v.container_id.clone());
3077 }
3078 }
3079 // 4. ContainerId Display form.
3080 for v in &views {
3081 if v.container_id.to_string() == name {
3082 return Some(v.container_id.clone());
3083 }
3084 }
3085 None
3086 }
3087
3088 /// Execute a command in a specific deployment container (by its concrete
3089 /// [`ContainerId`]) honouring Docker `exec` options (`--user`, `-w`, `-e`).
3090 ///
3091 /// Routes through [`Runtime::exec_with_opts`] so runtimes that support
3092 /// dropping to a uid/gid + chdir + env injection (macOS VZ-Linux) apply
3093 /// them; others fall back to a plain buffered exec.
3094 ///
3095 /// # Errors
3096 /// Propagates the runtime's exec error.
3097 pub async fn exec_in_container_id_with_opts(
3098 &self,
3099 id: &ContainerId,
3100 opts: &crate::runtime::ExecOptions,
3101 ) -> Result<(i32, String, String)> {
3102 self.runtime.exec_with_opts(id, opts).await
3103 }
3104
3105 // ==================== Job Management ====================
3106
3107 /// Trigger a job execution
3108 ///
3109 /// # Arguments
3110 /// * `name` - Name of the registered job
3111 /// * `trigger` - How the job was triggered (endpoint, cli, etc.)
3112 ///
3113 /// # Returns
3114 /// The execution ID for tracking the job
3115 ///
3116 /// # Errors
3117 /// - Returns error if job executor is not configured
3118 /// - Returns error if the job is not registered
3119 pub async fn trigger_job(&self, name: &str, trigger: JobTrigger) -> Result<JobExecutionId> {
3120 let executor = self
3121 .job_executor
3122 .as_ref()
3123 .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
3124
3125 let spec = executor
3126 .get_job_spec(name)
3127 .await
3128 .ok_or_else(|| AgentError::NotFound {
3129 container: name.to_string(),
3130 reason: "job not registered".to_string(),
3131 })?;
3132
3133 executor.trigger(name, &spec, trigger).await
3134 }
3135
3136 /// Get the status of a job execution
3137 ///
3138 /// # Arguments
3139 /// * `id` - The execution ID returned from `trigger_job`
3140 ///
3141 /// # Returns
3142 /// The job execution details, or None if not found
3143 pub async fn get_job_execution(&self, id: &JobExecutionId) -> Option<JobExecution> {
3144 if let Some(executor) = &self.job_executor {
3145 executor.get_execution(id).await
3146 } else {
3147 None
3148 }
3149 }
3150
3151 /// List all executions for a specific job
3152 ///
3153 /// # Arguments
3154 /// * `name` - Name of the job
3155 ///
3156 /// # Returns
3157 /// Vector of job executions for the specified job
3158 pub async fn list_job_executions(&self, name: &str) -> Vec<JobExecution> {
3159 if let Some(executor) = &self.job_executor {
3160 executor.list_executions(name).await
3161 } else {
3162 Vec::new()
3163 }
3164 }
3165
3166 /// Cancel a running job execution
3167 ///
3168 /// # Arguments
3169 /// * `id` - The execution ID to cancel
3170 ///
3171 /// # Errors
3172 /// Returns error if job executor is not configured or if cancellation fails
3173 pub async fn cancel_job(&self, id: &JobExecutionId) -> Result<()> {
3174 let executor = self
3175 .job_executor
3176 .as_ref()
3177 .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
3178
3179 executor.cancel(id).await
3180 }
3181
3182 // ==================== Cron Management ====================
3183
3184 /// Manually trigger a cron job (outside of its schedule)
3185 ///
3186 /// # Arguments
3187 /// * `name` - Name of the cron job
3188 ///
3189 /// # Returns
3190 /// The execution ID for tracking the triggered job
3191 ///
3192 /// # Errors
3193 /// Returns error if cron scheduler is not configured or job not found
3194 pub async fn trigger_cron(&self, name: &str) -> Result<JobExecutionId> {
3195 let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
3196 AgentError::Configuration("Cron scheduler not configured".to_string())
3197 })?;
3198
3199 scheduler.trigger_now(name).await
3200 }
3201
3202 /// Enable or disable a cron job
3203 ///
3204 /// # Arguments
3205 /// * `name` - Name of the cron job
3206 /// * `enabled` - Whether to enable or disable the job
3207 pub async fn set_cron_enabled(&self, name: &str, enabled: bool) {
3208 if let Some(scheduler) = &self.cron_scheduler {
3209 scheduler.set_enabled(name, enabled).await;
3210 }
3211 }
3212
3213 /// List all registered cron jobs
3214 pub async fn list_cron_jobs(&self) -> Vec<crate::cron_scheduler::CronJobInfo> {
3215 if let Some(scheduler) = &self.cron_scheduler {
3216 scheduler.list_jobs().await
3217 } else {
3218 Vec::new()
3219 }
3220 }
3221
3222 /// Start the cron scheduler background task
3223 ///
3224 /// This spawns a background task that checks for due cron jobs every second.
3225 /// Returns a `JoinHandle` that can be used to wait for the scheduler to stop.
3226 ///
3227 /// # Errors
3228 /// Returns error if cron scheduler is not configured
3229 pub fn start_cron_scheduler(&self) -> Result<tokio::task::JoinHandle<()>> {
3230 let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
3231 AgentError::Configuration("Cron scheduler not configured".to_string())
3232 })?;
3233
3234 let scheduler: Arc<CronScheduler> = Arc::clone(scheduler);
3235 Ok(tokio::spawn(async move {
3236 scheduler.run_loop().await;
3237 }))
3238 }
3239
3240 /// Shutdown the cron scheduler
3241 pub fn shutdown_cron(&self) {
3242 if let Some(scheduler) = &self.cron_scheduler {
3243 scheduler.shutdown();
3244 }
3245 }
3246}
3247
3248#[cfg(test)]
3249#[allow(deprecated)]
3250mod tests {
3251 use super::*;
3252 use crate::runtime::MockRuntime;
3253
3254 #[tokio::test]
3255 async fn test_service_manager() {
3256 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3257 let manager = ServiceManager::new(runtime);
3258
3259 // Add service
3260 let spec = mock_spec();
3261 Box::pin(manager.upsert_service("test".to_string(), spec))
3262 .await
3263 .unwrap();
3264
3265 // Scale up
3266 manager.scale_service("test", 3).await.unwrap();
3267
3268 // Check count
3269 let count = manager.service_replica_count("test").await.unwrap();
3270 assert_eq!(count, 3);
3271
3272 // List services
3273 let services = manager.list_services().await;
3274 assert_eq!(services, vec!["test".to_string()]);
3275 }
3276
3277 #[tokio::test]
3278 async fn test_service_manager_basic_lifecycle() {
3279 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3280 let manager = ServiceManager::new(runtime);
3281
3282 // Add service with HTTP endpoint
3283 let spec = mock_spec();
3284 Box::pin(manager.upsert_service("api".to_string(), spec))
3285 .await
3286 .unwrap();
3287
3288 // Scale up
3289 manager.scale_service("api", 2).await.unwrap();
3290
3291 // Check count
3292 let count = manager.service_replica_count("api").await.unwrap();
3293 assert_eq!(count, 2);
3294
3295 // Remove service
3296 manager.remove_service("api").await.unwrap();
3297
3298 // Verify service is gone
3299 let services = manager.list_services().await;
3300 assert!(!services.contains(&"api".to_string()));
3301 }
3302
3303 #[tokio::test]
3304 async fn test_service_manager_with_full_config() {
3305 use tokio::sync::RwLock;
3306
3307 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3308
3309 // Create a mock overlay manager (skip actual network setup)
3310 let overlay_manager = Arc::new(RwLock::new(
3311 OverlayManager::new("test-deployment".to_string(), "test".to_string())
3312 .await
3313 .unwrap(),
3314 ));
3315
3316 let manager =
3317 ServiceManager::with_full_config(runtime, overlay_manager, "prod".to_string());
3318
3319 // Add service
3320 let spec = mock_spec();
3321 Box::pin(manager.upsert_service("web".to_string(), spec))
3322 .await
3323 .unwrap();
3324
3325 // Verify service is registered
3326 let services = manager.list_services().await;
3327 assert!(services.contains(&"web".to_string()));
3328 }
3329
3330 #[test]
3331 fn test_container_state_as_str() {
3332 use crate::runtime::ContainerState;
3333 assert_eq!(ContainerState::Pending.as_str(), "pending");
3334 assert_eq!(ContainerState::Initializing.as_str(), "initializing");
3335 assert_eq!(ContainerState::Running.as_str(), "running");
3336 assert_eq!(ContainerState::Stopping.as_str(), "stopping");
3337 assert_eq!(ContainerState::Exited { code: 0 }.as_str(), "exited");
3338 assert_eq!(
3339 ContainerState::Failed {
3340 reason: "boom".to_string()
3341 }
3342 .as_str(),
3343 "failed"
3344 );
3345 // Display delegates to as_str.
3346 assert_eq!(ContainerState::Running.to_string(), "running");
3347 }
3348
3349 /// A container created from image X must report image X and its real
3350 /// lifecycle state through the new `container_infos` accessor, replacing
3351 /// the previously hardcoded `"running"` / empty-image behavior.
3352 #[tokio::test]
3353 async fn test_container_infos_surfaces_image_and_state() {
3354 use crate::runtime::{Container, ContainerState};
3355
3356 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3357 let manager = ServiceManager::new(runtime);
3358
3359 let spec = mock_spec(); // image name = "test:latest"
3360 let image = spec.image.name.to_string();
3361 Box::pin(manager.upsert_service("web".to_string(), spec))
3362 .await
3363 .unwrap();
3364
3365 // Inject containers directly with distinct states.
3366 {
3367 let services = manager.services.read().await;
3368 let instance = services.get("web").unwrap();
3369 let mut containers = instance.containers().write().await;
3370
3371 let running_id = ContainerId::new("web", 0);
3372 containers.insert(
3373 running_id.clone(),
3374 Container {
3375 id: running_id,
3376 image: image.clone(),
3377 state: ContainerState::Running,
3378 pid: Some(4242),
3379 task: None,
3380 overlay_ip: None,
3381 health_monitor: None,
3382 port_override: None,
3383 },
3384 );
3385
3386 let exited_id = ContainerId::new("web", 1);
3387 containers.insert(
3388 exited_id.clone(),
3389 Container {
3390 id: exited_id,
3391 image: image.clone(),
3392 state: ContainerState::Exited { code: 1 },
3393 pid: None,
3394 task: None,
3395 overlay_ip: None,
3396 health_monitor: None,
3397 port_override: None,
3398 },
3399 );
3400 }
3401
3402 let mut infos = manager.get_service_container_infos("web").await;
3403 infos.sort_by_key(|i| i.id.replica);
3404 assert_eq!(infos.len(), 2);
3405
3406 // Every container reports the real image it was created from.
3407 assert!(infos.iter().all(|i| i.image == image));
3408 assert!(infos.iter().all(|i| i.image == "test:latest"));
3409
3410 // Real per-container state is surfaced (not a hardcoded "running").
3411 assert_eq!(infos[0].state, "running");
3412 assert_eq!(infos[0].pid, Some(4242));
3413 assert_eq!(infos[1].state, "exited");
3414 assert_eq!(infos[1].pid, None);
3415
3416 // Unknown service yields an empty list.
3417 assert!(manager
3418 .get_service_container_infos("missing")
3419 .await
3420 .is_empty());
3421 }
3422
3423 /// Bug 2 (`cluster_upgrade`): a changed image *reference* (tag bump) under
3424 /// `if_not_present` must still recreate the local replicas. Previously the
3425 /// recreate only fired on digest drift under `Always`/`Newer`, so a tag
3426 /// change was silently ignored and containers stayed on the old image.
3427 #[tokio::test]
3428 async fn upsert_recreates_local_replicas_on_image_reference_change() {
3429 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3430 let manager = ServiceManager::new(runtime);
3431
3432 // Deploy v1 with the e2e's pull policy (if_not_present) and scale up.
3433 let mut spec = mock_spec();
3434 spec.image.name = "docker.io/library/nginx:1.28-alpine".parse().unwrap();
3435 spec.image.pull_policy = zlayer_spec::PullPolicy::IfNotPresent;
3436 Box::pin(manager.upsert_service("web".to_string(), spec.clone()))
3437 .await
3438 .unwrap();
3439 manager.scale_service_local("web", 2).await.unwrap();
3440
3441 let v1: Vec<String> = manager
3442 .get_service_container_infos("web")
3443 .await
3444 .into_iter()
3445 .map(|i| i.image)
3446 .collect();
3447 assert_eq!(v1.len(), 2);
3448 assert!(
3449 v1.iter().all(|img| img.contains("1.28")),
3450 "expected v1 images, got {v1:?}"
3451 );
3452
3453 // Upgrade to v2 under the SAME if_not_present policy.
3454 let mut spec_v2 = spec;
3455 spec_v2.image.name = "docker.io/library/nginx:1.29-alpine".parse().unwrap();
3456 Box::pin(manager.upsert_service("web".to_string(), spec_v2))
3457 .await
3458 .unwrap();
3459
3460 let v2: Vec<String> = manager
3461 .get_service_container_infos("web")
3462 .await
3463 .into_iter()
3464 .map(|i| i.image)
3465 .collect();
3466 assert_eq!(v2.len(), 2, "replica count preserved across upgrade");
3467 assert!(
3468 v2.iter().all(|img| img.contains("1.29")),
3469 "containers must be recreated on the new image, got {v2:?}"
3470 );
3471 }
3472
3473 fn mock_spec() -> ServiceSpec {
3474 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3475 r"
3476version: v1
3477deployment: test
3478services:
3479 test:
3480 rtype: service
3481 image:
3482 name: test:latest
3483 endpoints:
3484 - name: http
3485 protocol: http
3486 port: 8080
3487 scale:
3488 mode: fixed
3489 replicas: 1
3490",
3491 )
3492 .unwrap()
3493 .services
3494 .remove("test")
3495 .unwrap()
3496 }
3497
3498 #[test]
3499 fn test_set_container_dns_injects_when_empty() {
3500 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3501 let spec = mock_spec(); // spec.dns defaults to empty, host_network false
3502 let mut instance =
3503 ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
3504 instance.set_container_dns("10.42.0.1".parse().unwrap());
3505 assert_eq!(instance.spec.dns, vec!["10.42.0.1".to_string()]);
3506 }
3507
3508 #[test]
3509 fn test_set_container_dns_skips_host_network() {
3510 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3511 let mut spec = mock_spec();
3512 spec.host_network = true;
3513 let mut instance =
3514 ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
3515 instance.set_container_dns("10.42.0.1".parse().unwrap());
3516 assert!(
3517 instance.spec.dns.is_empty(),
3518 "host_network containers must inherit the host resolv.conf"
3519 );
3520 }
3521
3522 #[test]
3523 fn test_set_container_dns_preserves_user_dns() {
3524 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3525 let mut spec = mock_spec();
3526 spec.dns = vec!["1.1.1.1".to_string()];
3527 let mut instance =
3528 ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
3529 instance.set_container_dns("10.42.0.1".parse().unwrap());
3530 assert_eq!(
3531 instance.spec.dns,
3532 vec!["1.1.1.1".to_string()],
3533 "user-supplied spec.dns must win over the overlay resolver"
3534 );
3535 }
3536
3537 /// Helper to create a `ServiceSpec` with dependencies
3538 fn mock_spec_with_deps(deps: Vec<DependsSpec>) -> ServiceSpec {
3539 let mut spec = mock_spec();
3540 spec.depends = deps;
3541 spec
3542 }
3543
3544 /// Helper to create a `DependsSpec`
3545 fn dep(
3546 service: &str,
3547 condition: zlayer_spec::DependencyCondition,
3548 timeout_ms: u64,
3549 on_timeout: zlayer_spec::TimeoutAction,
3550 ) -> DependsSpec {
3551 DependsSpec {
3552 service: service.to_string(),
3553 condition,
3554 timeout: Some(Duration::from_millis(timeout_ms)),
3555 on_timeout,
3556 }
3557 }
3558
3559 #[tokio::test]
3560 async fn test_deploy_with_dependencies_no_deps() {
3561 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3562 let manager = ServiceManager::new(runtime);
3563
3564 // Services with no dependencies
3565 let mut services = HashMap::new();
3566 services.insert("a".to_string(), mock_spec());
3567 services.insert("b".to_string(), mock_spec());
3568
3569 // Should deploy both without issue
3570 Box::pin(manager.deploy_with_dependencies(services))
3571 .await
3572 .unwrap();
3573
3574 // Both services should be registered
3575 let service_list = manager.list_services().await;
3576 assert_eq!(service_list.len(), 2);
3577 }
3578
3579 #[tokio::test]
3580 async fn test_deploy_with_dependencies_linear() {
3581 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3582 let manager = ServiceManager::new(runtime);
3583
3584 // A -> B -> C (A depends on B, B depends on C)
3585 // All use "started" condition which is satisfied when container is running
3586 let mut services = HashMap::new();
3587 services.insert("c".to_string(), mock_spec());
3588 services.insert(
3589 "b".to_string(),
3590 mock_spec_with_deps(vec![dep(
3591 "c",
3592 zlayer_spec::DependencyCondition::Started,
3593 5000,
3594 zlayer_spec::TimeoutAction::Fail,
3595 )]),
3596 );
3597 services.insert(
3598 "a".to_string(),
3599 mock_spec_with_deps(vec![dep(
3600 "b",
3601 zlayer_spec::DependencyCondition::Started,
3602 5000,
3603 zlayer_spec::TimeoutAction::Fail,
3604 )]),
3605 );
3606
3607 // Should deploy in order: c, b, a
3608 Box::pin(manager.deploy_with_dependencies(services))
3609 .await
3610 .unwrap();
3611
3612 // All services should be registered
3613 let service_list = manager.list_services().await;
3614 assert_eq!(service_list.len(), 3);
3615 }
3616
3617 #[tokio::test]
3618 async fn test_deploy_with_dependencies_cycle_detection() {
3619 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3620 let manager = ServiceManager::new(runtime);
3621
3622 // A -> B -> A (cycle)
3623 let mut services = HashMap::new();
3624 services.insert(
3625 "a".to_string(),
3626 mock_spec_with_deps(vec![dep(
3627 "b",
3628 zlayer_spec::DependencyCondition::Started,
3629 5000,
3630 zlayer_spec::TimeoutAction::Fail,
3631 )]),
3632 );
3633 services.insert(
3634 "b".to_string(),
3635 mock_spec_with_deps(vec![dep(
3636 "a",
3637 zlayer_spec::DependencyCondition::Started,
3638 5000,
3639 zlayer_spec::TimeoutAction::Fail,
3640 )]),
3641 );
3642
3643 // Should fail with cycle detection
3644 let result = Box::pin(manager.deploy_with_dependencies(services)).await;
3645 assert!(result.is_err());
3646 let err = result.unwrap_err().to_string();
3647 assert!(err.contains("Cyclic dependency"));
3648 }
3649
3650 #[tokio::test]
3651 async fn test_deploy_with_dependencies_timeout_continue() {
3652 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3653 let manager = ServiceManager::new(runtime);
3654
3655 // A depends on B (healthy), but B never becomes healthy
3656 // Using continue action, so it should proceed anyway
3657 let mut services = HashMap::new();
3658 services.insert("b".to_string(), mock_spec());
3659 services.insert(
3660 "a".to_string(),
3661 mock_spec_with_deps(vec![dep(
3662 "b",
3663 zlayer_spec::DependencyCondition::Healthy, // B won't pass healthy check
3664 100, // Short timeout
3665 zlayer_spec::TimeoutAction::Continue, // But continue anyway
3666 )]),
3667 );
3668
3669 // Should deploy both despite timeout
3670 Box::pin(manager.deploy_with_dependencies(services))
3671 .await
3672 .unwrap();
3673
3674 let service_list = manager.list_services().await;
3675 assert_eq!(service_list.len(), 2);
3676 }
3677
3678 #[tokio::test]
3679 async fn test_deploy_with_dependencies_timeout_warn() {
3680 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3681 let manager = ServiceManager::new(runtime);
3682
3683 // A depends on B (healthy), but B never becomes healthy
3684 // Using warn action, so it should proceed with a warning
3685 let mut services = HashMap::new();
3686 services.insert("b".to_string(), mock_spec());
3687 services.insert(
3688 "a".to_string(),
3689 mock_spec_with_deps(vec![dep(
3690 "b",
3691 zlayer_spec::DependencyCondition::Healthy,
3692 100,
3693 zlayer_spec::TimeoutAction::Warn,
3694 )]),
3695 );
3696
3697 // Should deploy both despite timeout (with warning)
3698 Box::pin(manager.deploy_with_dependencies(services))
3699 .await
3700 .unwrap();
3701
3702 let service_list = manager.list_services().await;
3703 assert_eq!(service_list.len(), 2);
3704 }
3705
3706 #[tokio::test]
3707 async fn test_deploy_with_dependencies_timeout_fail() {
3708 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3709 let manager = ServiceManager::new(runtime);
3710
3711 // A depends on B (healthy), but B never becomes healthy
3712 // Using fail action, so deployment should fail
3713 let mut services = HashMap::new();
3714 services.insert("b".to_string(), mock_spec());
3715 services.insert(
3716 "a".to_string(),
3717 mock_spec_with_deps(vec![dep(
3718 "b",
3719 zlayer_spec::DependencyCondition::Healthy,
3720 100,
3721 zlayer_spec::TimeoutAction::Fail,
3722 )]),
3723 );
3724
3725 // Should fail after B is started but doesn't become healthy
3726 let result = Box::pin(manager.deploy_with_dependencies(services)).await;
3727 assert!(result.is_err());
3728
3729 // B should be started (it has no deps), but A should fail
3730 let err = result.unwrap_err().to_string();
3731 assert!(err.contains("Dependency timeout"));
3732 }
3733
3734 #[tokio::test]
3735 async fn test_check_dependencies_all_satisfied() {
3736 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3737 let manager = ServiceManager::new(runtime);
3738
3739 // Mark a service as healthy
3740 manager
3741 .update_health_state("db", HealthState::Healthy)
3742 .await;
3743
3744 let deps = vec![DependsSpec {
3745 service: "db".to_string(),
3746 condition: zlayer_spec::DependencyCondition::Healthy,
3747 timeout: Some(Duration::from_secs(60)),
3748 on_timeout: zlayer_spec::TimeoutAction::Fail,
3749 }];
3750
3751 let satisfied = manager.check_dependencies(&deps).await.unwrap();
3752 assert!(satisfied);
3753 }
3754
3755 #[tokio::test]
3756 async fn test_check_dependencies_not_satisfied() {
3757 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3758 let manager = ServiceManager::new(runtime);
3759
3760 // Service not healthy (no state set = Unknown)
3761 let deps = vec![DependsSpec {
3762 service: "db".to_string(),
3763 condition: zlayer_spec::DependencyCondition::Healthy,
3764 timeout: Some(Duration::from_secs(60)),
3765 on_timeout: zlayer_spec::TimeoutAction::Fail,
3766 }];
3767
3768 let satisfied = manager.check_dependencies(&deps).await.unwrap();
3769 assert!(!satisfied);
3770 }
3771
3772 #[tokio::test]
3773 async fn test_health_state_tracking() {
3774 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3775 let manager = ServiceManager::new(runtime);
3776
3777 // Update health states
3778 manager
3779 .update_health_state("db", HealthState::Healthy)
3780 .await;
3781 manager
3782 .update_health_state("cache", HealthState::Unknown)
3783 .await;
3784
3785 // Verify states
3786 let states = manager.health_states();
3787 let states_read = states.read().await;
3788
3789 assert!(matches!(states_read.get("db"), Some(HealthState::Healthy)));
3790 assert!(matches!(
3791 states_read.get("cache"),
3792 Some(HealthState::Unknown)
3793 ));
3794 }
3795
3796 /// Regression test for the stabilization timeout that blocked the raft-e2e
3797 /// `cluster_scaling` / `cluster_upgrade` suites.
3798 ///
3799 /// Previously the callback that bridges a container's health result into the
3800 /// `ServiceManager` `health_states` map was only attached when BOTH a proxy
3801 /// manager AND a reachable overlay IP existed. In degraded-overlay / no-proxy
3802 /// deployments that `if let` was false, so `health_states` was never written,
3803 /// the service stayed `healthy=false` forever, and stabilization timed out
3804 /// even though the container was running and its health check passing.
3805 ///
3806 /// This test drives the real `scale_to` create path with:
3807 /// * NO `proxy_manager` (so `proxy_backend` resolves to None), and
3808 /// * a `Command { command: "true" }` health check (always passes host-side),
3809 /// then asserts the shared `health_states` map receives `Healthy` for the
3810 /// service — proving the bridge fires unconditionally.
3811 ///
3812 /// Gated to `#[cfg(unix)]` because `HealthCheck::Command` is executed via
3813 /// `sh -c <command>` in `crate::health::HealthChecker::check_command`. On
3814 /// Windows hosts without `sh` on PATH (the default Windows CI image), no
3815 /// Command-based health check can ever pass, so the test would fail for
3816 /// reasons unrelated to the bridge it is regression-testing. The bridge
3817 /// behavior under test is platform-agnostic; only the test fixture's
3818 /// "always-passes command" needs a Unix shell.
3819 #[cfg(unix)]
3820 #[tokio::test]
3821 async fn test_health_states_bridge_fires_without_proxy() {
3822 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3823
3824 // Service spec with a host-side command health check that always passes.
3825 // Zero start-grace + a short interval keep the test fast.
3826 let mut spec = mock_spec();
3827 spec.health = zlayer_spec::HealthSpec {
3828 start_grace: Some(Duration::from_millis(0)),
3829 interval: Some(Duration::from_millis(50)),
3830 timeout: Some(Duration::from_secs(5)),
3831 retries: 1,
3832 check: HealthCheck::Command {
3833 command: "true".to_string(),
3834 },
3835 };
3836
3837 // Build a ServiceInstance with NO proxy_manager and NO overlay_manager,
3838 // then wire in the shared health_states map exactly as ServiceManager does.
3839 let mut instance =
3840 ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
3841 let health_states: Arc<RwLock<HashMap<String, HealthState>>> =
3842 Arc::new(RwLock::new(HashMap::new()));
3843 instance.set_health_states(Arc::clone(&health_states));
3844
3845 // Drive the real create path (no proxy, MockRuntime IP present but proxy
3846 // absent => proxy_backend is None, hitting the previously-broken branch).
3847 instance.scale_to(1).await.unwrap();
3848
3849 // Poll for the bridged Healthy state (the monitor checks asynchronously
3850 // after its start grace). Bounded so a regression fails fast.
3851 let mut bridged = false;
3852 for _ in 0..100 {
3853 if matches!(
3854 health_states.read().await.get("web"),
3855 Some(HealthState::Healthy)
3856 ) {
3857 bridged = true;
3858 break;
3859 }
3860 tokio::time::sleep(Duration::from_millis(50)).await;
3861 }
3862
3863 assert!(
3864 bridged,
3865 "health_states must receive Healthy for the service even without a \
3866 proxy or overlay IP; the bridge regressed and stabilization would time out"
3867 );
3868 }
3869
3870 // ==================== Job/Cron Integration Tests ====================
3871
3872 fn mock_job_spec() -> ServiceSpec {
3873 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3874 r"
3875version: v1
3876deployment: test
3877services:
3878 backup:
3879 rtype: job
3880 image:
3881 name: backup:latest
3882",
3883 )
3884 .unwrap()
3885 .services
3886 .remove("backup")
3887 .unwrap()
3888 }
3889
3890 fn mock_cron_spec() -> ServiceSpec {
3891 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
3892 r#"
3893version: v1
3894deployment: test
3895services:
3896 cleanup:
3897 rtype: cron
3898 schedule: "0 0 * * * * *"
3899 image:
3900 name: cleanup:latest
3901"#,
3902 )
3903 .unwrap()
3904 .services
3905 .remove("cleanup")
3906 .unwrap()
3907 }
3908
3909 #[tokio::test]
3910 async fn test_service_manager_with_job_executor() {
3911 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3912 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3913
3914 let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
3915
3916 // Register job
3917 let job_spec = mock_job_spec();
3918 Box::pin(manager.upsert_service("backup".to_string(), job_spec))
3919 .await
3920 .unwrap();
3921
3922 // Trigger job
3923 let exec_id = manager
3924 .trigger_job("backup", JobTrigger::Cli)
3925 .await
3926 .unwrap();
3927
3928 // Give job time to start
3929 tokio::time::sleep(Duration::from_millis(50)).await;
3930
3931 // Check execution exists
3932 let execution = manager.get_job_execution(&exec_id).await;
3933 assert!(execution.is_some());
3934 assert_eq!(execution.unwrap().job_name, "backup");
3935 }
3936
3937 #[tokio::test]
3938 async fn test_service_manager_with_cron_scheduler() {
3939 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3940 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3941 let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
3942
3943 let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
3944
3945 // Register cron job
3946 let cron_spec = mock_cron_spec();
3947 Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3948 .await
3949 .unwrap();
3950
3951 // List cron jobs
3952 let cron_jobs = manager.list_cron_jobs().await;
3953 assert_eq!(cron_jobs.len(), 1);
3954 assert_eq!(cron_jobs[0].name, "cleanup");
3955 assert!(cron_jobs[0].enabled);
3956 }
3957
3958 #[tokio::test]
3959 async fn test_service_manager_trigger_cron() {
3960 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3961 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3962 let cron_scheduler = Arc::new(CronScheduler::new(job_executor.clone()));
3963
3964 let manager = ServiceManager::new(runtime)
3965 .with_job_executor(job_executor)
3966 .with_cron_scheduler(cron_scheduler);
3967
3968 // Register cron job
3969 let cron_spec = mock_cron_spec();
3970 Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3971 .await
3972 .unwrap();
3973
3974 // Manually trigger the cron job
3975 let exec_id = manager.trigger_cron("cleanup").await.unwrap();
3976 assert!(!exec_id.0.is_empty());
3977 }
3978
3979 #[tokio::test]
3980 async fn test_service_manager_enable_disable_cron() {
3981 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
3982 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
3983 let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
3984
3985 let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
3986
3987 // Register cron job
3988 let cron_spec = mock_cron_spec();
3989 Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
3990 .await
3991 .unwrap();
3992
3993 // Initially enabled
3994 let cron_jobs = manager.list_cron_jobs().await;
3995 assert!(cron_jobs[0].enabled);
3996
3997 // Disable
3998 manager.set_cron_enabled("cleanup", false).await;
3999 let cron_jobs = manager.list_cron_jobs().await;
4000 assert!(!cron_jobs[0].enabled);
4001
4002 // Re-enable
4003 manager.set_cron_enabled("cleanup", true).await;
4004 let cron_jobs = manager.list_cron_jobs().await;
4005 assert!(cron_jobs[0].enabled);
4006 }
4007
4008 #[tokio::test]
4009 async fn test_service_manager_remove_cleans_up_job() {
4010 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4011 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
4012
4013 let manager = ServiceManager::new(runtime).with_job_executor(job_executor.clone());
4014
4015 // Register job
4016 let job_spec = mock_job_spec();
4017 Box::pin(manager.upsert_service("backup".to_string(), job_spec))
4018 .await
4019 .unwrap();
4020
4021 // Verify job is registered
4022 let spec = job_executor.get_job_spec("backup").await;
4023 assert!(spec.is_some());
4024
4025 // Remove job
4026 manager.remove_service("backup").await.unwrap();
4027
4028 // Verify job is unregistered
4029 let spec = job_executor.get_job_spec("backup").await;
4030 assert!(spec.is_none());
4031 }
4032
4033 #[tokio::test]
4034 async fn test_service_manager_remove_cleans_up_cron() {
4035 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4036 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
4037 let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
4038
4039 let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler.clone());
4040
4041 // Register cron job
4042 let cron_spec = mock_cron_spec();
4043 Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
4044 .await
4045 .unwrap();
4046
4047 // Verify cron job is registered
4048 assert_eq!(cron_scheduler.job_count().await, 1);
4049
4050 // Remove cron job
4051 manager.remove_service("cleanup").await.unwrap();
4052
4053 // Verify cron job is unregistered
4054 assert_eq!(cron_scheduler.job_count().await, 0);
4055 }
4056
4057 #[tokio::test]
4058 async fn test_service_manager_job_without_executor() {
4059 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4060 let manager = ServiceManager::new(runtime);
4061
4062 // Try to trigger job without executor configured
4063 let result = manager.trigger_job("nonexistent", JobTrigger::Cli).await;
4064 assert!(result.is_err());
4065 assert!(result.unwrap_err().to_string().contains("not configured"));
4066 }
4067
4068 #[tokio::test]
4069 async fn test_service_manager_cron_without_scheduler() {
4070 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4071 let manager = ServiceManager::new(runtime);
4072
4073 // Try to register cron job without scheduler configured
4074 let cron_spec = mock_cron_spec();
4075 let result = Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec)).await;
4076 assert!(result.is_err());
4077 assert!(result.unwrap_err().to_string().contains("not configured"));
4078 }
4079
4080 #[tokio::test]
4081 async fn test_service_manager_list_job_executions() {
4082 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4083 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
4084
4085 let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
4086
4087 // Register job
4088 let job_spec = mock_job_spec();
4089 Box::pin(manager.upsert_service("backup".to_string(), job_spec))
4090 .await
4091 .unwrap();
4092
4093 // Trigger job twice
4094 manager
4095 .trigger_job("backup", JobTrigger::Cli)
4096 .await
4097 .unwrap();
4098 manager
4099 .trigger_job("backup", JobTrigger::Scheduler)
4100 .await
4101 .unwrap();
4102
4103 // Give jobs time to start
4104 tokio::time::sleep(Duration::from_millis(50)).await;
4105
4106 // List executions
4107 let executions = manager.list_job_executions("backup").await;
4108 assert_eq!(executions.len(), 2);
4109 }
4110
4111 // ==================== Container Supervisor Integration Tests ====================
4112
4113 #[tokio::test]
4114 async fn test_service_manager_with_supervisor() {
4115 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4116 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
4117
4118 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
4119
4120 // Add service
4121 let spec = mock_spec();
4122 Box::pin(manager.upsert_service("api".to_string(), spec))
4123 .await
4124 .unwrap();
4125
4126 // Scale up - containers should be registered with supervisor
4127 manager.scale_service("api", 2).await.unwrap();
4128
4129 // Verify containers are supervised
4130 assert_eq!(supervisor.supervised_count().await, 2);
4131
4132 // Scale down - containers should be unregistered
4133 manager.scale_service("api", 1).await.unwrap();
4134 assert_eq!(supervisor.supervised_count().await, 1);
4135
4136 // Remove service - remaining containers should be unregistered
4137 manager.remove_service("api").await.unwrap();
4138 assert_eq!(supervisor.supervised_count().await, 0);
4139 }
4140
4141 #[tokio::test]
4142 async fn test_service_manager_supervisor_state() {
4143 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4144 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
4145
4146 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor);
4147
4148 // Add and scale service
4149 let spec = mock_spec();
4150 Box::pin(manager.upsert_service("web".to_string(), spec))
4151 .await
4152 .unwrap();
4153 manager.scale_service("web", 1).await.unwrap();
4154
4155 // Check supervised state
4156 let container_id = ContainerId::new("web".to_string(), 1);
4157 let state = manager.get_container_supervised_state(&container_id).await;
4158 assert_eq!(state, Some(SupervisedState::Running));
4159 }
4160
4161 #[tokio::test]
4162 async fn test_service_manager_start_supervisor() {
4163 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4164 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
4165
4166 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
4167
4168 // Start the supervisor
4169 let handle = manager.start_container_supervisor().unwrap();
4170
4171 // Give it time to start
4172 tokio::time::sleep(Duration::from_millis(50)).await;
4173 assert!(supervisor.is_running());
4174
4175 // Shutdown
4176 manager.shutdown_container_supervisor();
4177
4178 // Wait for it to stop
4179 tokio::time::timeout(Duration::from_secs(1), handle)
4180 .await
4181 .unwrap()
4182 .unwrap();
4183
4184 assert!(!supervisor.is_running());
4185 }
4186
4187 #[tokio::test]
4188 async fn test_service_manager_supervisor_not_configured() {
4189 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4190 let manager = ServiceManager::new(runtime);
4191
4192 // Try to start supervisor without configuring it
4193 let result = manager.start_container_supervisor();
4194 assert!(result.is_err());
4195 assert!(result.unwrap_err().to_string().contains("not configured"));
4196 }
4197
4198 // ==================== Stream Registry Integration Tests ====================
4199
4200 fn mock_tcp_spec() -> ServiceSpec {
4201 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
4202 r"
4203version: v1
4204deployment: test
4205services:
4206 database:
4207 rtype: service
4208 image:
4209 name: postgres:latest
4210 endpoints:
4211 - name: postgresql
4212 protocol: tcp
4213 port: 5432
4214 scale:
4215 mode: fixed
4216 replicas: 1
4217",
4218 )
4219 .unwrap()
4220 .services
4221 .remove("database")
4222 .unwrap()
4223 }
4224
4225 fn mock_udp_spec() -> ServiceSpec {
4226 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
4227 r"
4228version: v1
4229deployment: test
4230services:
4231 dns:
4232 rtype: service
4233 image:
4234 name: dns:latest
4235 endpoints:
4236 - name: dns
4237 protocol: udp
4238 port: 53
4239 scale:
4240 mode: fixed
4241 replicas: 1
4242",
4243 )
4244 .unwrap()
4245 .services
4246 .remove("dns")
4247 .unwrap()
4248 }
4249
4250 fn mock_mixed_spec() -> ServiceSpec {
4251 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
4252 r"
4253version: v1
4254deployment: test
4255services:
4256 mixed:
4257 rtype: service
4258 image:
4259 name: mixed:latest
4260 endpoints:
4261 - name: http
4262 protocol: http
4263 port: 8080
4264 - name: grpc
4265 protocol: tcp
4266 port: 9000
4267 - name: metrics
4268 protocol: udp
4269 port: 8125
4270 scale:
4271 mode: fixed
4272 replicas: 1
4273",
4274 )
4275 .unwrap()
4276 .services
4277 .remove("mixed")
4278 .unwrap()
4279 }
4280
4281 #[tokio::test]
4282 async fn test_service_manager_with_stream_registry_tcp() {
4283 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4284 let stream_registry = Arc::new(StreamRegistry::new());
4285
4286 let mut manager = ServiceManager::new(runtime);
4287 manager.set_stream_registry(stream_registry.clone());
4288 manager.set_deployment_name("test".to_string());
4289
4290 // Add TCP-only service
4291 let spec = mock_tcp_spec();
4292 Box::pin(manager.upsert_service("database".to_string(), spec))
4293 .await
4294 .unwrap();
4295
4296 // Verify TCP route was registered
4297 assert_eq!(stream_registry.tcp_count(), 1);
4298 assert!(stream_registry.tcp_ports().contains(&5432));
4299
4300 // Remove service and verify cleanup
4301 manager.remove_service("database").await.unwrap();
4302 assert_eq!(stream_registry.tcp_count(), 0);
4303 }
4304
4305 #[tokio::test]
4306 async fn test_service_manager_with_stream_registry_udp() {
4307 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4308 let stream_registry = Arc::new(StreamRegistry::new());
4309
4310 let mut manager = ServiceManager::new(runtime);
4311 manager.set_stream_registry(stream_registry.clone());
4312 manager.set_deployment_name("test".to_string());
4313
4314 // Add UDP-only service
4315 let spec = mock_udp_spec();
4316 Box::pin(manager.upsert_service("dns".to_string(), spec))
4317 .await
4318 .unwrap();
4319
4320 // Verify UDP route was registered
4321 assert_eq!(stream_registry.udp_count(), 1);
4322 assert!(stream_registry.udp_ports().contains(&53));
4323
4324 // Remove service and verify cleanup
4325 manager.remove_service("dns").await.unwrap();
4326 assert_eq!(stream_registry.udp_count(), 0);
4327 }
4328
4329 #[tokio::test]
4330 async fn test_service_manager_with_stream_registry_mixed() {
4331 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4332 let stream_registry = Arc::new(StreamRegistry::new());
4333
4334 let mut manager = ServiceManager::new(runtime);
4335 manager.set_stream_registry(stream_registry.clone());
4336 manager.set_deployment_name("test".to_string());
4337
4338 // Add mixed service (HTTP + TCP + UDP)
4339 let spec = mock_mixed_spec();
4340 Box::pin(manager.upsert_service("mixed".to_string(), spec))
4341 .await
4342 .unwrap();
4343
4344 // Verify stream routes were registered
4345 assert_eq!(stream_registry.tcp_count(), 1); // TCP: 9000
4346 assert_eq!(stream_registry.udp_count(), 1); // UDP: 8125
4347
4348 assert!(stream_registry.tcp_ports().contains(&9000));
4349 assert!(stream_registry.udp_ports().contains(&8125));
4350
4351 // Remove service and verify stream cleanup
4352 manager.remove_service("mixed").await.unwrap();
4353 assert_eq!(stream_registry.tcp_count(), 0);
4354 assert_eq!(stream_registry.udp_count(), 0);
4355 }
4356
4357 #[tokio::test]
4358 async fn test_service_manager_stream_registry_builder() {
4359 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4360 let stream_registry = Arc::new(StreamRegistry::new());
4361
4362 // Test builder pattern
4363 let manager = ServiceManager::new(runtime).with_stream_registry(stream_registry.clone());
4364
4365 // Verify stream registry is accessible
4366 assert!(manager.stream_registry().is_some());
4367 }
4368
4369 #[tokio::test]
4370 async fn test_tcp_service_without_stream_registry() {
4371 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4372
4373 // Manager without stream registry
4374 let mut manager = ServiceManager::new(runtime);
4375 manager.set_deployment_name("test".to_string());
4376
4377 // Add TCP service - should log warning but not fail
4378 let spec = mock_tcp_spec();
4379 Box::pin(manager.upsert_service("database".to_string(), spec))
4380 .await
4381 .unwrap();
4382
4383 // No stream registry to check, but service should be tracked
4384 let services = manager.list_services().await;
4385 assert!(services.contains(&"database".to_string()));
4386 }
4387
4388 /// Verify `collect_endpoint_backends` filters containers by
4389 /// `EndpointSpec.target_role`.
4390 ///
4391 /// Given two replica groups (`primary` × 1, `read` × 2) and two
4392 /// endpoints — one with `target_role: primary` and one with
4393 /// `target_role: read` — each endpoint should receive only the
4394 /// matching containers' overlay addresses. The legacy no-filter
4395 /// endpoint (`target_role: None`) should receive all of them.
4396 #[tokio::test]
4397 #[allow(clippy::too_many_lines)]
4398 async fn test_collect_endpoint_backends_respects_target_role() {
4399 use crate::runtime::Container;
4400 use std::collections::HashMap as StdHashMap;
4401 use std::net::{IpAddr, Ipv4Addr};
4402 use zlayer_spec::{
4403 EndpointSpec, ExposeType, GroupAffinity, Protocol, ReplicaGroup, ScaleSpec,
4404 };
4405
4406 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4407 let manager = ServiceManager::new(runtime.clone());
4408
4409 // Build a spec with replica_groups and three endpoints:
4410 // - "write" targets role "primary"
4411 // - "read" targets role "read"
4412 // - "any" has no target_role (legacy)
4413 let mut spec = mock_spec();
4414 spec.replica_groups = Some(vec![
4415 ReplicaGroup {
4416 role: "primary".to_string(),
4417 count: 1,
4418 image: None,
4419 env: StdHashMap::new(),
4420 command: None,
4421 resources: None,
4422 affinity: GroupAffinity::default(),
4423 },
4424 ReplicaGroup {
4425 role: "read".to_string(),
4426 count: 2,
4427 image: None,
4428 env: StdHashMap::new(),
4429 command: None,
4430 resources: None,
4431 affinity: GroupAffinity::default(),
4432 },
4433 ]);
4434 spec.scale = ScaleSpec::Fixed { replicas: 3 };
4435 spec.endpoints = vec![
4436 EndpointSpec {
4437 name: "write".to_string(),
4438 protocol: Protocol::Tcp,
4439 port: 5432,
4440 target_port: Some(5432),
4441 path: None,
4442 host: None,
4443 expose: ExposeType::Internal,
4444 stream: None,
4445 tunnel: None,
4446 target_role: Some("primary".to_string()),
4447 },
4448 EndpointSpec {
4449 name: "read".to_string(),
4450 protocol: Protocol::Tcp,
4451 port: 5433,
4452 target_port: Some(5432),
4453 path: None,
4454 host: None,
4455 expose: ExposeType::Internal,
4456 stream: None,
4457 tunnel: None,
4458 target_role: Some("read".to_string()),
4459 },
4460 EndpointSpec {
4461 name: "any".to_string(),
4462 protocol: Protocol::Tcp,
4463 port: 5434,
4464 target_port: Some(5432),
4465 path: None,
4466 host: None,
4467 expose: ExposeType::Internal,
4468 stream: None,
4469 tunnel: None,
4470 target_role: None,
4471 },
4472 ];
4473
4474 let instance = ServiceInstance::new(
4475 "postgres".to_string(),
4476 spec.clone(),
4477 runtime,
4478 None, // overlay_manager — not exercised by this test
4479 );
4480
4481 // Inject three containers directly: one primary, two read replicas.
4482 let cid_primary = ContainerId::with_role_and_node("postgres", 1, "primary", 0);
4483 let cid_first_read = ContainerId::with_role_and_node("postgres", 2, "read", 0);
4484 let cid_second_read = ContainerId::with_role_and_node("postgres", 3, "read", 0);
4485
4486 let ip_primary = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 1));
4487 let ip_first_read = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 2));
4488 let ip_second_read = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 3));
4489
4490 {
4491 let mut containers = instance.containers().write().await;
4492 containers.insert(
4493 cid_primary.clone(),
4494 Container {
4495 id: cid_primary,
4496 image: spec.image.name.to_string(),
4497 state: crate::runtime::ContainerState::Running,
4498 pid: None,
4499 task: None,
4500 overlay_ip: Some(ip_primary),
4501 health_monitor: None,
4502 port_override: None,
4503 },
4504 );
4505 containers.insert(
4506 cid_first_read.clone(),
4507 Container {
4508 id: cid_first_read,
4509 image: spec.image.name.to_string(),
4510 state: crate::runtime::ContainerState::Running,
4511 pid: None,
4512 task: None,
4513 overlay_ip: Some(ip_first_read),
4514 health_monitor: None,
4515 port_override: None,
4516 },
4517 );
4518 containers.insert(
4519 cid_second_read.clone(),
4520 Container {
4521 id: cid_second_read,
4522 image: spec.image.name.to_string(),
4523 state: crate::runtime::ContainerState::Running,
4524 pid: None,
4525 task: None,
4526 overlay_ip: Some(ip_second_read),
4527 health_monitor: None,
4528 port_override: None,
4529 },
4530 );
4531 }
4532
4533 let write_ep = &spec.endpoints[0];
4534 let read_ep = &spec.endpoints[1];
4535 let any_ep = &spec.endpoints[2];
4536
4537 let write_backends = manager.collect_endpoint_backends(&instance, write_ep).await;
4538 let read_backends = manager.collect_endpoint_backends(&instance, read_ep).await;
4539 let any_backends = manager.collect_endpoint_backends(&instance, any_ep).await;
4540
4541 // write endpoint -> only the primary container
4542 assert_eq!(write_backends.len(), 1, "write should match only primary");
4543 assert!(
4544 write_backends.iter().any(|a| a.ip() == ip_primary),
4545 "write backends missing primary IP: {write_backends:?}"
4546 );
4547
4548 // read endpoint -> both read containers, no primary
4549 assert_eq!(
4550 read_backends.len(),
4551 2,
4552 "read should match both read replicas"
4553 );
4554 assert!(read_backends.iter().any(|a| a.ip() == ip_first_read));
4555 assert!(read_backends.iter().any(|a| a.ip() == ip_second_read));
4556 assert!(
4557 !read_backends.iter().any(|a| a.ip() == ip_primary),
4558 "read backends must not contain primary: {read_backends:?}"
4559 );
4560
4561 // legacy endpoint (target_role = None) -> every container
4562 assert_eq!(
4563 any_backends.len(),
4564 3,
4565 "any-role endpoint should see all containers"
4566 );
4567 }
4568
4569 /// Build a `ServiceInstance` whose spec carries a deployment, so the
4570 /// instance's deployment-scoping helpers (`dns_hostnames` /
4571 /// `dns_search_domain`) are exercised through the real construction path
4572 /// (the constructors capture `spec.deployment`).
4573 fn instance_in_deployment(service: &str, deployment: &str) -> ServiceInstance {
4574 let mut spec = ServiceSpec::minimal(service, "postgres:16-alpine");
4575 spec.deployment = Some(deployment.to_string());
4576 ServiceInstance::new(
4577 service.to_string(),
4578 spec,
4579 Arc::new(MockRuntime::new()),
4580 None,
4581 )
4582 }
4583
4584 /// Register every hostname an instance would register into a real
4585 /// [`DnsServer`]'s authority, then resolve a name through that authority and
4586 /// assert the answer. Uses the DNS handle's direct authority lookup (no UDP
4587 /// roundtrip — a blocking sync DNS client inside a tokio runtime would
4588 /// deadlock the current-thread executor), so the test exercises the actual
4589 /// record store deterministically.
4590 async fn resolve_through_authority(
4591 handle: &zlayer_overlay::DnsHandle,
4592 fqdn: &str,
4593 ) -> Option<IpAddr> {
4594 handle.lookup_a(fqdn).await
4595 }
4596
4597 /// Bug 6 (the CRITICAL leak): two DIFFERENT deployments each with a service
4598 /// named `postgres`, registered into ONE daemon-global DNS authority, must
4599 /// resolve to their OWN instance's IP — no last-writer-wins clobber across
4600 /// deployments. The deployment-scoped FQDNs are what a guest's
4601 /// `search <D>.zlayer.local` expands the bare `postgres` / `postgres.service`
4602 /// queries into, so distinct scoped keys are exactly what prevents the leak.
4603 #[tokio::test]
4604 async fn deployment_scoped_dns_no_cross_deployment_clobber() {
4605 use zlayer_overlay::DnsServer;
4606
4607 let bind = SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
4608 let server = DnsServer::new(bind, "zlayer.local.").expect("build DNS server");
4609 let handle = server.handle();
4610
4611 let inst_a = instance_in_deployment("postgres", "deploy-a");
4612 let inst_b = instance_in_deployment("postgres", "deploy-b");
4613
4614 let cid = ContainerId::with_role_and_node("postgres", 1, "default", 0);
4615 let ip_a = IpAddr::V4(std::net::Ipv4Addr::new(10, 200, 0, 11));
4616 let ip_b = IpAddr::V4(std::net::Ipv4Addr::new(10, 200, 0, 22));
4617
4618 // Register every name each instance would register for this container,
4619 // in deployment order, into the SAME authority.
4620 for name in inst_a.dns_hostnames(&cid) {
4621 handle.add_record(&name, ip_a).await.expect("add a record");
4622 }
4623 for name in inst_b.dns_hostnames(&cid) {
4624 handle.add_record(&name, ip_b).await.expect("add b record");
4625 }
4626
4627 // The deployment-scoped FQDNs stay distinct and resolve to their OWN IP.
4628 assert_eq!(
4629 resolve_through_authority(&handle, "postgres.deploy-a.zlayer.local").await,
4630 Some(ip_a),
4631 "deploy-a's postgres must resolve to deploy-a's IP"
4632 );
4633 assert_eq!(
4634 resolve_through_authority(&handle, "postgres.deploy-b.zlayer.local").await,
4635 Some(ip_b),
4636 "deploy-b's postgres must resolve to deploy-b's IP (no clobber)"
4637 );
4638 // `<svc>.service` form, scoped per deployment.
4639 assert_eq!(
4640 resolve_through_authority(&handle, "postgres.service.deploy-a.zlayer.local").await,
4641 Some(ip_a),
4642 );
4643 assert_eq!(
4644 resolve_through_authority(&handle, "postgres.service.deploy-b.zlayer.local").await,
4645 Some(ip_b),
4646 );
4647 }
4648
4649 /// Bug 6 part (a): two services in the SAME deployment resolve each other by
4650 /// their deployment-scoped names.
4651 #[tokio::test]
4652 async fn deployment_scoped_dns_same_deployment_siblings_resolve() {
4653 use zlayer_overlay::DnsServer;
4654
4655 let bind = SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
4656 let server = DnsServer::new(bind, "zlayer.local.").expect("build DNS server");
4657 let handle = server.handle();
4658
4659 let db = instance_in_deployment("db", "myapp");
4660 let cache = instance_in_deployment("cache", "myapp");
4661 let cid = ContainerId::with_role_and_node("x", 1, "default", 0);
4662 let ip_db = IpAddr::V4(std::net::Ipv4Addr::new(10, 200, 1, 1));
4663 let ip_cache = IpAddr::V4(std::net::Ipv4Addr::new(10, 200, 1, 2));
4664
4665 for name in db.dns_hostnames(&cid) {
4666 handle.add_record(&name, ip_db).await.expect("add db");
4667 }
4668 for name in cache.dns_hostnames(&cid) {
4669 handle.add_record(&name, ip_cache).await.expect("add cache");
4670 }
4671
4672 // Within deployment `myapp`, the search-expanded sibling FQDNs resolve
4673 // to the same deployment's instances.
4674 assert_eq!(
4675 resolve_through_authority(&handle, "db.myapp.zlayer.local").await,
4676 Some(ip_db)
4677 );
4678 assert_eq!(
4679 resolve_through_authority(&handle, "cache.myapp.zlayer.local").await,
4680 Some(ip_cache)
4681 );
4682 }
4683
4684 /// Bug 6 part (b): the per-deployment resolv.conf `search` value is emitted
4685 /// correctly (deployment scope first, bare zone last for cross-deployment
4686 /// FQDN + global names), and is absent for a non-deployment instance.
4687 #[test]
4688 fn dns_search_domain_is_deployment_scoped() {
4689 let scoped = instance_in_deployment("api", "deploy-a");
4690 assert_eq!(
4691 scoped.dns_search_domain("zlayer.local"),
4692 Some("deploy-a.zlayer.local zlayer.local".to_string()),
4693 );
4694 // Trailing dot on the zone is normalized away.
4695 assert_eq!(
4696 scoped.dns_search_domain("zlayer.local."),
4697 Some("deploy-a.zlayer.local zlayer.local".to_string()),
4698 );
4699
4700 // No deployment => no override (caller falls back to the global zone).
4701 let unscoped = ServiceInstance::new(
4702 "api".to_string(),
4703 ServiceSpec::minimal("api", "nginx:latest"),
4704 Arc::new(MockRuntime::new()),
4705 None,
4706 );
4707 assert_eq!(unscoped.dns_search_domain("zlayer.local"), None);
4708 }
4709
4710 /// The deployment-scoped hostnames include both the bare-name scope
4711 /// (`<svc>.<D>`) and the `<svc>.service.<D>` form, and the legacy unscoped
4712 /// names are still emitted for native / compose back-compat.
4713 #[test]
4714 fn dns_hostnames_emit_scoped_and_legacy_families() {
4715 let inst = instance_in_deployment("postgres", "myapp");
4716 let cid = ContainerId::with_role_and_node("postgres", 2, "default", 0);
4717 let names = inst.dns_hostnames(&cid);
4718
4719 // Deployment-scoped family.
4720 assert!(names.contains(&"postgres.myapp".to_string()));
4721 assert!(names.contains(&"postgres.service.myapp".to_string()));
4722 assert!(names.contains(&"postgres.myapp.service".to_string()));
4723 assert!(names.contains(&"2.postgres.service.myapp".to_string()));
4724
4725 // Legacy / unscoped family (back-compat).
4726 assert!(names.contains(&"postgres".to_string()));
4727 assert!(names.contains(&"postgres.service.local".to_string()));
4728 assert!(names.contains(&"2.postgres.service.local".to_string()));
4729 }
4730
4731 /// Minimal [`Cluster`] stub for the external-domain DNS tests: only
4732 /// `select_ingress_overlay_ip` carries behavior (returns the configured
4733 /// peer overlay IP); every other method is an unreachable stub since these
4734 /// tests never exercise scaling/placement.
4735 struct IngressPeerCluster {
4736 ingress_overlay_ip: Option<String>,
4737 }
4738
4739 #[async_trait::async_trait]
4740 impl zlayer_scheduler::cluster::Cluster for IngressPeerCluster {
4741 fn node_id(&self) -> u64 {
4742 2
4743 }
4744 async fn select_ingress_overlay_ip(&self) -> Option<String> {
4745 self.ingress_overlay_ip.clone()
4746 }
4747 async fn is_leader(&self) -> bool {
4748 false
4749 }
4750 async fn leader_addr(&self) -> Option<std::net::SocketAddr> {
4751 None
4752 }
4753 async fn list_nodes(&self) -> Vec<zlayer_scheduler::cluster::NodeRecord> {
4754 Vec::new()
4755 }
4756 async fn dispatch_scale(
4757 &self,
4758 _target: u64,
4759 _req: zlayer_scheduler::cluster::InternalScaleRequest,
4760 ) -> Result<(), zlayer_scheduler::cluster::ClusterError> {
4761 unreachable!("not exercised by external-domain DNS tests")
4762 }
4763 async fn forward_scale(
4764 &self,
4765 _req: zlayer_scheduler::cluster::InternalScaleRequest,
4766 ) -> Result<(), zlayer_scheduler::cluster::ClusterError> {
4767 unreachable!("not exercised by external-domain DNS tests")
4768 }
4769 async fn place_container(
4770 &self,
4771 _spec: &zlayer_spec::ServiceSpec,
4772 ) -> Result<
4773 Option<zlayer_scheduler::cluster::ContainerPlacement>,
4774 zlayer_scheduler::cluster::ClusterError,
4775 > {
4776 unreachable!("not exercised by external-domain DNS tests")
4777 }
4778 }
4779
4780 /// A `ServiceSpec` whose single endpoint carries an external vhost domain.
4781 fn mock_spec_with_host(host: &str) -> ServiceSpec {
4782 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(&format!(
4783 r"
4784version: v1
4785deployment: test
4786services:
4787 web:
4788 rtype: service
4789 image:
4790 name: test:latest
4791 endpoints:
4792 - name: http
4793 protocol: http
4794 port: 8080
4795 host: {host}
4796 scale:
4797 mode: fixed
4798 replicas: 1
4799"
4800 ))
4801 .unwrap()
4802 .services
4803 .remove("web")
4804 .unwrap()
4805 }
4806
4807 #[tokio::test]
4808 async fn external_domain_registers_host_to_ingress_peer_ip() {
4809 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4810 let dns = Arc::new(
4811 zlayer_overlay::DnsServer::new("127.0.0.1:0".parse().unwrap(), "zlayer.local").unwrap(),
4812 );
4813 let spec = mock_spec_with_host("app.example.com");
4814 let mut instance = ServiceInstance::new("web".to_string(), spec, runtime, None);
4815 instance.set_dns_server(Arc::clone(&dns));
4816 // THIS node is not the ingress; the cluster names a peer's overlay IP.
4817 instance.set_ingress_enabled(false);
4818 instance.set_cluster(Arc::new(IngressPeerCluster {
4819 ingress_overlay_ip: Some("10.200.0.7".to_string()),
4820 }));
4821
4822 instance.register_external_domains(&dns).await;
4823
4824 // The external domain resolves to the ingress peer's overlay IP.
4825 let handle = dns.handle();
4826 let ip = handle.lookup_a("app.example.com.").await;
4827 assert_eq!(ip, Some("10.200.0.7".parse().unwrap()));
4828 }
4829
4830 #[tokio::test]
4831 async fn external_domain_skips_when_no_ingress_node() {
4832 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4833 let dns = Arc::new(
4834 zlayer_overlay::DnsServer::new("127.0.0.1:0".parse().unwrap(), "zlayer.local").unwrap(),
4835 );
4836 let spec = mock_spec_with_host("app.example.com");
4837 let mut instance = ServiceInstance::new("web".to_string(), spec, runtime, None);
4838 instance.set_dns_server(Arc::clone(&dns));
4839 // No ingress locally and the cluster has no ingress node → WARN + skip,
4840 // never error, and no record is registered.
4841 instance.set_ingress_enabled(false);
4842 instance.set_cluster(Arc::new(IngressPeerCluster {
4843 ingress_overlay_ip: None,
4844 }));
4845
4846 instance.register_external_domains(&dns).await;
4847
4848 let handle = dns.handle();
4849 assert_eq!(handle.lookup_a("app.example.com.").await, None);
4850 }
4851
4852 #[tokio::test]
4853 async fn external_domain_skips_wildcard_host_patterns() {
4854 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4855 let dns = Arc::new(
4856 zlayer_overlay::DnsServer::new("127.0.0.1:0".parse().unwrap(), "zlayer.local").unwrap(),
4857 );
4858 // A wildcard host is a routing matcher, not a resolvable name.
4859 let spec = mock_spec_with_host("'*.example.com'");
4860 let mut instance = ServiceInstance::new("web".to_string(), spec, runtime, None);
4861 instance.set_dns_server(Arc::clone(&dns));
4862 instance.set_ingress_enabled(false);
4863 instance.set_cluster(Arc::new(IngressPeerCluster {
4864 ingress_overlay_ip: Some("10.200.0.7".to_string()),
4865 }));
4866
4867 instance.register_external_domains(&dns).await;
4868
4869 let handle = dns.handle();
4870 // Wildcard skipped → nothing registered under the literal pattern.
4871 assert_eq!(handle.lookup_a("*.example.com.").await, None);
4872 }
4873}