zlayer_agent/service.rs
1//! Service-level container lifecycle management
2
3use crate::container_supervisor::{ContainerSupervisor, SupervisedState, SupervisorEvent};
4use crate::cron_scheduler::CronScheduler;
5use crate::dependency::{
6 DependencyConditionChecker, DependencyGraph, DependencyWaiter, WaitResult,
7};
8use crate::error::{AgentError, Result};
9use crate::health::{HealthCallback, HealthChecker, HealthMonitor, HealthState};
10use crate::init::InitOrchestrator;
11use crate::job::{JobExecution, JobExecutionId, JobExecutor, JobTrigger};
12use crate::overlay_manager::OverlayManager;
13use crate::proxy_manager::ProxyManager;
14use crate::runtime::{Container, ContainerId, ContainerState, Runtime};
15use std::collections::HashMap;
16use std::net::{IpAddr, SocketAddr};
17use std::sync::Arc;
18use std::time::Duration;
19use tokio::sync::{RwLock, Semaphore};
20use zlayer_observability::logs::LogEntry;
21use zlayer_overlay::DnsServer;
22use zlayer_proxy::{StreamRegistry, StreamService};
23use zlayer_spec::{DependsSpec, HealthCheck, Protocol, PullPolicy, ResourceType, ServiceSpec};
24
/// Shared handle to the daemon's registry-credential store.
///
/// The supervisor uses it to resolve `zlayer login` credentials (and the
/// `~/.docker/config.json` fallback) for a service's image when it recreates
/// a container outside the API create handler (restore / drift / scale).
/// Matches the concrete type wired into the API state in `serve.rs`.
pub type RegistryCredentialStoreHandle =
    Arc<zlayer_secrets::RegistryCredentialStore<Arc<zlayer_secrets::PersistentSecretsStore>>>;
32
33/// Service instance manages a single service's containers
34pub struct ServiceInstance {
35 pub service_name: String,
36 pub spec: ServiceSpec,
37 runtime: Arc<dyn Runtime + Send + Sync>,
38 containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
39 /// Overlay network manager for container networking (optional, not needed for Docker runtime)
40 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
41 /// Proxy manager for updating backend health (optional)
42 proxy_manager: Option<Arc<ProxyManager>>,
43 /// DNS server for service discovery (optional)
44 dns_server: Option<Arc<DnsServer>>,
45 /// Container-injectable overlay resolver IP (optional). When set, this
46 /// node's overlay DNS server is reachable on `<ip>:53` and we inject it
47 /// into the container's resolv.conf so workloads resolve through the
48 /// overlay instead of inheriting the host's resolv.conf.
49 container_dns: Option<IpAddr>,
50 /// Shared health states map so callbacks can update ServiceManager-level health
51 health_states: Option<Arc<RwLock<HashMap<String, HealthState>>>>,
52 /// Most recently observed image digest after a successful pull. Used by
53 /// `upsert_service` to detect drift on `:latest`/`Newer` redeploys without
54 /// requiring callers to track digest state externally. Wrapped in a
55 /// `RwLock` so `&self` methods (`scale_to`) can update it.
56 last_pulled_digest: tokio::sync::RwLock<Option<String>>,
57 /// Local cluster node id used when constructing new `ContainerId`s during
58 /// scale-up. `0` in single-node deployments or when the cluster handle is
59 /// not yet wired. Populated by `ServiceManager` from `Cluster::node_id()`
60 /// at instance construction time.
61 node_id: u64,
62 /// Owning deployment name (the `zlayer up` / deploy request's deployment),
63 /// when known. Threaded from `ServiceSpec.deployment` by `upsert_service`.
64 ///
65 /// Used to scope service-discovery DNS per-deployment: records are
66 /// registered as `{service}.{deployment}` / `{service}.service.{deployment}`
67 /// (within the daemon zone) and the container's resolv.conf gets a
68 /// per-deployment `search` domain so a bare `<svc>` / `<svc>.service` query
69 /// resolves to THIS deployment's instance and never clobbers another
70 /// deployment that happens to share a service name. `None` for standalone /
71 /// single-deployment callers (falls back to the daemon's global zone).
72 deployment: Option<String>,
73 /// Whether THIS node holds the standing HTTP/HTTPS ingress on
74 /// `0.0.0.0:80` / `0.0.0.0:443` (mirrors `NodeConfig.ingress`). When `true`
75 /// and the node has an overlay IP, external service domains
76 /// (`EndpointSpec.host`) are resolved to this node's own overlay IP;
77 /// otherwise the selected ingress peer's overlay IP is used. Defaults to
78 /// `false`. Threaded from `ServiceManager`.
79 ingress_enabled: bool,
80 /// Cluster handle used to select an ingress-capable peer's overlay IP when
81 /// THIS node is not itself the ingress. `None` in standalone / single-node
82 /// mode (the funnel is then THIS node when it is ingress-capable).
83 cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
84 /// Sink used to revoke a container's scoped access token on scale-down /
85 /// removal. Threaded from `ServiceManager`. `None` = no persistence backend
86 /// wired (the container token was minted without a revocable `jti`).
87 token_sink: Option<Arc<dyn crate::auth::ContainerTokenSink>>,
88 /// Sink that persists the resolved image digest into the deployment store
89 /// after a successful pull (see [`crate::auth::DeploymentDigestSink`]).
90 /// `None` disables digest persistence. Threaded from `ServiceManager`.
91 digest_sink: Option<Arc<dyn crate::auth::DeploymentDigestSink>>,
92 /// Restore pin: the image digest this service resolved to before the daemon
93 /// last went down (loaded from `StoredDeployment.resolved_image_digests` on
94 /// restore). When set, [`Self::pull_and_refresh_digest`] tries a strictly
95 /// LOCAL resolution (`SourcePolicy::LocalOnly`) FIRST — recreating the
96 /// container from the already-local image (local registry + local blob
97 /// layers) with zero remote/S3 traffic — and only falls back to the spec's
98 /// (remote-capable) source policy when the image is NOT present locally.
99 /// `None` for fresh deploys (the normal pull path runs unchanged).
100 restore_pin: tokio::sync::RwLock<Option<String>>,
101 /// Registry credential store used to resolve `zlayer login` credentials +
102 /// the `~/.docker/config.json` fallback for this service's image when the
103 /// supervisor recreates a container (restore / drift / scale). Threaded from
104 /// `ServiceManager`. `None` = no store wired → the pull runs anonymous /
105 /// docker-config as before. Without this,
106 /// [`Self::pull_and_refresh_digest`] pulled with `None` auth and a private
107 /// image failed to re-pull on supervised recreate — even though the initial
108 /// deploy (through the API create handler) had resolved auth correctly.
109 registry_store: Option<RegistryCredentialStoreHandle>,
110}
111
112impl ServiceInstance {
113 /// Create a new service instance
114 pub fn new(
115 service_name: String,
116 spec: ServiceSpec,
117 runtime: Arc<dyn Runtime + Send + Sync>,
118 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
119 ) -> Self {
120 let deployment = spec.deployment.clone();
121 Self {
122 service_name,
123 spec,
124 runtime,
125 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
126 overlay_manager,
127 proxy_manager: None,
128 dns_server: None,
129 container_dns: None,
130 health_states: None,
131 last_pulled_digest: tokio::sync::RwLock::new(None),
132 node_id: 0,
133 deployment,
134 ingress_enabled: false,
135 cluster: None,
136 token_sink: None,
137 digest_sink: None,
138 restore_pin: tokio::sync::RwLock::new(None),
139 registry_store: None,
140 }
141 }
142
143 /// Create a new service instance with proxy manager for health-aware load balancing
144 pub fn with_proxy(
145 service_name: String,
146 spec: ServiceSpec,
147 runtime: Arc<dyn Runtime + Send + Sync>,
148 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
149 proxy_manager: Arc<ProxyManager>,
150 ) -> Self {
151 let deployment = spec.deployment.clone();
152 Self {
153 service_name,
154 spec,
155 runtime,
156 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
157 overlay_manager,
158 proxy_manager: Some(proxy_manager),
159 dns_server: None,
160 container_dns: None,
161 health_states: None,
162 last_pulled_digest: tokio::sync::RwLock::new(None),
163 node_id: 0,
164 deployment,
165 ingress_enabled: false,
166 cluster: None,
167 token_sink: None,
168 digest_sink: None,
169 restore_pin: tokio::sync::RwLock::new(None),
170 registry_store: None,
171 }
172 }
173
174 /// Set the local cluster node id. Used by `ServiceManager` to thread
175 /// `Cluster::node_id()` down to container construction so new
176 /// `ContainerId`s carry the owning node identity. Defaults to `0` (the
177 /// single-node sentinel) when unset.
178 pub fn set_node_id(&mut self, node_id: u64) {
179 self.node_id = node_id;
180 }
181
182 /// Set the owning deployment name for service-discovery DNS scoping.
183 ///
184 /// Idempotent with construction: the constructors already capture
185 /// `spec.deployment`, but `ServiceManager` calls this so a deployment
186 /// stamped after the fact (or via a different code path) is honored.
187 pub fn set_deployment(&mut self, deployment: Option<String>) {
188 self.deployment = deployment;
189 }
190
191 /// Set whether THIS node holds the standing HTTP/HTTPS ingress (mirrors
192 /// `NodeConfig.ingress`). Threaded by `ServiceManager` so external service
193 /// domains can be resolved to this node's overlay IP when it is the funnel.
194 pub fn set_ingress_enabled(&mut self, enabled: bool) {
195 self.ingress_enabled = enabled;
196 }
197
198 /// Set the cluster handle used to select an ingress-capable peer's overlay
199 /// IP when THIS node is not itself the ingress.
200 pub fn set_cluster(&mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) {
201 self.cluster = Some(cluster);
202 }
203
204 /// Set the token sink used to revoke container scoped tokens on
205 /// scale-down / removal. Threaded from `ServiceManager`.
206 pub fn set_token_sink(&mut self, sink: Option<Arc<dyn crate::auth::ContainerTokenSink>>) {
207 self.token_sink = sink;
208 }
209
210 /// Set the sink that persists the resolved image digest into the deployment
211 /// store after a successful pull. Threaded from `ServiceManager`.
212 pub fn set_digest_sink(&mut self, sink: Option<Arc<dyn crate::auth::DeploymentDigestSink>>) {
213 self.digest_sink = sink;
214 }
215
216 /// Set the registry credential store used to resolve stored (`zlayer login`)
217 /// credentials + the `~/.docker/config.json` fallback for this service's
218 /// image on supervised recreate. Threaded from `ServiceManager`.
219 pub fn set_registry_store(&mut self, store: Option<RegistryCredentialStoreHandle>) {
220 self.registry_store = store;
221 }
222
223 /// Pin this service to a previously-resolved image `digest` for restore.
224 ///
225 /// On daemon restart the bin loads `StoredDeployment.resolved_image_digests`
226 /// and calls this for each service before scaling. With a pin set,
227 /// [`Self::pull_and_refresh_digest`] resolves the image STRICTLY locally
228 /// first (no remote/S3), so a service whose layers are already on disk comes
229 /// back even when the S3/origin tiers are down — breaking the boot-time
230 /// circular dependency where coming up requires re-pulling an already-local
231 /// image. `&self` so the restore path can pin without an exclusive borrow.
232 pub async fn set_restore_pin(&self, digest: Option<String>) {
233 *self.restore_pin.write().await = digest;
234 }
235
236 /// Resolve the overlay IP of the node that should serve external domains for
237 /// this service (the ingress funnel), as an `IpAddr`:
238 ///
239 /// 1. If THIS node is ingress-capable and has an overlay IP, use it.
240 /// 2. Otherwise ask the cluster for the deterministically-selected ingress
241 /// peer's overlay IP.
242 /// 3. Otherwise (standalone, ingress-disabled local node, no peer) return
243 /// `None` — the caller WARNs and skips registering the domain.
244 async fn resolve_ingress_ip(&self) -> Option<IpAddr> {
245 if self.ingress_enabled {
246 if let Some(om) = &self.overlay_manager {
247 if let Some(ip) = om.read().await.node_ip() {
248 return Some(ip);
249 }
250 }
251 }
252 // Not the local funnel (or no local overlay IP): defer to the cluster's
253 // chosen ingress peer.
254 if let Some(cluster) = &self.cluster {
255 if let Some(ip_str) = cluster.select_ingress_overlay_ip().await {
256 match ip_str.parse::<IpAddr>() {
257 Ok(ip) => return Some(ip),
258 Err(e) => tracing::warn!(
259 ingress_overlay_ip = %ip_str,
260 error = %e,
261 "selected ingress peer overlay IP is not a valid IP address; \
262 skipping external-domain DNS registration"
263 ),
264 }
265 }
266 }
267 None
268 }
269
270 /// Register an A/AAAA record for every endpoint's external vhost domain
271 /// (`EndpointSpec.host`) pointing at the ingress funnel's overlay IP, so a
272 /// client resolving `<host>` reaches an ingress-capable node whose 80/443
273 /// proxy fans out to this service's backends across the mesh.
274 ///
275 /// Wildcard host patterns (`*.example.com`) are routing matchers, not
276 /// resolvable names, so they are skipped. When no ingress node exists
277 /// anywhere in the mesh yet (no 80/443 entrypoint reachable), each host is
278 /// logged at WARN and skipped — never an error; it re-registers on the
279 /// next container attach once an entrypoint appears.
280 async fn register_external_domains(&self, dns: &Arc<DnsServer>) {
281 // Collect resolvable external domains for this service's endpoints.
282 let hosts: Vec<String> = self
283 .spec
284 .endpoints
285 .iter()
286 .filter_map(|ep| ep.host.as_ref())
287 .map(|h| h.trim().to_string())
288 .filter(|h| !h.is_empty() && !h.contains('*'))
289 .collect();
290 if hosts.is_empty() {
291 return;
292 }
293
294 let Some(ingress_ip) = self.resolve_ingress_ip().await else {
295 for host in &hosts {
296 tracing::warn!(
297 service = %self.service_name,
298 host = %host,
299 "no 80/443 entrypoint reachable yet — open one to serve {host}; \
300 skipping external-domain DNS registration (will retry on next attach)"
301 );
302 }
303 return;
304 };
305
306 for host in &hosts {
307 // `host` is already a fully-qualified external domain; pass it with
308 // a trailing dot so `DnsServer::add_record` treats it as an FQDN and
309 // does NOT append the daemon's internal zone origin.
310 let fqdn = if host.ends_with('.') {
311 host.clone()
312 } else {
313 format!("{host}.")
314 };
315 match dns.add_record(&fqdn, ingress_ip).await {
316 Ok(()) => tracing::info!(
317 service = %self.service_name,
318 host = %host,
319 ingress_ip = %ingress_ip,
320 "registered external-domain DNS record (host -> ingress overlay IP)"
321 ),
322 Err(e) => tracing::warn!(
323 service = %self.service_name,
324 host = %host,
325 error = %e,
326 "failed to register external-domain DNS record"
327 ),
328 }
329 }
330 }
331
332 /// The per-deployment resolv.conf `search` domain list for containers in
333 /// this service's deployment, given the daemon's global DNS `zone` (e.g.
334 /// `zlayer.local`).
335 ///
336 /// Returns a space-separated `search` value placing the deployment scope
337 /// FIRST so a guest's bare `<svc>` / `<svc>.service` query expands to THIS
338 /// deployment's record before anything else, with the bare zone last so
339 /// cross-deployment by-FQDN names (`<svc>.<otherdeployment>`) still resolve:
340 ///
341 /// ```text
342 /// search <deployment>.<zone> <zone>
343 /// ```
344 ///
345 /// When this instance has no deployment (standalone / single-deployment),
346 /// returns `None` so callers fall back to the daemon's global zone domain.
347 #[must_use]
348 pub fn dns_search_domain(&self, zone: &str) -> Option<String> {
349 let zone = zone.trim_end_matches('.');
350 self.deployment.as_deref().map(|d| {
351 // `<deployment>.<zone>` first (deployment scope wins), `<zone>` last
352 // (cross-deployment FQDN + global names still resolve).
353 format!("{d}.{zone} {zone}")
354 })
355 }
356
357 /// The set of service-discovery hostnames to register for one container,
358 /// relative to the daemon's DNS zone (each gets `<zone>` appended by
359 /// [`DnsServer::add_record`]).
360 ///
361 /// Two families are emitted:
362 ///
363 /// 1. **Deployment-scoped** (only when this instance carries a
364 /// `deployment`): `<svc>.<D>`, `<svc>.service.<D>`,
365 /// `<replica>.<svc>.service.<D>`, the documented example FQDN form
366 /// `<svc>.<D>.service`, and (for non-default replica groups)
367 /// `<role>.<svc>.service.<D>`. Paired with the per-deployment
368 /// `search <D>.<zone> <zone>` resolv.conf domain (see
369 /// [`Self::dns_search_domain`]), a guest's bare `<svc>` expands to
370 /// `<svc>.<D>.<zone>` and `<svc>.service` to `<svc>.service.<D>.<zone>`,
371 /// so both resolve to THIS deployment's instance and NEVER clobber a
372 /// different deployment that happens to share a service name.
373 ///
374 /// 2. **Unscoped / legacy** (always): the bare `<svc>` name plus
375 /// `<svc>.service.local`, `<replica>.<svc>.service.local`, and the role
376 /// form. These preserve the historical compose-style discovery used by
377 /// native containers (no per-deployment search domain) and existing
378 /// deployments. NOTE: the bare `<svc>` is the last-writer-wins,
379 /// cross-deployment-ambiguous key — it is kept for back-compat but the
380 /// deployment-scoped names above are what make discovery correct.
381 #[must_use]
382 fn dns_hostnames(&self, id: &ContainerId) -> Vec<String> {
383 let svc = &self.service_name;
384 let mut names: Vec<String> = Vec::new();
385
386 // --- Deployment-scoped family (correct, no cross-deployment leak) ---
387 if let Some(d) = self.deployment.as_deref() {
388 // bare `<svc>` -> `<svc>.<D>` (resolves via `search <D>.<zone>`)
389 names.push(format!("{svc}.{d}"));
390 // `<svc>.service` -> `<svc>.service.<D>`
391 names.push(format!("{svc}.service.{d}"));
392 // documented example FQDN form `<svc>.<D>.service`
393 names.push(format!("{svc}.{d}.service"));
394 // replica-specific `<replica>.<svc>.service` -> `.<D>`
395 names.push(format!("{}.{svc}.service.{d}", id.replica));
396 // per-role group form for non-default replica groups
397 if id.role != "default" {
398 names.push(format!("{}.{svc}.service.{d}", id.role));
399 }
400 }
401
402 // --- Unscoped / legacy family (compose back-compat) ---
403 // Bare compose service name (e.g. `postgres`); multiple replicas upsert
404 // the same key and the in-memory authority keeps the most recent A.
405 names.push(svc.clone());
406 names.push(format!("{svc}.service.local"));
407 names.push(format!("{}.{svc}.service.local", id.replica));
408 if id.role != "default" {
409 names.push(format!("{}.{svc}.service.local", id.role));
410 }
411
412 names
413 }
414
415 /// Derive the replica group role for a 1-based `replica_idx`.
416 ///
417 /// When `spec.replica_groups` is unset, returns `"default"` (the implicit
418 /// single-group case). Otherwise walks groups in declaration order,
419 /// accumulating each group's `count` until `replica_idx` falls within the
420 /// current group's range, and returns that group's `role`.
421 ///
422 /// Replicas beyond the declared total fall back to `"default"`.
423 #[must_use]
424 pub fn role_for_replica(&self, replica_idx: u32) -> String {
425 let Some(groups) = self.spec.replica_groups.as_ref() else {
426 return "default".to_string();
427 };
428 let mut cumulative = 0u32;
429 for group in groups {
430 cumulative = cumulative.saturating_add(group.count);
431 if replica_idx <= cumulative {
432 return group.role.clone();
433 }
434 }
435 "default".to_string()
436 }
437
438 /// Builder method to add DNS server for service discovery
439 #[must_use]
440 pub fn with_dns(mut self, dns_server: Arc<DnsServer>) -> Self {
441 self.dns_server = Some(dns_server);
442 self
443 }
444
445 /// Set the DNS server for service discovery
446 pub fn set_dns_server(&mut self, dns_server: Arc<DnsServer>) {
447 // Stamp the per-deployment DNS `search` domain onto the spec so a Linux
448 // container's resolv.conf expands a bare `<svc>` / `<svc>.service`
449 // query to the registered deployment-scoped FQDN. The overlayd Linux
450 // attach path never applies the search domain itself, so without this
451 // `<svc>.service` resolution times out. Skip when the user supplied
452 // their own search list or this instance carries no deployment.
453 if self.spec.dns_search.is_empty() {
454 let zone = dns_server.zone_origin().to_string();
455 if let Some(search) = self.dns_search_domain(&zone) {
456 self.spec.dns_search = search.split_whitespace().map(str::to_string).collect();
457 }
458 }
459 self.dns_server = Some(dns_server);
460 }
461
462 /// Set the container-injectable overlay resolver IP and apply it to the
463 /// instance's spec.
464 ///
465 /// When `container_dns` is set and the spec is eligible (not host-network,
466 /// no user-supplied `dns`), this pre-populates `spec.dns` with the overlay
467 /// resolver so containers resolve through `<ip>:53` instead of inheriting
468 /// the host's `/etc/resolv.conf`.
469 ///
470 /// Why this exists: on overlay-enabled hosts the netbird `~.`
471 /// systemd-resolved hijack swallows the host resolver, so a container that
472 /// inherits the host resolv.conf cannot resolve anything. The overlay DNS
473 /// server forwards non-overlay queries upstream, so pointing the container
474 /// at it fixes resolution AND gives it service-name discovery.
475 ///
476 /// Port-53 constraint: `resolv.conf` `nameserver` lines (and Docker's
477 /// `--dns`) carry no port — they are always port 53. The injected IP is
478 /// therefore only useful because the daemon binds the overlay resolver on
479 /// `<ip>:53` (see `daemon.rs` Phase 4); the injected value is the bare IP,
480 /// not a `host:port`.
481 ///
482 /// User-supplied `spec.dns` is left untouched: an explicit resolver from
483 /// the deployment spec always wins.
484 pub fn set_container_dns(&mut self, container_dns: IpAddr) {
485 self.container_dns = Some(container_dns);
486 if !self.spec.host_network && self.spec.dns.is_empty() {
487 self.spec.dns = vec![container_dns.to_string()];
488 }
489 }
490
491 /// Whether this service's containers should be attached to the overlay
492 /// network at start.
493 ///
494 /// Host-network containers (`spec.host_network`) share the HOST's network
495 /// namespace — there is no private netns to plumb a veth into, and
496 /// attaching by the host-netns PID would wire a stray `zl-*` link into the
497 /// host stack (which systemd-networkd then tries to manage, hitting its
498 /// watchdog and SIGABRT'ing). Such containers must never be attached. This
499 /// mirrors the `host_network` guard in [`set_container_dns`].
500 fn should_attach_overlay(&self) -> bool {
501 !self.spec.host_network
502 }
503
504 /// Effective isolation-network name for this service's overlay attach.
505 /// Derived from the data-plane [`OverlayMode`] (Isolated => fenced to a
506 /// network named after the service) plus any explicit
507 /// `com.zlayer.isolation_network` label (which always wins). `None` => flat
508 /// cluster mesh.
509 fn isolation_network(&self) -> Option<String> {
510 let mode = self
511 .spec
512 .overlay
513 .as_ref()
514 .map(|o| o.mode)
515 .unwrap_or_default();
516 let explicit = self
517 .spec
518 .labels
519 .get(zlayer_types::overlay::ISOLATION_NETWORK_LABEL)
520 .cloned();
521 crate::overlay_manager::resolve_isolation_network(mode, &self.service_name, explicit)
522 }
523
524 /// Set the proxy manager for health-aware load balancing
525 pub fn set_proxy_manager(&mut self, proxy_manager: Arc<ProxyManager>) {
526 self.proxy_manager = Some(proxy_manager);
527 }
528
529 /// Set the shared health states map so health callbacks can bridge state back to `ServiceManager`
530 pub fn set_health_states(&mut self, states: Arc<RwLock<HashMap<String, HealthState>>>) {
531 self.health_states = Some(states);
532 }
533
534 /// Get the last observed image digest (after the most recent successful
535 /// pull). Returns `None` when no pull has happened yet, when the runtime
536 /// does not expose digests, or when no matching `ImageInfo` was found.
537 pub async fn last_pulled_digest(&self) -> Option<String> {
538 self.last_pulled_digest.read().await.clone()
539 }
540
541 /// Pull the service image using the spec's pull policy (literal Docker /
542 /// Kubernetes semantics — no silent auto-upgrade of `IfNotPresent` to
543 /// `Newer` for `:latest` tags) and refresh the cached digest from
544 /// `Runtime::list_images` when the runtime exposes it. Returns the digest
545 /// observed after the pull, when known.
546 ///
547 /// For `Never`, the runtime is still called so it can load the image
548 /// config from the local cache (without any remote round-trip); only the
549 /// remote digest refresh is skipped. Without this call the bundle builder
550 /// has no image entrypoint/cmd and falls back to `/bin/sh`.
551 async fn pull_and_refresh_digest(&self) -> Result<Option<String>> {
552 let image_str = self.spec.image.name.to_string();
553 let policy = self.spec.image.pull_policy;
554 let spec_source = self.spec.image.source_policy.unwrap_or_default();
555
556 // Restore-from-local: when this service carries a restore pin (a digest
557 // resolved before the daemon last went down) AND its image is present
558 // locally, recreate from the already-local image with ZERO remote/S3
559 // traffic. We do this by attempting the pull with
560 // `SourcePolicy::LocalOnly` FIRST — the registry chain consults only the
561 // local blob cache + local registry (`try_local_registry`) and errors
562 // cleanly on a local miss, never reaching S3/origin. On a local miss we
563 // fall back to the spec's (remote-capable) source policy so a genuinely
564 // absent image still pulls normally. This breaks the boot-time circular
565 // dependency where ZLayer uses its own (down) ZataStorage as the S3 blob
566 // backend: an already-local service comes back regardless.
567 let restore_pin = self.restore_pin.read().await.clone();
568 let mut resolved_locally = false;
569 if restore_pin.is_some() {
570 match self
571 .runtime
572 .pull_image_with_policy(
573 &image_str,
574 policy,
575 None,
576 zlayer_spec::SourcePolicy::LocalOnly,
577 )
578 .await
579 {
580 Ok(()) => {
581 resolved_locally = true;
582 tracing::info!(
583 image = %image_str,
584 pinned_digest = ?restore_pin,
585 "restore: image present locally; recreated from local layers \
586 (no remote/S3 traffic)"
587 );
588 }
589 Err(e) => {
590 tracing::info!(
591 image = %image_str,
592 error = %e,
593 "restore: image not present locally under pinned digest; \
594 falling back to the spec source policy (remote-capable)"
595 );
596 }
597 }
598 }
599
600 // Normal (remote-capable) pull when there is no restore pin, or when the
601 // local-only restore probe missed.
602 if !resolved_locally {
603 // Resolve registry auth for the image's host from the daemon's
604 // credential store (a `zlayer login <host>` credential), falling
605 // back to `~/.docker/config.json`, then anonymous. The API create
606 // handler does this at first deploy; the supervisor's recreate path
607 // (restore / drift / scale) must too, or a private image that
608 // deployed fine fails to re-pull here. `None` store (standalone /
609 // not wired) preserves the old anonymous behaviour.
610 let resolved_auth = match &self.registry_store {
611 Some(store) => {
612 zlayer_secrets::resolve_stored_registry_auth(&image_str, store).await
613 }
614 None => None,
615 };
616 self.runtime
617 .pull_image_with_policy(&image_str, policy, resolved_auth.as_ref(), spec_source)
618 .await
619 .map_err(|e| AgentError::PullFailed {
620 image: self.spec.image.name.to_string(),
621 reason: e.to_string(),
622 })?;
623 }
624
625 // Best-effort: try to discover the resolved digest via list_images.
626 // Runtimes that don't support introspection (Unsupported) leave the
627 // cached digest unchanged; drift detection then falls back to "always
628 // recreate on PullPolicy::Always, never recreate on PullPolicy::Newer
629 // when no digests are known".
630 let new_digest = match self.runtime.list_images().await {
631 Ok(images) => images
632 .into_iter()
633 .find(|info| info.reference == image_str)
634 .and_then(|info| info.digest),
635 Err(e) => {
636 tracing::debug!(
637 image = %image_str,
638 error = %e,
639 "list_images unavailable; cannot record post-pull digest"
640 );
641 None
642 }
643 };
644
645 // When list_images doesn't expose a digest but we just resolved a pinned
646 // image strictly from local layers, the pin IS the resolved digest —
647 // re-use it so the recorded/cached value survives a restore where the
648 // runtime can't introspect the local store.
649 let effective_digest = new_digest.clone().or(if resolved_locally {
650 restore_pin.clone()
651 } else {
652 None
653 });
654
655 if let Some(ref digest) = effective_digest {
656 *self.last_pulled_digest.write().await = Some(digest.clone());
657 // Persist the resolved digest into the deployment store so a future
658 // restart can recreate this service from the already-local image.
659 // Best-effort: a failed store write is logged by the sink and never
660 // fails the deploy/scale. Requires the owning deployment name.
661 if let (Some(sink), Some(deployment)) = (&self.digest_sink, self.deployment.as_deref())
662 {
663 sink.record(deployment, &self.service_name, digest).await;
664 }
665 }
666
667 Ok(effective_digest)
668 }
669
670 /// Attach a HOST-process (Linux youki) replica to the overlay in the
671 /// "created" state — by PID, before `start_container`. Returns the assigned
672 /// overlay IP, or `None` when there is no overlay manager, the service must
673 /// not attach (host-network), the runtime is the VM-guest kind (attached
674 /// post-start over vsock instead), or no PID is available (WASM) — those
675 /// benign cases return `Ok(None)`. A genuine attach failure is a HARD error
676 /// (`Err`): we never silently downgrade a service that asked for the overlay
677 /// to host networking. Kept as a standalone, `Box::pin`ned `async fn` so the
678 /// overlayd RPC future does not inflate `scale_to`'s state machine past the
679 /// `large_futures` threshold.
680 #[cfg(not(target_os = "windows"))]
681 async fn attach_host_overlay(
682 &self,
683 id: &ContainerId,
684 container_pid: Option<u32>,
685 ) -> Result<Option<std::net::IpAddr>, AgentError> {
686 let Some(overlay) = self
687 .overlay_manager
688 .as_ref()
689 .filter(|_| self.should_attach_overlay())
690 else {
691 return Ok(None);
692 };
693 if matches!(
694 self.runtime.overlay_attach_kind_for(id).await,
695 crate::runtime::OverlayAttachKind::GuestManaged
696 | crate::runtime::OverlayAttachKind::HostProxy
697 ) {
698 // GuestManaged: attach handled post-start. HostProxy (Seatbelt /
699 // native-VZ): the node is already on the overlay via a host-level
700 // `utun` and the container shares the host stack, so there is no
701 // per-container created-state attach.
702 return Ok(None);
703 }
704 let Some(pid) = container_pid else {
705 // No PID available (e.g. WASM runtime) - skip overlay attachment.
706 tracing::debug!(
707 container = %id,
708 "skipping overlay attachment - no PID available"
709 );
710 return Ok(None);
711 };
712 let overlay_guard = overlay.read().await;
713 // Per-deployment resolv.conf search domain so the container's bare
714 // `<svc>`/`<svc>.service` resolves to THIS deployment (no
715 // cross-deployment clobber).
716 let dns_override = overlay_guard
717 .dns_domain()
718 .and_then(|zone| self.dns_search_domain(zone));
719 match overlay_guard
720 .attach_container(
721 pid,
722 &self.service_name,
723 true,
724 false,
725 self.isolation_network(),
726 dns_override,
727 )
728 .await
729 {
730 Ok(ip) => Ok(Some(ip)),
731 Err(e) => Err(AgentError::Network(format!(
732 "failed to attach container {id} to overlay network: {e}"
733 ))),
734 }
735 }
736
737 /// Reclaim a half-created replica after an init-action / start / liveness
738 /// failure in the scale-up path: detach the overlay (by the PID it was
739 /// attached with, when an attach happened) and remove the container, then
740 /// return the original error for the caller to propagate. Mirrors the job
741 /// path's detach + remove on its init/start early returns.
742 ///
743 /// `attached_pid` is `Some(pid)` only when a HOST (Linux youki) overlay
744 /// attach actually succeeded for this replica; it is always `None` on
745 /// Windows (HCN teardown happens at container-remove) and for the VM-guest
746 /// kind (released by id elsewhere). Kept as a standalone `async fn` (vs an
747 /// inline closure) so its future does not inflate `scale_to`'s state
748 /// machine past the `large_futures` threshold.
749 async fn cleanup_failed_start(
750 &self,
751 id: &ContainerId,
752 attached_pid: Option<u32>,
753 err: AgentError,
754 ) -> AgentError {
755 #[cfg(not(target_os = "windows"))]
756 if let (Some(overlay), Some(pid)) = (self.overlay_manager.as_ref(), attached_pid) {
757 let overlay_guard = overlay.read().await;
758 if let Err(de) = overlay_guard.detach_container(pid).await {
759 tracing::warn!(
760 container = %id,
761 pid,
762 error = %de,
763 "failed to detach overlay during startup cleanup (veth/IP may leak)"
764 );
765 }
766 }
767 #[cfg(target_os = "windows")]
768 let _ = attached_pid;
769 if let Err(re) = self.runtime.remove_container(id).await {
770 tracing::warn!(
771 container = %id,
772 error = %re,
773 "failed to remove container during startup cleanup"
774 );
775 }
776 err
777 }
778
779 /// Scale to the desired number of replicas
780 ///
781 /// This method uses short-lived locks to avoid blocking concurrent operations.
782 /// I/O operations (pull, create, start, stop, remove) are performed without
783 /// holding the containers lock to allow other operations to proceed.
784 ///
785 /// # Errors
786 /// Returns an error if image pull, container creation, or container lifecycle operations fail.
787 #[allow(clippy::too_many_lines, clippy::cast_possible_truncation)]
788 pub async fn scale_to(&self, replicas: u32) -> Result<()> {
789 // Phase 1: Determine current state (short read lock)
790 let current_replicas = { self.containers.read().await.len() as u32 }; // Lock released here
791
792 // Phase 1b: Pull image up front so a redeploy on `:latest` (which lands
793 // here with replicas == current_replicas in the steady state) actually
794 // refreshes the cached digest. We skip the call only when scaling
795 // strictly down (no new containers needed). For `Never` the runtime
796 // still needs to load the image config from the local cache so the
797 // bundle builder gets entrypoint/cmd/env — without it the container
798 // falls back to `/bin/sh` and exits instantly. `pull_and_refresh_digest`
799 // itself handles the Never case (no remote round-trip, cache-only).
800 if replicas >= current_replicas {
801 let _ = self.pull_and_refresh_digest().await?;
802 }
803
804 // Phase 2: Scale up - create new containers (no lock held during I/O)
805 //
806 // Compute (role, replica_index) tuples for each new replica. When
807 // `spec.replica_groups` is set, expand groups in declaration order so
808 // each created replica maps to its declared `(role, intra_group_index)`.
809 // Otherwise fall back to the implicit single "default" group. The
810 // `local_node_id` is captured once so every new `ContainerId` carries
811 // the owning node identity for cross-node disambiguation.
812 let local_node_id = self.node_id;
813 if replicas > current_replicas {
814 let replica_specs: Vec<(String, u32)> =
815 if let Some(groups) = self.spec.replica_groups.as_ref() {
816 let mut specs: Vec<(String, u32)> = Vec::new();
817 for group in groups {
818 for idx in 0..group.count {
819 specs.push((group.role.clone(), idx + 1));
820 }
821 }
822 specs
823 .into_iter()
824 .skip(current_replicas as usize)
825 .take((replicas - current_replicas) as usize)
826 .collect()
827 } else {
828 (current_replicas..replicas)
829 .map(|i| ("default".to_string(), i + 1))
830 .collect()
831 };
832
833 for (role, replica_idx) in replica_specs {
834 let id = ContainerId::with_role_and_node(
835 self.service_name.clone(),
836 replica_idx,
837 role,
838 local_node_id,
839 );
840
841 // Reap any leftover container occupying this id before
842 // creating it. After a daemon restart a `KillMode=process`
843 // survivor (or a crash-orphan) from the previous instance can
844 // still hold this id; `create_container` would then collide
845 // ("container already exists"). Policy is reap & recreate, so
846 // stop+remove the stale one first. `NotFound` is the normal,
847 // expected clean-slate case.
848 match self.runtime.container_state(&id).await {
849 Err(AgentError::NotFound { .. }) => {}
850 Ok(state) => {
851 tracing::warn!(
852 container = %id,
853 ?state,
854 "reaping leftover container before recreate (reap & recreate)"
855 );
856 let _ = self
857 .runtime
858 .stop_container(&id, std::time::Duration::from_secs(5))
859 .await;
860 if let Err(e) = self.runtime.remove_container(&id).await {
861 tracing::warn!(
862 container = %id,
863 error = %e,
864 "failed to remove leftover container before recreate; continuing"
865 );
866 }
867 }
868 Err(e) => {
869 tracing::warn!(
870 container = %id,
871 error = %e,
872 "container_state probe failed before create; attempting create anyway"
873 );
874 }
875 }
876
877 // Create container (no lock needed - I/O operation)
878 //
879 // RouteToPeer must propagate unchanged: the scheduler uses it
880 // to re-place the workload on a capable peer, and wrapping it
881 // in `CreateFailed` would hide the signal and mark the service
882 // dead instead of rescheduling it. All other errors are
883 // normalised to `CreateFailed` for upstream handling.
884 self.runtime
885 .create_container(&id, &self.spec)
886 .await
887 .map_err(|e| match e {
888 AgentError::RouteToPeer { .. } => e,
889 other => AgentError::CreateFailed {
890 id: id.to_string(),
891 reason: other.to_string(),
892 },
893 })?;
894
895 // Get container PID with retries (may not be immediately
896 // available). youki writes the init pid-file at CREATE time
897 // (the init is paused on the start fifo), so the PID is known
898 // in the "created" state — BEFORE start_container. We capture
899 // it here so the host overlay attach below can run while the
900 // process is still pid-1, dumpable, and root-owned. Once the
901 // entrypoint execve's + drops to a non-root user the netns
902 // becomes non-dumpable (overlayd EACCES on /proc/<pid>/ns/net),
903 // and a one-shot that already exited would 404 (ENOENT). WASM
904 // runtimes have no PID and report None here, which the host
905 // attach arm skips.
906 let mut container_pid = None;
907 for attempt in 1..=5u32 {
908 match self.runtime.get_container_pid(&id).await {
909 Ok(Some(pid)) => {
910 container_pid = Some(pid);
911 break;
912 }
913 Ok(None) if attempt < 5 => {
914 tracing::debug!(container = %id, attempt, "PID not available yet, retrying");
915 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
916 }
917 Ok(None) => {
918 tracing::warn!(container = %id, "Container PID unavailable after 5 attempts");
919 }
920 Err(e) => {
921 tracing::warn!(container = %id, attempt, error = %e, "Failed to get PID");
922 if attempt < 5 {
923 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
924 }
925 }
926 }
927 }
928
929 // Attach the HOST-process (Linux youki) overlay veth in the
930 // "created" state — between create_container and
931 // start_container — mirroring the job path. The guest (VZ
932 // vsock) and Windows (HCN) attaches stay POST-start below: the
933 // guest pushes config into a RUNNING guest over vsock, and
934 // Windows attaches its HCN endpoint at create time inside
935 // overlayd (here we only read the IP back, post-start).
936 //
937 // `host_overlay_ip` carries the host arm's result so the shared
938 // DNS registration + `overlay_ip` merge below run exactly once,
939 // after start, for whichever attach kind produced an IP.
940 #[cfg(not(target_os = "windows"))]
941 let host_overlay_ip: Option<std::net::IpAddr> =
942 match Box::pin(self.attach_host_overlay(&id, container_pid)).await {
943 Ok(ip) => ip,
944 Err(e) => {
945 return Err(Box::pin(self.cleanup_failed_start(&id, None, e)).await);
946 }
947 };
948
949 // From here on, an init-action or start failure must reclaim the
950 // overlay veth/IP we just attached AND remove the half-created
951 // container, or both leak (the attach moved BEFORE start, so the
952 // old `?`-early-returns would now strand a veth). Mirrors the job
953 // path's detach + remove on its init/start early returns. The
954 // cleanup lives in a separate `async fn` (and is `Box::pin`ned at
955 // the await sites) so its future does not inflate `scale_to`'s
956 // state machine past the `large_futures` threshold.
957 #[cfg(not(target_os = "windows"))]
958 let attached_pid = if host_overlay_ip.is_some() {
959 container_pid
960 } else {
961 None
962 };
963 #[cfg(target_os = "windows")]
964 let attached_pid: Option<u32> = None;
965
966 // Run init actions with error policy enforcement (no lock needed)
967 let init_orchestrator = InitOrchestrator::with_error_policy(
968 id.clone(),
969 self.spec.init.clone(),
970 self.spec.errors.clone(),
971 );
972 if let Err(e) = init_orchestrator.run().await {
973 return Err(Box::pin(self.cleanup_failed_start(&id, attached_pid, e)).await);
974 }
975
976 // Start container (no lock needed - I/O operation)
977 if let Err(e) =
978 self.runtime
979 .start_container(&id)
980 .await
981 .map_err(|e| AgentError::StartFailed {
982 id: id.to_string(),
983 reason: e.to_string(),
984 })
985 {
986 return Err(Box::pin(self.cleanup_failed_start(&id, attached_pid, e)).await);
987 }
988
989 // Verify the container is still running after start. If the
990 // init process crashed during start (bad image, missing libs,
991 // failed mount), surface the real cause from the container's
992 // log tail instead of a cryptic downstream cascade.
993 if container_pid.is_some() {
994 let alive = match self.runtime.container_state(&id).await {
995 Ok(
996 ContainerState::Running
997 | ContainerState::Pending
998 | ContainerState::Initializing,
999 ) => true,
1000 Ok(state) => {
1001 tracing::warn!(
1002 container = %id,
1003 ?state,
1004 "container exited immediately after start"
1005 );
1006 false
1007 }
1008 Err(e) => {
1009 // State query failed — don't block on it.
1010 tracing::warn!(
1011 container = %id,
1012 error = %e,
1013 "container state query failed after start, proceeding"
1014 );
1015 true
1016 }
1017 };
1018 if !alive {
1019 let log_tail = self.runtime.container_logs(&id, 40).await.ok().map_or_else(
1020 || " <log read failed>".to_string(),
1021 |entries| {
1022 if entries.is_empty() {
1023 " <no log output>".to_string()
1024 } else {
1025 entries
1026 .into_iter()
1027 .map(|e| format!(" {}", e.message))
1028 .collect::<Vec<_>>()
1029 .join("\n")
1030 }
1031 },
1032 );
1033 return Err(Box::pin(self.cleanup_failed_start(
1034 &id,
1035 attached_pid,
1036 AgentError::StartFailed {
1037 id: id.to_string(),
1038 reason: format!("container exited during startup:\n{log_tail}"),
1039 },
1040 ))
1041 .await);
1042 }
1043 }
1044
1045 // Resolve the overlay IP. The host (Linux youki) attach already
1046 // ran in the created state above; here we only run the guest
1047 // (vsock) and Windows (HCN read-back) kinds, which require a
1048 // RUNNING container, then register DNS once for whichever kind
1049 // produced an IP.
1050 let overlay_ip = if let Some(overlay) = self
1051 .overlay_manager
1052 .as_ref()
1053 .filter(|_| self.should_attach_overlay())
1054 {
1055 let overlay_guard = overlay.read().await;
1056 #[cfg(target_os = "windows")]
1057 let attach_result: Option<std::net::IpAddr> = {
1058 match self.runtime.overlay_attach_kind_for(&id).await {
1059 // HCS: the HCN endpoint + per-container namespace were
1060 // created inside `HcsRuntime::create_container` by
1061 // overlayd; here we only need the IP it assigned so we
1062 // can register DNS for service discovery.
1063 crate::runtime::OverlayAttachKind::HostIp => {
1064 let _ = &overlay_guard; // unused for the read-back path
1065 match self.runtime.get_container_ip(&id).await {
1066 Ok(Some(ip)) => Some(ip),
1067 Ok(None) => {
1068 tracing::debug!(
1069 container = %id,
1070 "no overlay IP recorded (HCS attach skipped at create time)"
1071 );
1072 None
1073 }
1074 Err(e) => {
1075 tracing::warn!(
1076 container = %id,
1077 error = %e,
1078 "failed to fetch container overlay IP"
1079 );
1080 None
1081 }
1082 }
1083 }
1084 // WSL2 Linux guest: overlayd allocates the overlay
1085 // identity and we push the WireGuard config into the
1086 // distro's network namespace, where it brings up its own
1087 // kernel WireGuard device. (Same as the non-Windows
1088 // GuestManaged arm below.)
1089 crate::runtime::OverlayAttachKind::GuestManaged => {
1090 let cid = id.to_string();
1091 // Per-deployment resolv.conf search domain so the
1092 // guest's bare `<svc>`/`<svc>.service` resolves to
1093 // THIS deployment (no cross-deployment clobber).
1094 let dns_override = overlay_guard
1095 .dns_domain()
1096 .and_then(|zone| self.dns_search_domain(zone));
1097 match overlay_guard
1098 .attach_container_guest(
1099 &cid,
1100 &self.service_name,
1101 true,
1102 self.isolation_network(),
1103 dns_override,
1104 )
1105 .await
1106 {
1107 Ok(cfg) => {
1108 let ip = cfg.overlay_ip;
1109 match self.runtime.push_overlay_config(&id, &cfg).await {
1110 Ok(()) => Some(ip),
1111 Err(e) => {
1112 tracing::warn!(
1113 container = %id,
1114 error = %e,
1115 "failed to push overlay config into guest; rolling back allocation"
1116 );
1117 // Don't leak the overlayd IP/peer.
1118 if let Err(de) =
1119 overlay_guard.detach_container_guest(&cid).await
1120 {
1121 tracing::warn!(
1122 container = %id,
1123 error = %de,
1124 "failed to roll back guest overlay allocation"
1125 );
1126 }
1127 None
1128 }
1129 }
1130 }
1131 Err(e) => {
1132 tracing::warn!(
1133 container = %id,
1134 error = %e,
1135 "failed to allocate guest overlay config from overlayd"
1136 );
1137 None
1138 }
1139 }
1140 }
1141 // HostProxy/HostNetns don't occur on Windows.
1142 _ => None,
1143 }
1144 };
1145 #[cfg(not(target_os = "windows"))]
1146 let attach_result: Option<std::net::IpAddr> = {
1147 match self.runtime.overlay_attach_kind_for(&id).await {
1148 // VM guest (macOS VZ-Linux): no host netns/PID, so
1149 // overlayd allocates the overlay identity and we push
1150 // it into the RUNNING guest over vsock, where it brings
1151 // up its own kernel WireGuard device.
1152 crate::runtime::OverlayAttachKind::GuestManaged => {
1153 let cid = id.to_string();
1154 // Per-deployment resolv.conf search domain so the
1155 // guest's bare `<svc>`/`<svc>.service` resolves to
1156 // THIS deployment (no cross-deployment clobber).
1157 let dns_override = overlay_guard
1158 .dns_domain()
1159 .and_then(|zone| self.dns_search_domain(zone));
1160 match overlay_guard
1161 .attach_container_guest(
1162 &cid,
1163 &self.service_name,
1164 true,
1165 self.isolation_network(),
1166 dns_override,
1167 )
1168 .await
1169 {
1170 Ok(cfg) => {
1171 let ip = cfg.overlay_ip;
1172 match self.runtime.push_overlay_config(&id, &cfg).await {
1173 Ok(()) => Some(ip),
1174 Err(e) => {
1175 tracing::warn!(
1176 container = %id,
1177 error = %e,
1178 "failed to push overlay config into guest; rolling back allocation"
1179 );
1180 // Don't leak the overlayd IP/peer.
1181 if let Err(de) =
1182 overlay_guard.detach_container_guest(&cid).await
1183 {
1184 tracing::warn!(
1185 container = %id,
1186 error = %de,
1187 "failed to roll back guest overlay allocation"
1188 );
1189 }
1190 None
1191 }
1192 }
1193 }
1194 Err(e) => {
1195 tracing::warn!(
1196 container = %id,
1197 error = %e,
1198 "failed to allocate guest overlay config from overlayd"
1199 );
1200 None
1201 }
1202 }
1203 }
1204 // Host-shared native runtime (Seatbelt / native-VZ /
1205 // libkrun): overlayd allocates a distinct overlay /32
1206 // from the node slice + utun alias; the runtime
1207 // forwards <overlay_ip>:port to the container's local
1208 // delivery.
1209 crate::runtime::OverlayAttachKind::HostProxy => {
1210 let dns_override = overlay_guard
1211 .dns_domain()
1212 .and_then(|zone| self.dns_search_domain(zone));
1213 match overlay_guard
1214 .attach_container_host_shared(
1215 &id.to_string(),
1216 &self.service_name,
1217 false,
1218 self.isolation_network(),
1219 dns_override,
1220 )
1221 .await
1222 {
1223 Ok(ip) => {
1224 if let Err(e) =
1225 self.runtime.attach_overlay_ip(&id, ip).await
1226 {
1227 tracing::warn!(
1228 container = %id,
1229 error = %e,
1230 "failed to start host-shared overlay forwarders"
1231 );
1232 }
1233 Some(ip)
1234 }
1235 Err(e) => {
1236 tracing::warn!(
1237 container = %id,
1238 error = %e,
1239 "failed to attach host-shared container to overlay"
1240 );
1241 None
1242 }
1243 }
1244 }
1245 // Host-process runtimes (Linux youki): already
1246 // attached in the created state above.
1247 _ => host_overlay_ip,
1248 }
1249 };
1250
1251 if let Some(ip) = attach_result {
1252 tracing::info!(
1253 container = %id,
1254 overlay_ip = %ip,
1255 "attached container to overlay network"
1256 );
1257
1258 // Register DNS for service discovery.
1259 if let Some(dns) = &self.dns_server {
1260 for hostname in self.dns_hostnames(&id) {
1261 match dns.add_record(&hostname, ip).await {
1262 Ok(()) => tracing::debug!(
1263 hostname = %hostname,
1264 ip = %ip,
1265 "registered service-discovery DNS record"
1266 ),
1267 Err(e) => tracing::warn!(
1268 hostname = %hostname,
1269 error = %e,
1270 "failed to register service-discovery DNS record"
1271 ),
1272 }
1273 }
1274
1275 // Register external service domains (vhosts) so a
1276 // client resolving `<host>` lands on an
1277 // ingress-capable node, whose 80/443 proxy fans out
1278 // to this service's overlay-IP backends anywhere in
1279 // the mesh. This is ADDITIONAL to the
1280 // deployment-scoped service-discovery records above.
1281 self.register_external_domains(dns).await;
1282 }
1283
1284 Some(ip)
1285 } else {
1286 None
1287 }
1288 } else {
1289 None
1290 };
1291
1292 // If overlay failed, try the container runtime's own IP as fallback
1293 let effective_ip = if overlay_ip.is_none() {
1294 match self.runtime.get_container_ip(&id).await {
1295 Ok(Some(ip)) => {
1296 tracing::info!(
1297 container = %id,
1298 ip = %ip,
1299 "using runtime container IP for proxy (overlay unavailable)"
1300 );
1301 Some(ip)
1302 }
1303 Ok(None) => {
1304 tracing::warn!(
1305 container = %id,
1306 "no container IP available from runtime, proxy routing will be unavailable"
1307 );
1308 None
1309 }
1310 Err(e) => {
1311 tracing::warn!(
1312 container = %id,
1313 error = %e,
1314 "failed to get container IP from runtime"
1315 );
1316 None
1317 }
1318 }
1319 } else {
1320 overlay_ip
1321 };
1322
1323 tracing::info!(
1324 container = %id,
1325 service = %self.service_name,
1326 overlay_ip = ?overlay_ip,
1327 effective_ip = ?effective_ip,
1328 "Container IP resolution complete"
1329 );
1330
1331 // Query port override from the runtime.
1332 // On macOS sandbox, each container is assigned a unique port since
1333 // all processes share the host network (no network namespaces).
1334 // The runtime passes the port to the process via the PORT env var.
1335 let port_override = match self.runtime.get_container_port_override(&id).await {
1336 Ok(Some(port)) => {
1337 tracing::info!(
1338 container = %id,
1339 port = port,
1340 "runtime assigned dynamic port override for this container"
1341 );
1342 Some(port)
1343 }
1344 Ok(None) => None,
1345 Err(e) => {
1346 tracing::warn!(
1347 container = %id,
1348 error = %e,
1349 "failed to query port override from runtime, using spec port"
1350 );
1351 None
1352 }
1353 };
1354
1355 // Start health monitoring and store handle (no lock needed during start)
1356 let health_monitor_handle = {
1357 let mut check = self.spec.health.check.clone();
1358
1359 // Resolve Tcp { port: 0 } ("use first endpoint") to the actual
1360 // port the container is listening on. With mac-sandbox, each
1361 // replica gets a unique assigned port via port_override.
1362 if let HealthCheck::Tcp { ref mut port } = check {
1363 if *port == 0 {
1364 *port = port_override.unwrap_or_else(|| {
1365 self.spec
1366 .endpoints
1367 .iter()
1368 .find(|ep| {
1369 matches!(
1370 ep.protocol,
1371 Protocol::Http | Protocol::Https | Protocol::Websocket
1372 )
1373 })
1374 .map_or(8080, zlayer_spec::EndpointSpec::target_port)
1375 });
1376 }
1377 }
1378
1379 let start_grace = self
1380 .spec
1381 .health
1382 .start_grace
1383 .unwrap_or(Duration::from_secs(5));
1384 let check_timeout = self.spec.health.timeout.unwrap_or(Duration::from_secs(5));
1385 let interval = self.spec.health.interval.unwrap_or(Duration::from_secs(10));
1386 let retries = self.spec.health.retries;
1387
1388 let checker = HealthChecker::new(check, effective_ip);
1389 let mut monitor = HealthMonitor::new(id.clone(), checker, interval, retries)
1390 .with_start_grace(start_grace)
1391 .with_check_timeout(check_timeout);
1392
1393 // Build the optional proxy backend handle. This is only present
1394 // when both a proxy manager AND a reachable overlay IP exist; in
1395 // degraded-overlay / no-proxy deployments it stays None and the
1396 // callback below skips all proxy work while STILL bridging health
1397 // state back into ServiceManager.
1398 let proxy_backend: Option<(Arc<ProxyManager>, SocketAddr)> =
1399 if let (Some(proxy), Some(ip)) = (&self.proxy_manager, effective_ip) {
1400 let proxy = Arc::clone(proxy);
1401 // Get the container's target port, using the runtime override if
1402 // present. On macOS sandbox, port_override gives each replica a
1403 // unique port so the proxy can distinguish backends sharing
1404 // 127.0.0.1.
1405 let port = port_override.unwrap_or_else(|| {
1406 self.spec
1407 .endpoints
1408 .iter()
1409 .find(|ep| {
1410 matches!(
1411 ep.protocol,
1412 Protocol::Http | Protocol::Https | Protocol::Websocket
1413 )
1414 })
1415 .map_or(8080, zlayer_spec::EndpointSpec::target_port)
1416 });
1417
1418 let backend_addr = SocketAddr::new(ip, port);
1419
1420 // Register backend with load balancer so proxy can route to it.
1421 // This must happen before the health callback is created, because
1422 // update_backend_health only updates *existing* backends.
1423 proxy.add_backend(&self.service_name, backend_addr).await;
1424
1425 // Publish this container's exposed ports on the node
1426 // loopback (`127.0.0.1:<endpoint.port>`) so a consumer
1427 // sharing the node loopback can reach the service at
1428 // `localhost:<port>`. Gated on the spec's policy
1429 // (`Auto` publishes only for single-member services).
1430 // Uses the SAME runtime-resolved `ip`/`port_override`
1431 // as the backend above: on macOS each replica shares
1432 // 127.0.0.1 with a unique override; on Linux/VM the
1433 // overlay IP carries the declared target port.
1434 if self.spec.publish_to_node_loopback() {
1435 if let Err(e) = proxy
1436 .publish_loopback_for_container(
1437 self.deployment.as_deref(),
1438 &self.service_name,
1439 &self.spec,
1440 ip,
1441 port_override,
1442 )
1443 .await
1444 {
1445 // A host-port conflict means a DIFFERENT
1446 // deployment/service already owns this
1447 // published port; refuse to cross-wire
1448 // (Bug 7) and surface it at deploy time.
1449 tracing::error!(
1450 service = %self.service_name,
1451 error = %e,
1452 "Failed to publish container ports on node loopback"
1453 );
1454 return Err(e);
1455 }
1456 }
1457
1458 // Publish explicit Docker-style port mappings
1459 // (`host_ip:host_port -> container_port`). Unlike the
1460 // loopback publish above this is NOT gated on
1461 // `publish_to_node_loopback()` and is driven by
1462 // `port_mappings` rather than endpoints, so a workload
1463 // with port mappings and no endpoints still gets a host
1464 // listener. Backend port mirrors the loopback choice:
1465 // the runtime `port_override` on macOS sandbox, else the
1466 // mapping's `container_port`.
1467 for mapping in &self.spec.port_mappings {
1468 let pm_backend = SocketAddr::new(
1469 ip,
1470 port_override.unwrap_or(mapping.container_port),
1471 );
1472 if let Err(e) = proxy
1473 .publish_port_mapping(
1474 self.deployment.as_deref(),
1475 &self.service_name,
1476 mapping,
1477 pm_backend,
1478 )
1479 .await
1480 {
1481 tracing::error!(
1482 service = %self.service_name,
1483 host_port = ?mapping.host_port,
1484 container_port = mapping.container_port,
1485 error = %e,
1486 "Failed to publish container port mapping"
1487 );
1488 return Err(e);
1489 }
1490 }
1491
1492 Some((proxy, backend_addr))
1493 } else {
1494 None
1495 };
1496
1497 // The health bridge is ALWAYS attached, independent of proxy/IP
1498 // availability. stabilization::wait_for_stabilization only treats a
1499 // service as ready when health_states[name] == Healthy, so this write
1500 // must happen even when the overlay is degraded and no proxy backend
1501 // exists — otherwise the service stays healthy=false forever and
1502 // stabilization times out.
1503 let health_states_opt = self.health_states.clone();
1504 let svc_name_for_states = self.service_name.clone();
1505 let svc_name_for_proxy = self.service_name.clone();
1506 let svc_name_for_log = self.service_name.clone();
1507
1508 let health_callback: HealthCallback =
1509 Arc::new(move |container_id: ContainerId, is_healthy: bool| {
1510 tracing::info!(
1511 container = %container_id,
1512 service = %svc_name_for_log,
1513 healthy = is_healthy,
1514 has_proxy_backend = proxy_backend.is_some(),
1515 "health status changed"
1516 );
1517
1518 // Always bridge health state back to ServiceManager's
1519 // health_states map (unconditional — no proxy/IP required).
1520 if let Some(ref health_states) = health_states_opt {
1521 let states = Arc::clone(health_states);
1522 let svc = svc_name_for_states.clone();
1523 tokio::spawn(async move {
1524 let state = if is_healthy {
1525 HealthState::Healthy
1526 } else {
1527 HealthState::Unhealthy {
1528 failures: 0,
1529 reason: "health check failed".into(),
1530 }
1531 };
1532 states.write().await.insert(svc, state);
1533 });
1534 }
1535
1536 // Update proxy backend health only when a proxy backend was
1537 // registered (proxy manager + reachable overlay IP present).
1538 if let Some((proxy, backend_addr)) = proxy_backend.clone() {
1539 let svc = svc_name_for_proxy.clone();
1540 tokio::spawn(async move {
1541 proxy
1542 .update_backend_health(&svc, backend_addr, is_healthy)
1543 .await;
1544 });
1545 }
1546 });
1547
1548 monitor = monitor.with_callback(health_callback);
1549
1550 monitor.start()
1551 };
1552
1553 // Update state (short write lock)
1554 {
1555 let mut containers = self.containers.write().await;
1556 containers.insert(
1557 id.clone(),
1558 Container {
1559 id: id.clone(),
1560 image: self.spec.image.name.to_string(),
1561 state: ContainerState::Running,
1562 // Record the init PID captured at start so the
1563 // scale-down detach can release the overlay IP +
1564 // veth even after the container has exited (a re-
1565 // query via `get_container_pid` returns None for a
1566 // dead container, so the detach — and the per-
1567 // container `/28` IP release — would be skipped,
1568 // leaking the IP until the whole service is torn
1569 // down).
1570 pid: container_pid,
1571 task: None,
1572 overlay_ip: effective_ip,
1573 health_monitor: Some(health_monitor_handle),
1574 port_override,
1575 },
1576 );
1577 } // Lock released here
1578 }
1579 }
1580
1581 // Phase 3: Scale down - remove containers (short write lock per removal)
1582 //
1583 // Containers were created with `with_role_and_node(role, local_node_id)`
1584 // on scale-up, so we must reconstruct the same identity on scale-down
1585 // — the role is derived from `replica_groups` via `role_for_replica`
1586 // and the node id is the local cluster node. Mismatched ids would miss
1587 // the live entry in `self.containers` and leak the container.
1588 if replicas < current_replicas {
1589 for i in replicas..current_replicas {
1590 let replica_idx = i + 1;
1591 let id = ContainerId::with_role_and_node(
1592 self.service_name.clone(),
1593 replica_idx,
1594 self.role_for_replica(replica_idx),
1595 local_node_id,
1596 );
1597
1598 // Remove from state first and get the container to abort health monitor (short write lock)
1599 let removed_container = {
1600 let mut containers = self.containers.write().await;
1601 containers.remove(&id)
1602 }; // Lock released here
1603
1604 // Then perform cleanup (no lock held - I/O operations)
1605 if let Some(container) = removed_container {
1606 // Abort the health monitor task if it exists
1607 if let Some(handle) = container.health_monitor {
1608 handle.abort();
1609 }
1610
1611 // Unpublish this container's node-loopback ports (mirror of
1612 // the publish in the start path above). Recomputes the same
1613 // backend from the container's stored runtime-resolved IP and
1614 // port override; the last replica's removal frees the
1615 // loopback listener. Gated identically to publish.
1616 if self.spec.publish_to_node_loopback() {
1617 if let (Some(proxy), Some(ip)) = (&self.proxy_manager, container.overlay_ip)
1618 {
1619 proxy
1620 .unpublish_loopback_for_container(
1621 &self.spec,
1622 ip,
1623 container.port_override,
1624 )
1625 .await;
1626 }
1627 }
1628
1629 // Unpublish explicit port mappings for this container (mirror
1630 // of the publish in the start path). NOT gated on
1631 // `publish_to_node_loopback()`. Only explicit host ports are
1632 // torn down precisely; ephemeral mappings (no `host_port`) are
1633 // freed when their listener task is aborted on shutdown.
1634 if let (Some(proxy), Some(ip)) = (&self.proxy_manager, container.overlay_ip) {
1635 for mapping in &self.spec.port_mappings {
1636 if let Some(hp) = mapping.host_port.filter(|p| *p != 0) {
1637 let pm_backend = SocketAddr::new(
1638 ip,
1639 container.port_override.unwrap_or(mapping.container_port),
1640 );
1641 proxy
1642 .unpublish_port_mapping(hp, mapping.protocol, pm_backend)
1643 .await;
1644 }
1645 }
1646 }
1647
1648 // Remove DNS records for this container
1649 if let Some(dns) = &self.dns_server {
1650 // Remove replica-specific DNS entry
1651 let replica_hostname =
1652 format!("{}.{}.service.local", id.replica, self.service_name);
1653 if let Err(e) = dns.remove_record(&replica_hostname).await {
1654 tracing::warn!(
1655 hostname = %replica_hostname,
1656 error = %e,
1657 "failed to remove replica DNS record"
1658 );
1659 } else {
1660 tracing::debug!(
1661 hostname = %replica_hostname,
1662 "removed replica DNS record"
1663 );
1664 }
1665
1666 // Remove per-role DNS entry if this was a non-default group.
1667 // Note: this is best-effort and removes the record even if
1668 // other replicas in the same role still need it — the DNS
1669 // server's add/remove API is single-record so we can't keep
1670 // it alive for siblings. P2.3-bis (round-robin per-role)
1671 // can fix this later via a per-role refcount; for now the
1672 // service-level hostname keeps cluster-internal clients
1673 // working even when the role-specific record briefly
1674 // disappears.
1675 if id.role != "default" {
1676 let role_hostname =
1677 format!("{}.{}.service.local", id.role, self.service_name);
1678 if let Err(e) = dns.remove_record(&role_hostname).await {
1679 tracing::warn!(
1680 hostname = %role_hostname,
1681 error = %e,
1682 "failed to remove role DNS record"
1683 );
1684 } else {
1685 tracing::debug!(
1686 hostname = %role_hostname,
1687 "removed role DNS record"
1688 );
1689 }
1690 }
1691
1692 // Note: We don't remove the service-level hostname here because
1693 // other replicas may still be using it. The service-level record
1694 // should be cleaned up when the entire service is removed.
1695 }
1696
1697 // Detach from overlay network if manager available.
1698 //
1699 // Done BEFORE stop_container because:
1700 // - The container init process must still be in
1701 // /proc to look up its PID via `get_container_pid`.
1702 // - `OverlayManager::detach_container` deletes host-side
1703 // veth interfaces by name (`veth-<pid>-*`) and
1704 // releases the allocated overlay IPs back to the
1705 // per-node slice. Without this the IPs leak across
1706 // container churn and the slice exhausts.
1707 //
1708 // Best-effort: failures are logged but never abort the
1709 // scale-down. The periodic orphan sweep
1710 // (`start_periodic_orphan_sweep`) catches anything we
1711 // missed.
1712 if let Some(overlay) = &self.overlay_manager {
1713 // VM guests have no host veth/PID — release the overlayd
1714 // allocation (IP + registered mesh peer) by container id
1715 // instead of by PID.
1716 let detach_kind = self.runtime.overlay_attach_kind_for(&id).await;
1717 if detach_kind == crate::runtime::OverlayAttachKind::HostProxy {
1718 let overlay_guard = overlay.read().await;
1719 if let Err(e) = overlay_guard
1720 .detach_container_host_shared(&id.to_string())
1721 .await
1722 {
1723 tracing::warn!(
1724 container = %id,
1725 error = %e,
1726 "overlay detach_container_host_shared failed; relying on orphan sweep"
1727 );
1728 }
1729 if let Err(e) = self.runtime.detach_overlay_ip(&id).await {
1730 tracing::warn!(
1731 container = %id,
1732 error = %e,
1733 "failed to stop host-shared overlay forwarders"
1734 );
1735 }
1736 } else if detach_kind == crate::runtime::OverlayAttachKind::GuestManaged {
1737 let overlay_guard = overlay.read().await;
1738 if let Err(e) =
1739 overlay_guard.detach_container_guest(&id.to_string()).await
1740 {
1741 tracing::warn!(
1742 container = %id,
1743 error = %e,
1744 "overlay detach_container_guest failed; relying on orphan sweep"
1745 );
1746 }
1747 } else if detach_kind == crate::runtime::OverlayAttachKind::HostIp {
1748 // HCS: overlay teardown happens inside overlayd at
1749 // container remove-time; nothing to detach here (and the
1750 // Windows pid is not a Linux netns PID).
1751 tracing::debug!(
1752 container = %id,
1753 "HostIp (HCS) detach handled at remove-time; skipping"
1754 );
1755 } else if let Some(pid) = container.pid {
1756 // Detach by the PID recorded AT START, not a live
1757 // re-query: a crashed/exited container has no live
1758 // PID, but overlayd still holds its attachment
1759 // (keyed by that PID) and the host-side veth
1760 // (`veth-<pid>-s`) — so detaching by the stored PID
1761 // releases the per-container `/28` IP + deletes the
1762 // veth, instead of leaking them.
1763 let overlay_guard = overlay.read().await;
1764 if let Err(e) = overlay_guard.detach_container(pid).await {
1765 tracing::warn!(
1766 container = %id,
1767 pid,
1768 error = %e,
1769 "overlay detach_container failed"
1770 );
1771 }
1772 } else {
1773 tracing::debug!(
1774 container = %id,
1775 "no recorded PID for overlay detach (never attached or non-Linux runtime)"
1776 );
1777 }
1778 }
1779
1780 // Stop container
1781 self.runtime
1782 .stop_container(&id, Duration::from_secs(30))
1783 .await?;
1784
1785 // Sync volumes to S3 before removal (no-op if not configured)
1786 if let Err(e) = self.runtime.sync_container_volumes(&id).await {
1787 tracing::warn!(
1788 container = %id,
1789 error = %e,
1790 "failed to sync volumes before removal"
1791 );
1792 }
1793
1794 // Remove container
1795 self.runtime.remove_container(&id).await?;
1796
1797 // Revoke the container's scoped token (best-effort). Recompute
1798 // the deterministic jti the runtime minted under:
1799 // `container:<service>:<service>-<replica>`.
1800 if let Some(sink) = self.token_sink.as_ref() {
1801 sink.revoke(&format!(
1802 "container:{}:{}-{}",
1803 id.service, id.service, id.replica
1804 ))
1805 .await;
1806 }
1807 }
1808 }
1809 }
1810
1811 Ok(())
1812 }
1813
1814 /// Get current number of replicas
1815 pub async fn replica_count(&self) -> usize {
1816 self.containers.read().await.len()
1817 }
1818
1819 /// Get all container IDs
1820 pub async fn container_ids(&self) -> Vec<ContainerId> {
1821 self.containers.read().await.keys().cloned().collect()
1822 }
1823
1824 /// Get per-container info (id, image, state, pid, overlay IP) for every
1825 /// live container in this instance.
1826 ///
1827 /// Surfaces the REAL image reference each container was created from and its
1828 /// REAL lifecycle state (lowercased via [`ContainerState::as_str`]) so the
1829 /// API/`ps` no longer reports a hardcoded `"running"` with no image.
1830 pub async fn container_infos(&self) -> Vec<ContainerInfo> {
1831 self.containers
1832 .read()
1833 .await
1834 .values()
1835 .map(|c| ContainerInfo {
1836 id: c.id.clone(),
1837 image: c.image.clone(),
1838 state: c.state.as_str().to_string(),
1839 pid: c.pid,
1840 overlay_ip: c.overlay_ip.map(|ip| ip.to_string()),
1841 })
1842 .collect()
1843 }
1844
1845 /// Get read access to the containers map
1846 ///
1847 /// This allows callers to access container overlay IPs and other metadata
1848 /// without copying the entire map.
1849 pub fn containers(
1850 &self,
1851 ) -> &tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>> {
1852 &self.containers
1853 }
1854
1855 /// Check if this service instance has an overlay manager configured
1856 pub fn has_overlay_manager(&self) -> bool {
1857 self.overlay_manager.is_some()
1858 }
1859
1860 /// Check if this service instance has a proxy manager configured
1861 pub fn has_proxy_manager(&self) -> bool {
1862 self.proxy_manager.is_some()
1863 }
1864
1865 /// Get the proxy manager for this instance, if configured.
1866 pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
1867 self.proxy_manager.as_ref()
1868 }
1869
1870 /// Check if this service instance has a DNS server configured
1871 pub fn has_dns_server(&self) -> bool {
1872 self.dns_server.is_some()
1873 }
1874}
1875
1876/// Per-container summary surfaced to callers (API / `ps`).
1877///
1878/// Carries the REAL image reference and lifecycle state of a single live
1879/// container, replacing the previous id-only view that forced the API to
1880/// fabricate a hardcoded `"running"` state with no image.
1881#[derive(Debug, Clone)]
1882pub struct ContainerInfo {
1883 /// Container identity.
1884 pub id: ContainerId,
1885 /// Image reference the container was created from (canonical form).
1886 pub image: String,
1887 /// Lowercased lifecycle state (e.g. `"running"`, `"exited"`).
1888 pub state: String,
1889 /// Process ID, when the container is running.
1890 pub pid: Option<u32>,
1891 /// Overlay IP rendered as a string, when assigned.
1892 pub overlay_ip: Option<String>,
1893}
1894
1895/// A live deployment container enriched for Docker-compat `ps` rows and for
1896/// name resolution. Produced by [`ServiceManager::list_container_views`].
1897#[derive(Debug, Clone)]
1898pub struct DeploymentContainerView {
1899 /// Deployment (compose project) name, when known.
1900 pub deployment: Option<String>,
1901 /// Service name within the deployment.
1902 pub service: String,
1903 /// Concrete container identity.
1904 pub container_id: ContainerId,
1905 /// Compose `container_name:` (the user-facing Docker name), when set.
1906 pub container_name: Option<String>,
1907 /// Image reference the container was created from.
1908 pub image: String,
1909 /// Lowercased lifecycle state (e.g. `"running"`).
1910 pub state: String,
1911 /// Process id when running.
1912 pub pid: Option<u32>,
1913 /// The service's published port mappings.
1914 pub ports: Vec<zlayer_spec::PortMapping>,
1915}
1916
1917/// Service manager for multiple services
1918pub struct ServiceManager {
1919 runtime: Arc<dyn Runtime + Send + Sync>,
1920 services: tokio::sync::RwLock<std::collections::HashMap<String, ServiceInstance>>,
1921 scale_semaphore: Arc<Semaphore>,
1922 /// Overlay network manager for container networking
1923 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
1924 /// Stream registry for L4 proxy route registration (TCP/UDP)
1925 stream_registry: Option<Arc<StreamRegistry>>,
1926 /// Proxy manager for health-aware load balancing (hyper-based proxy)
1927 proxy_manager: Option<Arc<ProxyManager>>,
1928 /// DNS server for service discovery
1929 dns_server: Option<Arc<DnsServer>>,
1930 /// Container-injectable overlay resolver IP. When set, new service
1931 /// instances inject `<ip>` into their `spec.dns` so containers resolve
1932 /// through the overlay DNS server (bound on `<ip>:53`) rather than the
1933 /// hijacked host resolv.conf.
1934 container_dns: Option<IpAddr>,
1935 /// Deployment name (used for generating hostnames)
1936 deployment_name: Option<String>,
1937 /// Health states for dependency condition checking
1938 health_states: Arc<RwLock<HashMap<String, HealthState>>>,
1939 /// Job executor for run-to-completion workloads
1940 job_executor: Option<Arc<JobExecutor>>,
1941 /// Cron scheduler for time-based job triggers
1942 cron_scheduler: Option<Arc<CronScheduler>>,
1943 /// Container supervisor for crash/panic policy enforcement
1944 container_supervisor: Option<Arc<ContainerSupervisor>>,
1945 /// Cluster membership + dispatch handle. When `None`, scale operations
1946 /// run purely local (single-node mode). When `Some`, `scale_service`
1947 /// routes through the cluster (leader dispatches to peers; followers
1948 /// forward to the leader).
1949 cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
1950 /// Whether THIS node holds the standing HTTP/HTTPS ingress (mirrors
1951 /// `NodeConfig.ingress`). Threaded onto each `ServiceInstance` so external
1952 /// service domains resolve to this node's overlay IP when it is the funnel.
1953 /// Defaults to `false`; set by the daemon from `NodeConfig.ingress`.
1954 ingress: bool,
1955 /// Sink for persisting/revoking per-container scoped tokens. `None`
1956 /// disables persistence (token minted without a `jti`, not revocable).
1957 token_sink: Option<Arc<dyn crate::auth::ContainerTokenSink>>,
1958 /// Sink that persists each service's resolved image digest into the
1959 /// deployment store after a successful pull (threaded onto every
1960 /// `ServiceInstance`). `None` disables digest persistence.
1961 digest_sink: Option<Arc<dyn crate::auth::DeploymentDigestSink>>,
1962 /// Registry credential store threaded onto every `ServiceInstance` so the
1963 /// supervisor can resolve `zlayer login` credentials + `~/.docker/config.json`
1964 /// for a service's image on recreate (restore / drift / scale). `None` =
1965 /// not wired (standalone) → anonymous / docker-config pulls, as before.
1966 registry_store: Option<RegistryCredentialStoreHandle>,
1967}
1968
1969// ---------------------------------------------------------------------------
1970// ServiceManagerBuilder
1971// ---------------------------------------------------------------------------
1972
1973/// Builder for constructing a [`ServiceManager`] with optional subsystems.
1974///
1975/// Prefer using `ServiceManager::builder(runtime)` to start building.
1976///
1977/// # Example
1978///
1979/// ```ignore
1980/// let manager = ServiceManager::builder(runtime)
1981/// .overlay_manager(om)
1982/// .proxy_manager(proxy)
1983/// .deployment_name("prod")
1984/// .build();
1985/// ```
1986pub struct ServiceManagerBuilder {
1987 runtime: Arc<dyn Runtime + Send + Sync>,
1988 overlay_manager: Option<Arc<RwLock<OverlayManager>>>,
1989 proxy_manager: Option<Arc<ProxyManager>>,
1990 stream_registry: Option<Arc<StreamRegistry>>,
1991 dns_server: Option<Arc<DnsServer>>,
1992 container_dns: Option<IpAddr>,
1993 deployment_name: Option<String>,
1994 job_executor: Option<Arc<JobExecutor>>,
1995 cron_scheduler: Option<Arc<CronScheduler>>,
1996 container_supervisor: Option<Arc<ContainerSupervisor>>,
1997 cluster: Option<Arc<dyn zlayer_scheduler::cluster::Cluster>>,
1998}
1999
2000impl ServiceManagerBuilder {
2001 /// Create a new builder with the required runtime.
2002 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
2003 Self {
2004 runtime,
2005 overlay_manager: None,
2006 proxy_manager: None,
2007 stream_registry: None,
2008 dns_server: None,
2009 container_dns: None,
2010 deployment_name: None,
2011 job_executor: None,
2012 cron_scheduler: None,
2013 container_supervisor: None,
2014 cluster: None,
2015 }
2016 }
2017
2018 /// Set the overlay network manager for container networking.
2019 #[must_use]
2020 pub fn overlay_manager(mut self, om: Arc<RwLock<OverlayManager>>) -> Self {
2021 self.overlay_manager = Some(om);
2022 self
2023 }
2024
2025 /// Set the proxy manager for health-aware load balancing.
2026 #[must_use]
2027 pub fn proxy_manager(mut self, pm: Arc<ProxyManager>) -> Self {
2028 self.proxy_manager = Some(pm);
2029 self
2030 }
2031
2032 /// Set the stream registry for TCP/UDP L4 proxy route registration.
2033 #[must_use]
2034 pub fn stream_registry(mut self, sr: Arc<StreamRegistry>) -> Self {
2035 self.stream_registry = Some(sr);
2036 self
2037 }
2038
2039 /// Set the DNS server for service discovery.
2040 #[must_use]
2041 pub fn dns_server(mut self, dns: Arc<DnsServer>) -> Self {
2042 self.dns_server = Some(dns);
2043 self
2044 }
2045
2046 /// Set the container-injectable overlay resolver IP.
2047 ///
2048 /// The daemon passes the IP it bound the overlay DNS server on at port 53
2049 /// (see `daemon.rs` Phase 4). New service instances inject it into
2050 /// `spec.dns` so containers resolve through the overlay instead of the
2051 /// hijacked host resolv.conf. The port is implicitly 53 (resolv.conf has no
2052 /// port syntax), which is why only the bare IP is threaded here.
2053 #[must_use]
2054 pub fn container_dns(mut self, ip: IpAddr) -> Self {
2055 self.container_dns = Some(ip);
2056 self
2057 }
2058
2059 /// Set the deployment name (used for hostname generation).
2060 #[must_use]
2061 pub fn deployment_name(mut self, name: impl Into<String>) -> Self {
2062 self.deployment_name = Some(name.into());
2063 self
2064 }
2065
2066 /// Set the job executor for run-to-completion workloads.
2067 #[must_use]
2068 pub fn job_executor(mut self, je: Arc<JobExecutor>) -> Self {
2069 self.job_executor = Some(je);
2070 self
2071 }
2072
2073 /// Set the cron scheduler for time-based job triggers.
2074 #[must_use]
2075 pub fn cron_scheduler(mut self, cs: Arc<CronScheduler>) -> Self {
2076 self.cron_scheduler = Some(cs);
2077 self
2078 }
2079
2080 /// Set the container supervisor for crash/panic policy enforcement.
2081 #[must_use]
2082 pub fn container_supervisor(mut self, cs: Arc<ContainerSupervisor>) -> Self {
2083 self.container_supervisor = Some(cs);
2084 self
2085 }
2086
2087 /// Set the cluster membership + dispatch handle. When set,
2088 /// [`ServiceManager::scale_service`] will route through the cluster
2089 /// (leader dispatches to peers; followers forward to the leader).
2090 /// When unset (the default), scale operations remain local-only.
2091 #[must_use]
2092 pub fn cluster(mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) -> Self {
2093 self.cluster = Some(cluster);
2094 self
2095 }
2096
2097 /// Consume the builder and produce a fully-wired [`ServiceManager`].
2098 ///
2099 /// Logs warnings for missing recommended subsystems (proxy,
2100 /// `stream_registry`, `container_supervisor`, `deployment_name`).
2101 pub fn build(self) -> ServiceManager {
2102 if self.proxy_manager.is_none() {
2103 tracing::warn!("ServiceManager built without proxy_manager");
2104 }
2105 if self.stream_registry.is_none() {
2106 tracing::warn!("ServiceManager built without stream_registry");
2107 }
2108 if self.container_supervisor.is_none() {
2109 tracing::warn!("ServiceManager built without container_supervisor");
2110 }
2111 if self.deployment_name.is_none() {
2112 tracing::warn!("ServiceManager built without deployment_name");
2113 }
2114
2115 ServiceManager {
2116 runtime: self.runtime,
2117 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
2118 scale_semaphore: Arc::new(Semaphore::new(10)),
2119 overlay_manager: self.overlay_manager,
2120 stream_registry: self.stream_registry,
2121 proxy_manager: self.proxy_manager,
2122 dns_server: self.dns_server,
2123 container_dns: self.container_dns,
2124 deployment_name: self.deployment_name,
2125 health_states: Arc::new(RwLock::new(HashMap::new())),
2126 job_executor: self.job_executor,
2127 cron_scheduler: self.cron_scheduler,
2128 container_supervisor: self.container_supervisor,
2129 cluster: self.cluster,
2130 ingress: false,
2131 token_sink: None,
2132 digest_sink: None,
2133 registry_store: None,
2134 }
2135 }
2136}
2137
2138impl ServiceManager {
2139 /// Create a [`ServiceManagerBuilder`] for constructing a `ServiceManager`.
2140 ///
2141 /// This is the preferred way to construct a `ServiceManager` since v0.2.0.
2142 ///
2143 /// # Example
2144 ///
2145 /// ```ignore
2146 /// let manager = ServiceManager::builder(runtime)
2147 /// .overlay_manager(om)
2148 /// .proxy_manager(proxy)
2149 /// .build();
2150 /// ```
2151 pub fn builder(runtime: Arc<dyn Runtime + Send + Sync>) -> ServiceManagerBuilder {
2152 ServiceManagerBuilder::new(runtime)
2153 }
2154
2155 /// Create a new service manager
2156 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2157 pub fn new(runtime: Arc<dyn Runtime + Send + Sync>) -> Self {
2158 Self {
2159 runtime,
2160 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
2161 scale_semaphore: Arc::new(Semaphore::new(10)), // Max 10 concurrent scaling operations
2162 overlay_manager: None,
2163 stream_registry: None,
2164 proxy_manager: None,
2165 dns_server: None,
2166 container_dns: None,
2167 deployment_name: None,
2168 health_states: Arc::new(RwLock::new(HashMap::new())),
2169 job_executor: None,
2170 cron_scheduler: None,
2171 container_supervisor: None,
2172 cluster: None,
2173 ingress: false,
2174 token_sink: None,
2175 digest_sink: None,
2176 registry_store: None,
2177 }
2178 }
2179
2180 /// Create a service manager with overlay network support
2181 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2182 pub fn with_overlay(
2183 runtime: Arc<dyn Runtime + Send + Sync>,
2184 overlay_manager: Arc<RwLock<OverlayManager>>,
2185 ) -> Self {
2186 Self {
2187 runtime,
2188 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
2189 scale_semaphore: Arc::new(Semaphore::new(10)),
2190 overlay_manager: Some(overlay_manager),
2191 stream_registry: None,
2192 proxy_manager: None,
2193 dns_server: None,
2194 container_dns: None,
2195 deployment_name: None,
2196 health_states: Arc::new(RwLock::new(HashMap::new())),
2197 job_executor: None,
2198 cron_scheduler: None,
2199 container_supervisor: None,
2200 cluster: None,
2201 ingress: false,
2202 token_sink: None,
2203 digest_sink: None,
2204 registry_store: None,
2205 }
2206 }
2207
2208 /// Create a fully-configured service manager with overlay and proxy support
2209 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2210 pub fn with_full_config(
2211 runtime: Arc<dyn Runtime + Send + Sync>,
2212 overlay_manager: Arc<RwLock<OverlayManager>>,
2213 deployment_name: String,
2214 ) -> Self {
2215 Self {
2216 runtime,
2217 services: tokio::sync::RwLock::new(std::collections::HashMap::new()),
2218 scale_semaphore: Arc::new(Semaphore::new(10)),
2219 overlay_manager: Some(overlay_manager),
2220 stream_registry: None,
2221 proxy_manager: None,
2222 dns_server: None,
2223 container_dns: None,
2224 deployment_name: Some(deployment_name),
2225 health_states: Arc::new(RwLock::new(HashMap::new())),
2226 job_executor: None,
2227 cron_scheduler: None,
2228 container_supervisor: None,
2229 cluster: None,
2230 ingress: false,
2231 token_sink: None,
2232 digest_sink: None,
2233 registry_store: None,
2234 }
2235 }
2236
2237 /// Get the health states map for external monitoring
2238 pub fn health_states(&self) -> Arc<RwLock<HashMap<String, HealthState>>> {
2239 Arc::clone(&self.health_states)
2240 }
2241
2242 /// Update health state for a service
2243 pub async fn update_health_state(&self, service_name: &str, state: HealthState) {
2244 let mut states = self.health_states.write().await;
2245 states.insert(service_name.to_string(), state);
2246 }
2247
2248 /// Set the deployment name (used for generating hostnames)
2249 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2250 pub fn set_deployment_name(&mut self, name: String) {
2251 self.deployment_name = Some(name);
2252 }
2253
2254 /// Set the stream registry for L4 proxy integration (TCP/UDP)
2255 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2256 pub fn set_stream_registry(&mut self, registry: Arc<StreamRegistry>) {
2257 self.stream_registry = Some(registry);
2258 }
2259
2260 /// Builder pattern: add stream registry for L4 proxy integration
2261 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2262 #[must_use]
2263 pub fn with_stream_registry(mut self, registry: Arc<StreamRegistry>) -> Self {
2264 self.stream_registry = Some(registry);
2265 self
2266 }
2267
2268 /// Get the stream registry (if configured)
2269 pub fn stream_registry(&self) -> Option<&Arc<StreamRegistry>> {
2270 self.stream_registry.as_ref()
2271 }
2272
2273 /// Set the overlay manager for container networking
2274 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2275 pub fn set_overlay_manager(&mut self, manager: Arc<RwLock<OverlayManager>>) {
2276 self.overlay_manager = Some(manager);
2277 }
2278
2279 /// Set the sink used to persist/revoke per-container scoped tokens.
2280 pub fn set_token_sink(&mut self, sink: Arc<dyn crate::auth::ContainerTokenSink>) {
2281 self.token_sink = Some(sink);
2282 }
2283
2284 /// Set the sink that persists each service's resolved image digest into the
2285 /// deployment store after a successful pull. Threaded onto every
2286 /// `ServiceInstance` created via `upsert_service`.
2287 pub fn set_digest_sink(&mut self, sink: Arc<dyn crate::auth::DeploymentDigestSink>) {
2288 self.digest_sink = Some(sink);
2289 }
2290
2291 /// Set the registry credential store threaded onto every `ServiceInstance`
2292 /// so the supervisor resolves `zlayer login` credentials +
2293 /// `~/.docker/config.json` for a service's image on recreate (restore /
2294 /// drift / scale). Mirrors the store wired into the API create/pull handlers.
2295 pub fn set_registry_store(&mut self, store: RegistryCredentialStoreHandle) {
2296 self.registry_store = Some(store);
2297 }
2298
2299 /// Pin a service's restore digest so its next pull resolves strictly from
2300 /// local layers (see [`ServiceInstance::set_restore_pin`]). Best-effort: a
2301 /// no-op when the service is not (yet) registered. Called by the restore
2302 /// path after `upsert_service` and before scaling.
2303 pub async fn set_service_restore_pin(&self, service: &str, digest: Option<String>) {
2304 if let Some(inst) = self.services.read().await.get(service) {
2305 inst.set_restore_pin(digest).await;
2306 }
2307 }
2308
2309 /// Set the proxy manager for health-aware load balancing
2310 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2311 pub fn set_proxy_manager(&mut self, proxy: Arc<ProxyManager>) {
2312 self.proxy_manager = Some(proxy);
2313 }
2314
2315 /// Builder pattern: add proxy manager for health-aware load balancing
2316 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2317 #[must_use]
2318 pub fn with_proxy_manager(mut self, proxy: Arc<ProxyManager>) -> Self {
2319 self.proxy_manager = Some(proxy);
2320 self
2321 }
2322
2323 /// Get the proxy manager (if configured)
2324 pub fn proxy_manager(&self) -> Option<&Arc<ProxyManager>> {
2325 self.proxy_manager.as_ref()
2326 }
2327
2328 /// Set the DNS server for service discovery
2329 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2330 pub fn set_dns_server(&mut self, dns: Arc<DnsServer>) {
2331 self.dns_server = Some(dns);
2332 }
2333
2334 /// Builder pattern: add DNS server for service discovery
2335 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2336 #[must_use]
2337 pub fn with_dns_server(mut self, dns: Arc<DnsServer>) -> Self {
2338 self.dns_server = Some(dns);
2339 self
2340 }
2341
2342 /// Get the DNS server (if configured)
2343 pub fn dns_server(&self) -> Option<&Arc<DnsServer>> {
2344 self.dns_server.as_ref()
2345 }
2346
2347 /// Set the job executor for run-to-completion workloads
2348 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2349 pub fn set_job_executor(&mut self, executor: Arc<JobExecutor>) {
2350 self.job_executor = Some(executor);
2351 }
2352
2353 /// Set the cron scheduler for time-based job triggers
2354 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2355 pub fn set_cron_scheduler(&mut self, scheduler: Arc<CronScheduler>) {
2356 self.cron_scheduler = Some(scheduler);
2357 }
2358
2359 /// Builder pattern: add job executor
2360 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2361 #[must_use]
2362 pub fn with_job_executor(mut self, executor: Arc<JobExecutor>) -> Self {
2363 self.job_executor = Some(executor);
2364 self
2365 }
2366
2367 /// Builder pattern: add cron scheduler
2368 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2369 #[must_use]
2370 pub fn with_cron_scheduler(mut self, scheduler: Arc<CronScheduler>) -> Self {
2371 self.cron_scheduler = Some(scheduler);
2372 self
2373 }
2374
2375 /// Set the cluster handle for cluster-aware scaling.
2376 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2377 pub fn set_cluster(&mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) {
2378 self.cluster = Some(cluster);
2379 }
2380
2381 /// Builder pattern: add a cluster handle for cluster-aware scaling.
2382 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2383 #[must_use]
2384 pub fn with_cluster(mut self, cluster: Arc<dyn zlayer_scheduler::cluster::Cluster>) -> Self {
2385 self.cluster = Some(cluster);
2386 self
2387 }
2388
2389 /// Get the cluster handle (if configured).
2390 pub fn cluster(&self) -> Option<&Arc<dyn zlayer_scheduler::cluster::Cluster>> {
2391 self.cluster.as_ref()
2392 }
2393
2394 /// Set whether THIS node holds the standing HTTP/HTTPS ingress (mirrors
2395 /// `NodeConfig.ingress`). Threaded onto each new `ServiceInstance` so its
2396 /// external service domains resolve to this node's overlay IP when it is
2397 /// the funnel. Defaults to `false`.
2398 pub fn set_ingress(&mut self, enabled: bool) {
2399 self.ingress = enabled;
2400 }
2401
2402 /// Whether THIS node holds the standing HTTP/HTTPS ingress.
2403 #[must_use]
2404 pub fn ingress(&self) -> bool {
2405 self.ingress
2406 }
2407
2408 /// Get the job executor (if configured)
2409 pub fn job_executor(&self) -> Option<&Arc<JobExecutor>> {
2410 self.job_executor.as_ref()
2411 }
2412
2413 /// Get the cron scheduler (if configured)
2414 pub fn cron_scheduler(&self) -> Option<&Arc<CronScheduler>> {
2415 self.cron_scheduler.as_ref()
2416 }
2417
2418 /// Set the container supervisor for crash/panic policy enforcement
2419 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2420 pub fn set_container_supervisor(&mut self, supervisor: Arc<ContainerSupervisor>) {
2421 self.container_supervisor = Some(supervisor);
2422 }
2423
2424 /// Builder pattern: add container supervisor
2425 #[deprecated(since = "0.2.0", note = "use ServiceManager::builder() instead")]
2426 #[must_use]
2427 pub fn with_container_supervisor(mut self, supervisor: Arc<ContainerSupervisor>) -> Self {
2428 self.container_supervisor = Some(supervisor);
2429 self
2430 }
2431
2432 /// Get the container supervisor (if configured)
2433 pub fn container_supervisor(&self) -> Option<&Arc<ContainerSupervisor>> {
2434 self.container_supervisor.as_ref()
2435 }
2436
2437 /// Start the container supervisor background task
2438 ///
2439 /// This spawns a background task that monitors containers for crashes
2440 /// and enforces the `on_panic` error policy.
2441 ///
2442 /// # Errors
2443 /// Returns an error if no container supervisor is configured.
2444 ///
2445 /// # Returns
2446 /// A `JoinHandle` for the supervisor task.
2447 pub fn start_container_supervisor(&self) -> Result<tokio::task::JoinHandle<()>> {
2448 let supervisor = self.container_supervisor.as_ref().ok_or_else(|| {
2449 AgentError::Configuration("Container supervisor not configured".to_string())
2450 })?;
2451
2452 let supervisor = Arc::clone(supervisor);
2453 Ok(tokio::spawn(async move {
2454 supervisor.run_loop().await;
2455 }))
2456 }
2457
2458 /// Shutdown the container supervisor
2459 pub fn shutdown_container_supervisor(&self) {
2460 if let Some(supervisor) = &self.container_supervisor {
2461 supervisor.shutdown();
2462 }
2463 }
2464
2465 /// Recorded init PID for `container_id` under `service_name`, if the instance
2466 /// still tracks it. This is the PID captured AT START (see the `pid` field on
2467 /// [`Container`]), so it survives the container's death — letting the overlay
2468 /// detach delete the host-side veth (`veth-<pid>-*`) by name even after the
2469 /// process is gone ("process id or not, kill the adapter").
2470 async fn recorded_container_pid(
2471 &self,
2472 service_name: &str,
2473 container_id: &ContainerId,
2474 ) -> Option<u32> {
2475 let services = self.services.read().await;
2476 let instance = services.get(service_name)?;
2477 let containers = instance.containers().read().await;
2478 containers.get(container_id).and_then(|c| c.pid)
2479 }
2480
2481 /// Spawn the container-down → overlay-detach bridge.
2482 ///
2483 /// The container supervisor (`run_loop`) detects a container going DOWN
2484 /// (exit/crash/crash-loop/isolate/service-shutdown) and emits a
2485 /// [`SupervisorEvent`]. Nothing consumed that stream before, so a container
2486 /// that died on its own — rather than via an explicit scale-down/delete —
2487 /// left its host-side veth (`veth-<pid>-*`) and its allocated overlay IP
2488 /// dangling until the whole service was torn down (or the PID-keyed periodic
2489 /// sweep eventually fired). This task drains the stream and, for every
2490 /// down-event, detaches the container's overlay attachment by the PID
2491 /// recorded AT START — so the veth is deleted and the IP released the moment
2492 /// the container goes down, regardless of restart policy.
2493 ///
2494 /// No-op (returns `None`) when no supervisor or no overlay manager is wired.
2495 /// Best-effort: detach failures are logged, never propagated.
2496 ///
2497 /// Takes the manager as `&Arc<RwLock<Self>>` (its post-`init_daemon` home)
2498 /// rather than `&Arc<Self>`: `serve()` calls `Arc::try_unwrap` on the
2499 /// `Arc<ServiceManager>` to re-wrap it in the `RwLock`, which REQUIRES sole
2500 /// ownership — so the detach task must NOT hold a strong `Arc<ServiceManager>`
2501 /// clone (that bumped the refcount and made `try_unwrap` fail, aborting
2502 /// daemon startup). Holding a clone of the shared `Arc<RwLock<Self>>` is
2503 /// safe; that Arc is meant to be shared (router, internal state, etc.).
2504 pub async fn spawn_overlay_detach_on_exit(
2505 sm: &Arc<RwLock<Self>>,
2506 ) -> Option<tokio::task::JoinHandle<()>> {
2507 let (overlay, mut events) = {
2508 let guard = sm.read().await;
2509 let overlay = guard.overlay_manager.as_ref().map(Arc::clone)?;
2510 let events = guard.take_supervisor_events().await?;
2511 (overlay, events)
2512 };
2513 let sm = Arc::clone(sm);
2514
2515 Some(tokio::spawn(async move {
2516 while let Some(event) = events.recv().await {
2517 // Every variant signals the container went DOWN; on
2518 // `ContainerRestarted` the supervisor already re-`start`ed the
2519 // SAME container, which gets a NEW PID + a NEW veth — so deleting
2520 // the OLD PID's veth/IP here is correct and never races the live
2521 // one (names are PID-scoped).
2522 let (service_name, container_id) = match &event {
2523 SupervisorEvent::ContainerRestarted {
2524 id, service_name, ..
2525 }
2526 | SupervisorEvent::CrashLoopBackOff {
2527 id, service_name, ..
2528 }
2529 | SupervisorEvent::ContainerIsolated {
2530 id, service_name, ..
2531 }
2532 | SupervisorEvent::ServiceShutdown {
2533 id, service_name, ..
2534 }
2535 | SupervisorEvent::ContainerCompleted { id, service_name } => {
2536 (service_name.clone(), id.clone())
2537 }
2538 };
2539
2540 // Grab the recorded PID + a clone of the token sink in one read
2541 // guard, then drop the guard before any await on revoke.
2542 let (pid, token_sink) = {
2543 let guard = sm.read().await;
2544 let pid = guard
2545 .recorded_container_pid(&service_name, &container_id)
2546 .await;
2547 (pid, guard.token_sink.as_ref().map(Arc::clone))
2548 };
2549
2550 // Revoke the container's scoped token (best-effort). The jti is
2551 // the deterministic `container:<service>:<service>-<replica>`
2552 // string the runtime minted under — `container_id` is already
2553 // the `{service}-{replica}` form here.
2554 if let Some(sink) = token_sink {
2555 sink.revoke(&format!("container:{service_name}:{container_id}"))
2556 .await;
2557 }
2558
2559 let Some(pid) = pid else {
2560 tracing::debug!(
2561 service = %service_name,
2562 container = %container_id,
2563 "container-down detach: no recorded PID (never attached or already cleaned up)"
2564 );
2565 continue;
2566 };
2567
2568 let overlay_guard = overlay.read().await;
2569 if let Err(e) = overlay_guard.detach_container(pid).await {
2570 tracing::warn!(
2571 service = %service_name,
2572 container = %container_id,
2573 pid,
2574 error = %e,
2575 "container-down overlay detach failed (veth may linger until periodic sweep)"
2576 );
2577 } else {
2578 tracing::info!(
2579 service = %service_name,
2580 container = %container_id,
2581 pid,
2582 "detached overlay for downed container (deleted veth + released IP)"
2583 );
2584 }
2585 }
2586 tracing::debug!("supervisor event stream closed; overlay-detach bridge exiting");
2587 }))
2588 }
2589
2590 /// Get the supervised state of a container
2591 pub async fn get_container_supervised_state(
2592 &self,
2593 container_id: &ContainerId,
2594 ) -> Option<SupervisedState> {
2595 if let Some(supervisor) = &self.container_supervisor {
2596 supervisor.get_state(container_id).await
2597 } else {
2598 None
2599 }
2600 }
2601
2602 /// Get supervisor events receiver
2603 ///
2604 /// Note: This can only be called once; the receiver is moved to the caller.
2605 pub async fn take_supervisor_events(
2606 &self,
2607 ) -> Option<tokio::sync::mpsc::Receiver<SupervisorEvent>> {
2608 if let Some(supervisor) = &self.container_supervisor {
2609 supervisor.take_event_receiver().await
2610 } else {
2611 None
2612 }
2613 }
2614
2615 // ==================== Dependency Orchestration ====================
2616
2617 /// Deploy all swarm-sharded services in `services` as coordinated gangs.
2618 ///
2619 /// Partitions the input by `gpu.sharding.swarm_id` (via [`swarm_id_of`]) and,
2620 /// for each swarm group, registers every member (`upsert_service`) and then
2621 /// places the whole group together. On the cluster **leader** this routes to
2622 /// [`Cluster::dispatch_swarm_scale`], which runs `place_swarm`'s all-or-nothing
2623 /// gang placement and pins each member to its chosen node. On a follower, or
2624 /// with no cluster handle, it falls back to per-member [`Self::scale_service`]
2625 /// (which forwards to the leader / scales locally respectively).
2626 ///
2627 /// **Gang co-location is only guaranteed on the leader path** — the normal
2628 /// single-root-node case. A follower-initiated deploy forwards each member
2629 /// independently (the leader's `/internal/scale` handler scales each member
2630 /// locally, bypassing gang placement), so co-location is best-effort there.
2631 ///
2632 /// Returns the set of service names handled here so the caller can skip them
2633 /// in the per-service deploy loop.
2634 ///
2635 /// # Errors
2636 /// Returns an error if a member fails to register or the gang fails to place.
2637 async fn deploy_swarm_groups(
2638 &self,
2639 services: &HashMap<String, ServiceSpec>,
2640 ) -> Result<std::collections::HashSet<String>> {
2641 let groups = partition_swarm_groups(services);
2642 let mut handled: std::collections::HashSet<String> = std::collections::HashSet::new();
2643
2644 for (swarm_id, mut members) in groups {
2645 // Deterministic order so logs / dispatch are stable across runs.
2646 members.sort_by(|a, b| a.0.cmp(&b.0));
2647
2648 tracing::info!(
2649 swarm_id = %swarm_id,
2650 member_count = members.len(),
2651 members = ?members.iter().map(|(n, _)| n.as_str()).collect::<Vec<_>>(),
2652 "Deploying inference swarm as a coordinated gang"
2653 );
2654
2655 // Register every member spec FIRST so the placement inputs and any
2656 // receiving node can see the service before the gang is dispatched.
2657 for (name, spec) in &members {
2658 Box::pin(self.upsert_service(name.clone(), spec.clone())).await?;
2659 handled.insert(name.clone());
2660 }
2661
2662 // Build the (name, spec, replicas) tuples the cluster dispatch wants.
2663 // Swarm members are one replica each; honor the spec's own intent.
2664 let member_reqs: Vec<(String, ServiceSpec, u32)> = members
2665 .iter()
2666 .map(|(name, spec)| {
2667 let replicas = swarm_member_replicas(spec);
2668 (name.clone(), spec.clone(), replicas)
2669 })
2670 .collect();
2671
2672 if let Some(cluster) = &self.cluster {
2673 if cluster.is_leader().await {
2674 // Leader: run the real gang placement (all-or-nothing).
2675 cluster
2676 .dispatch_swarm_scale(&member_reqs)
2677 .await
2678 .map_err(|e| AgentError::CreateFailed {
2679 id: swarm_id.clone(),
2680 reason: format!("swarm gang dispatch: {e}"),
2681 })?;
2682 } else {
2683 // Follower: forward each member to the leader independently
2684 // (best-effort co-location — see method docs).
2685 for (name, _spec, replicas) in &member_reqs {
2686 self.scale_service(name, *replicas).await?;
2687 }
2688 }
2689 } else {
2690 // No cluster handle (single-node, no Cluster wired): scale each
2691 // member locally. Co-location is trivial — one node.
2692 for (name, _spec, replicas) in &member_reqs {
2693 self.scale_service(name, *replicas).await?;
2694 }
2695 }
2696
2697 // Mark every member started in health states.
2698 for (name, _) in &members {
2699 self.update_health_state(name, HealthState::Unknown).await;
2700 }
2701 }
2702
2703 Ok(handled)
2704 }
2705
2706 /// Deploy multiple services respecting their dependency order
2707 ///
2708 /// This method:
2709 /// 1. Builds a dependency graph from the services
2710 /// 2. Validates no cycles exist
2711 /// 3. Computes topological order (services with no deps first)
2712 /// 4. For each service in order, waits for dependencies then starts the service
2713 ///
2714 /// # Arguments
2715 /// * `services` - Map of service name to service specification
2716 ///
2717 /// # Errors
2718 /// - Returns `AgentError::InvalidSpec` if there are cyclic dependencies
2719 /// - Returns `AgentError::DependencyTimeout` if a dependency times out with `on_timeout`: fail
2720 pub async fn deploy_with_dependencies(
2721 &self,
2722 services: HashMap<String, ServiceSpec>,
2723 ) -> Result<()> {
2724 if services.is_empty() {
2725 return Ok(());
2726 }
2727
2728 // Swarm pre-pass: any service whose spec carries `gpu.sharding` is part
2729 // of an inference swarm and must be placed as a coordinated GANG (the
2730 // whole pipeline co-located, all-or-nothing), NOT one-service-at-a-time
2731 // through the normal scale path. Handle every such service here and skip
2732 // them in the per-service loop below so non-swarm services are entirely
2733 // unchanged. (Swarm members are single-replica units placed together;
2734 // dependency ordering within a gang is the ring order `place_swarm`
2735 // imposes, not the per-service `depends` graph.)
2736 let swarm_handled = self.deploy_swarm_groups(&services).await?;
2737
2738 // Build dependency graph
2739 let graph = DependencyGraph::build(&services)?;
2740
2741 tracing::info!(
2742 service_count = services.len(),
2743 swarm_handled = swarm_handled.len(),
2744 "Starting deployment with dependency ordering"
2745 );
2746
2747 // Get startup order
2748 let order = graph.startup_order();
2749 tracing::debug!(order = ?order, "Computed startup order");
2750
2751 // Start services in dependency order
2752 for service_name in order {
2753 // Swarm-sharded services were already deployed as a gang above.
2754 if swarm_handled.contains(service_name) {
2755 continue;
2756 }
2757
2758 let service_spec = services
2759 .get(service_name)
2760 .ok_or_else(|| AgentError::Internal(format!("Service {service_name} not found")))?;
2761
2762 // Wait for dependencies first
2763 if !service_spec.depends.is_empty() {
2764 tracing::info!(
2765 service = %service_name,
2766 dependency_count = service_spec.depends.len(),
2767 "Waiting for dependencies"
2768 );
2769 self.wait_for_dependencies(service_name, &service_spec.depends)
2770 .await?;
2771 }
2772
2773 // Register and start service
2774 tracing::info!(service = %service_name, "Starting service");
2775 Box::pin(self.upsert_service(service_name.clone(), service_spec.clone())).await?;
2776
2777 // Get the desired replica count from scale config
2778 let replicas = match &service_spec.scale {
2779 zlayer_spec::ScaleSpec::Fixed { replicas } => *replicas,
2780 zlayer_spec::ScaleSpec::Adaptive { min, .. } => *min, // Start with min replicas
2781 zlayer_spec::ScaleSpec::Manual => 1, // Default to 1 for manual scaling
2782 };
2783 self.scale_service(service_name, replicas).await?;
2784
2785 // Mark service as started in health states (Unknown until health check runs)
2786 self.update_health_state(service_name, HealthState::Unknown)
2787 .await;
2788
2789 tracing::info!(
2790 service = %service_name,
2791 replicas = replicas,
2792 "Service started"
2793 );
2794 }
2795
2796 tracing::info!(service_count = services.len(), "Deployment complete");
2797
2798 Ok(())
2799 }
2800
2801 /// Wait for all dependencies of a service to be satisfied
2802 ///
2803 /// # Arguments
2804 /// * `service` - Name of the service waiting for dependencies
2805 /// * `deps` - Slice of dependency specifications
2806 ///
2807 /// # Errors
2808 /// Returns `AgentError::DependencyTimeout` if any dependency with `on_timeout`: fail times out
2809 async fn wait_for_dependencies(&self, service: &str, deps: &[DependsSpec]) -> Result<()> {
2810 let condition_checker = DependencyConditionChecker::new(
2811 Arc::clone(&self.runtime),
2812 Arc::clone(&self.health_states),
2813 None,
2814 );
2815
2816 let waiter = DependencyWaiter::new(condition_checker);
2817 let results = waiter.wait_for_all(deps).await?;
2818
2819 // Check results for failures
2820 for result in results {
2821 match result {
2822 WaitResult::TimedOutFail {
2823 service: dep_service,
2824 condition,
2825 timeout,
2826 } => {
2827 return Err(AgentError::DependencyTimeout {
2828 service: service.to_string(),
2829 dependency: dep_service,
2830 condition: format!("{condition:?}"),
2831 timeout,
2832 });
2833 }
2834 WaitResult::TimedOutWarn {
2835 service: dep_service,
2836 condition,
2837 } => {
2838 tracing::warn!(
2839 service = %service,
2840 dependency = %dep_service,
2841 condition = ?condition,
2842 "Dependency timed out but continuing"
2843 );
2844 }
2845 WaitResult::TimedOutContinue | WaitResult::Satisfied => {
2846 // Continue silently
2847 }
2848 }
2849 }
2850
2851 Ok(())
2852 }
2853
2854 /// Check if all dependencies for a service are currently satisfied
2855 ///
2856 /// This is a one-shot check (no waiting). Useful for pre-flight validation.
2857 ///
2858 /// # Errors
2859 /// Returns an error if a dependency check fails unexpectedly.
2860 pub async fn check_dependencies(&self, deps: &[DependsSpec]) -> Result<bool> {
2861 let condition_checker = DependencyConditionChecker::new(
2862 Arc::clone(&self.runtime),
2863 Arc::clone(&self.health_states),
2864 None,
2865 );
2866
2867 for dep in deps {
2868 if !condition_checker.check(dep).await? {
2869 return Ok(false);
2870 }
2871 }
2872
2873 Ok(true)
2874 }
2875
2876 /// Add or update a workload (service, job, or cron)
2877 ///
2878 /// This method handles different resource types appropriately:
2879 /// - **Service**: Traditional long-running containers with scaling and health checks
2880 /// - **Job**: Run-to-completion workloads triggered on-demand (stores spec for later)
2881 /// - **Cron**: Scheduled run-to-completion workloads (registers with cron scheduler)
2882 ///
2883 /// # Errors
2884 /// Returns an error if service creation, scaling, or cron registration fails.
2885 #[allow(clippy::too_many_lines)]
2886 pub async fn upsert_service(&self, name: String, spec: ServiceSpec) -> Result<()> {
2887 match spec.rtype {
2888 ResourceType::Service => {
2889 // Long-running service: create/update instance
2890 let mut services = self.services.write().await;
2891
2892 if let Some(instance) = services.get_mut(&name) {
2893 // Update existing service. We need to:
2894 // 1. Update the in-memory spec (so future scale-ups use the new image).
2895 // 2. Recreate the local replicas when the image actually changed —
2896 // either a different image *reference* (e.g. tag bump
2897 // 1.28 -> 1.29), which is a new image regardless of pull
2898 // policy, or, under Always/Newer, observed *digest* drift on
2899 // the same reference.
2900 // The recreate is LOCAL (`scale_service_local`): `upsert_service`
2901 // runs on whichever node owns the replicas (the leader for its
2902 // own share, each worker via the `/internal/scale` handler). Using
2903 // the cluster-routed `scale_service` here would bounce a worker's
2904 // recreate back to the leader and re-enter dispatch. Cluster-wide
2905 // distribution is the caller's job (orchestrate_deployment + the
2906 // scale dispatch that carries this spec to every node).
2907 let image_changed = instance.spec.image.name != spec.image.name;
2908 instance.spec = spec.clone();
2909 if let Some(dns) = &self.dns_server {
2910 instance.set_dns_server(Arc::clone(dns));
2911 }
2912 // Re-apply overlay resolver injection: the spec was just
2913 // replaced wholesale, so any prior injection on the old
2914 // spec is gone. Honors host_network / user-supplied dns.
2915 if let Some(ip) = self.container_dns {
2916 instance.set_container_dns(ip);
2917 }
2918
2919 let effective = spec.image.pull_policy;
2920 let old_digest = instance.last_pulled_digest().await;
2921 let current_replicas =
2922 u32::try_from(instance.replica_count().await).unwrap_or(u32::MAX);
2923 drop(services); // Release write lock before pull / scale (which take their own locks).
2924
2925 // A changed image reference always recreates. Same-reference
2926 // refreshes are governed by pull policy + digest drift.
2927 let mut should_recreate = image_changed;
2928 let mut new_digest = old_digest.clone();
2929
2930 match effective {
2931 PullPolicy::Never | PullPolicy::IfNotPresent => {
2932 // No proactive pull. If the reference changed we still
2933 // recreate below; the scale-up path pulls the (absent) new
2934 // image per IfNotPresent. A same-reference redeploy under
2935 // these policies is a genuine no-op.
2936 tracing::debug!(
2937 service = %name,
2938 policy = ?effective,
2939 image_changed,
2940 "re-deploy under no-refresh pull policy"
2941 );
2942 }
2943 PullPolicy::Always | PullPolicy::Newer => {
2944 // Pull (this updates the cached digest as a side-effect).
2945 // We need a read guard to keep the instance alive while
2946 // calling its &self method.
2947 let services_ro = self.services.read().await;
2948 new_digest = if let Some(inst) = services_ro.get(&name) {
2949 inst.pull_and_refresh_digest().await?
2950 } else {
2951 // The service vanished between our write-lock release
2952 // and read-lock acquisition (race with remove_service).
2953 // Treat this as a no-op; the caller will see the removal.
2954 tracing::warn!(
2955 service = %name,
2956 "service removed during upsert; skipping drift recreate"
2957 );
2958 return Ok(());
2959 };
2960 drop(services_ro);
2961
2962 // Always forces a recreate. Newer recreates on digest
2963 // drift. When digests are unknown (runtime doesn't expose
2964 // them), we can't observe drift safely under Newer, so the
2965 // reference check above is the only trigger.
2966 should_recreate = should_recreate
2967 || match effective {
2968 PullPolicy::Always => true,
2969 PullPolicy::Newer => match (&old_digest, &new_digest) {
2970 (Some(old), Some(new)) => old != new,
2971 _ => false,
2972 },
2973 _ => false,
2974 };
2975 }
2976 }
2977
2978 if should_recreate && current_replicas > 0 {
2979 tracing::info!(
2980 service = %name,
2981 policy = ?effective,
2982 image_changed,
2983 old_digest = ?old_digest,
2984 new_digest = ?new_digest,
2985 replicas = current_replicas,
2986 "image changed; performing local rolling recreate"
2987 );
2988 self.scale_service_local(&name, 0).await?;
2989 self.scale_service_local(&name, current_replicas).await?;
2990 tracing::info!(
2991 service = %name,
2992 new_digest = ?new_digest,
2993 "service recreated with refreshed image"
2994 );
2995 } else {
2996 tracing::debug!(
2997 service = %name,
2998 policy = ?effective,
2999 old_digest = ?old_digest,
3000 new_digest = ?new_digest,
3001 "service up to date; no recreate required"
3002 );
3003 }
3004 return Ok(());
3005 }
3006 // Create new service with proxy manager for health-aware load balancing
3007 let overlay = self.overlay_manager.as_ref().map(Arc::clone);
3008 let mut instance = if let Some(proxy) = &self.proxy_manager {
3009 ServiceInstance::with_proxy(
3010 name.clone(),
3011 spec,
3012 self.runtime.clone(),
3013 overlay,
3014 Arc::clone(proxy),
3015 )
3016 } else {
3017 ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
3018 };
3019 // Thread the local cluster node id so new `ContainerId`s carry
3020 // owning-node identity. Defaults to `0` in single-node mode.
3021 instance.set_node_id(self.cluster.as_ref().map_or(0, |c| c.node_id()));
3022 // Thread ingress capability + cluster handle so external service
3023 // domains resolve to an ingress-capable node's overlay IP.
3024 instance.set_ingress_enabled(self.ingress);
3025 if let Some(cluster) = &self.cluster {
3026 instance.set_cluster(Arc::clone(cluster));
3027 }
3028 // Thread the token sink so the container's scoped token is
3029 // revoked on scale-down / removal.
3030 instance.set_token_sink(self.token_sink.clone());
3031 // Thread the digest sink so a successful pull persists the
3032 // resolved image digest for restore-from-local.
3033 instance.set_digest_sink(self.digest_sink.clone());
3034 // Thread the registry credential store so the supervisor's
3035 // recreate pulls resolve `zlayer login` / docker-config auth.
3036 instance.set_registry_store(self.registry_store.clone());
3037 // Set DNS server if configured
3038 if let Some(dns) = &self.dns_server {
3039 instance.set_dns_server(Arc::clone(dns));
3040 }
3041 // Inject the overlay resolver into the spec so containers use it
3042 // instead of the hijacked host resolv.conf (no-op for
3043 // host_network / user-supplied dns).
3044 if let Some(ip) = self.container_dns {
3045 instance.set_container_dns(ip);
3046 }
3047 // Wire shared health states so callbacks bridge back to ServiceManager
3048 instance.set_health_states(Arc::clone(&self.health_states));
3049 // Register HTTP routes via proxy manager
3050 if let Some(proxy) = &self.proxy_manager {
3051 proxy.add_service(&name, &instance.spec).await;
3052 }
3053 // Register TCP/UDP endpoints in stream registry
3054 if let Some(stream_registry) = &self.stream_registry {
3055 for endpoint in &instance.spec.endpoints {
3056 let svc = StreamService::new(
3057 name.clone(),
3058 Vec::new(), // No backends yet; added on scale-up
3059 );
3060 match endpoint.protocol {
3061 Protocol::Tcp => {
3062 stream_registry.register_tcp(endpoint.port, svc);
3063 tracing::debug!(
3064 service = %name,
3065 port = endpoint.port,
3066 "Registered TCP stream route"
3067 );
3068 }
3069 Protocol::Udp => {
3070 stream_registry.register_udp(endpoint.port, svc);
3071 tracing::debug!(
3072 service = %name,
3073 port = endpoint.port,
3074 "Registered UDP stream route"
3075 );
3076 }
3077 _ => {} // HTTP routes handled by proxy manager
3078 }
3079 }
3080 }
3081 services.insert(name, instance);
3082 }
3083 ResourceType::Job => {
3084 // Job: Just store the spec for later triggering
3085 // Jobs don't start containers immediately; they're triggered on-demand
3086 if let Some(executor) = &self.job_executor {
3087 executor.register_job(&name, spec).await;
3088 tracing::info!(job = %name, "Registered job spec");
3089 } else {
3090 tracing::warn!(
3091 job = %name,
3092 "Job executor not configured, storing as service for reference"
3093 );
3094 // Fallback: store as service instance for reference
3095 let mut services = self.services.write().await;
3096 let overlay = self.overlay_manager.as_ref().map(Arc::clone);
3097 let mut instance = if let Some(proxy) = &self.proxy_manager {
3098 ServiceInstance::with_proxy(
3099 name.clone(),
3100 spec,
3101 self.runtime.clone(),
3102 overlay,
3103 Arc::clone(proxy),
3104 )
3105 } else {
3106 ServiceInstance::new(name.clone(), spec, self.runtime.clone(), overlay)
3107 };
3108 // Thread the local cluster node id (same as the Service
3109 // branch above) so the fallback-as-service Job entry also
3110 // carries owning-node identity.
3111 instance.set_node_id(self.cluster.as_ref().map_or(0, |c| c.node_id()));
3112 // Thread ingress capability + cluster handle (same as the
3113 // Service branch).
3114 instance.set_ingress_enabled(self.ingress);
3115 if let Some(cluster) = &self.cluster {
3116 instance.set_cluster(Arc::clone(cluster));
3117 }
3118 // Thread the token sink so the container's scoped token is
3119 // revoked on scale-down / removal.
3120 instance.set_token_sink(self.token_sink.clone());
3121 // Thread the digest sink so a successful pull persists the
3122 // resolved image digest for restore-from-local.
3123 instance.set_digest_sink(self.digest_sink.clone());
3124 // Thread the registry credential store so the supervisor's
3125 // recreate pulls resolve `zlayer login` / docker-config auth.
3126 instance.set_registry_store(self.registry_store.clone());
3127 // Set DNS server if configured
3128 if let Some(dns) = &self.dns_server {
3129 instance.set_dns_server(Arc::clone(dns));
3130 }
3131 // Inject the overlay resolver (no-op for host_network /
3132 // user-supplied dns).
3133 if let Some(ip) = self.container_dns {
3134 instance.set_container_dns(ip);
3135 }
3136 services.insert(name, instance);
3137 }
3138 }
3139 ResourceType::Cron => {
3140 // Cron: Register with the cron scheduler
3141 if let Some(scheduler) = &self.cron_scheduler {
3142 scheduler.register(&name, &spec).await?;
3143 tracing::info!(cron = %name, "Registered cron job with scheduler");
3144 } else {
3145 return Err(AgentError::Configuration(format!(
3146 "Cron scheduler not configured for cron job '{name}'"
3147 )));
3148 }
3149 }
3150 }
3151
3152 Ok(())
3153 }
3154
3155 /// Update backend addresses via `ProxyManager` after scaling, applying
3156 /// per-endpoint `target_role` filtering.
3157 ///
3158 /// For each L7 endpoint of the service, this collects the subset of
3159 /// containers whose `ContainerId.role` matches `endpoint.target_role`
3160 /// (or all containers when `target_role` is `None`) and updates the
3161 /// proxy's backend pool for that specific endpoint via
3162 /// [`ProxyManager::update_endpoint_backends`].
3163 async fn update_proxy_backends(&self, instance: &ServiceInstance) {
3164 let Some(proxy) = &self.proxy_manager else {
3165 return;
3166 };
3167 for endpoint in &instance.spec.endpoints {
3168 // Only L7 endpoints flow through the proxy (HTTP/HTTPS/WS).
3169 if !matches!(
3170 endpoint.protocol,
3171 Protocol::Http | Protocol::Https | Protocol::Websocket
3172 ) {
3173 continue;
3174 }
3175 let addrs = self.collect_endpoint_backends(instance, endpoint).await;
3176 proxy
3177 .update_endpoint_backends(&instance.service_name, &endpoint.name, addrs)
3178 .await;
3179 }
3180 }
3181
3182 /// Update backend addresses in the `StreamRegistry` for TCP/UDP endpoints after scaling
3183 ///
3184 /// For containers with a port override (macOS sandbox), the addresses already
3185 /// carry the runtime-assigned port. In that case, the container listens on the
3186 /// override port for all traffic, so we use the address port directly. For
3187 /// containers without a port override (Linux, VMs), we reconstruct addresses
3188 /// using the endpoint's declared port, since each container has its own IP
3189 /// and can bind any port independently.
3190 async fn update_stream_backends(&self, instance: &ServiceInstance) {
3191 let Some(stream_registry) = &self.stream_registry else {
3192 return;
3193 };
3194
3195 for endpoint in &instance.spec.endpoints {
3196 match endpoint.protocol {
3197 Protocol::Tcp => {
3198 let tcp_backends = self.collect_endpoint_backends(instance, endpoint).await;
3199 let backend_count = tcp_backends.len();
3200 stream_registry.update_tcp_backends(endpoint.port, tcp_backends);
3201 tracing::debug!(
3202 endpoint = %endpoint.name,
3203 port = endpoint.port,
3204 backend_count = backend_count,
3205 target_role = ?endpoint.target_role,
3206 "Updated TCP stream backends"
3207 );
3208 }
3209 Protocol::Udp => {
3210 let udp_backends = self.collect_endpoint_backends(instance, endpoint).await;
3211 let backend_count = udp_backends.len();
3212 stream_registry.update_udp_backends(endpoint.port, udp_backends);
3213 tracing::debug!(
3214 endpoint = %endpoint.name,
3215 port = endpoint.port,
3216 backend_count = backend_count,
3217 target_role = ?endpoint.target_role,
3218 "Updated UDP stream backends"
3219 );
3220 }
3221 _ => {} // HTTP endpoints handled by update_proxy_backends
3222 }
3223 }
3224 }
3225
3226 /// Scale a service. Cluster-aware: if this node has a `Cluster` handle
3227 /// and we're not the leader, forward to the leader; if leader, compute
3228 /// affinity-aware placement and dispatch each node its share via
3229 /// `dispatch_scale_distributed`; else (single-node) just scale locally.
3230 ///
3231 /// # Errors
3232 /// Returns an error if scaling fails on any participating node.
3233 #[allow(clippy::cast_possible_truncation)]
3234 pub async fn scale_service(&self, name: &str, replicas: u32) -> Result<()> {
3235 use zlayer_scheduler::cluster::InternalScaleRequest;
3236
3237 tracing::info!(
3238 target: "zlayer::scale_distribute",
3239 service = name,
3240 replicas,
3241 has_cluster = self.cluster.is_some(),
3242 "scale_service ENTER"
3243 );
3244
3245 // Attach the current spec so every receiving node can register/update
3246 // the service before scaling. This is what propagates an image change
3247 // to worker containers and lets a fresh worker run a replica it has
3248 // never seen. `None` if the service isn't registered locally (the
3249 // receiver then falls back to its own cached spec).
3250 let spec = self
3251 .services
3252 .read()
3253 .await
3254 .get(name)
3255 .map(|inst| inst.spec.clone());
3256 let build_req = |replicas: u32| {
3257 let req = InternalScaleRequest::new(name, replicas);
3258 match spec.clone() {
3259 Some(s) => req.with_spec(s),
3260 None => req,
3261 }
3262 };
3263
3264 if let Some(cluster) = &self.cluster {
3265 let is_leader = cluster.is_leader().await;
3266 tracing::info!(
3267 target: "zlayer::scale_distribute",
3268 service = name,
3269 replicas,
3270 is_leader,
3271 spec_affinity = ?spec.as_ref().and_then(|s| s.affinity.clone()),
3272 "scale_service: cluster path"
3273 );
3274 if !is_leader {
3275 // Follower: forward to the leader and let it dispatch.
3276 return cluster
3277 .forward_scale(build_req(replicas))
3278 .await
3279 .map_err(|e| AgentError::CreateFailed {
3280 id: name.to_string(),
3281 reason: format!("cluster forward: {e}"),
3282 });
3283 }
3284
3285 // Leader path. Compute affinity-aware placement across the Ready
3286 // node set and dispatch each node its share. `dispatch_scale_distributed`
3287 // reuses the same placement machinery as one-off container placement
3288 // (`cluster_nodes_to_node_states` + `place_service_replicas`), honoring
3289 // `ServiceSpec.affinity` (`spread`/`pack`/`pin`). The leader's own
3290 // share short-circuits to a local call (no localhost HTTP round-trip),
3291 // and the attached spec lets fresh workers register the service before
3292 // scaling. Single-node clusters fall through the default impl, which
3293 // dispatches everything to this node (unchanged behavior).
3294 return cluster
3295 .dispatch_scale_distributed(build_req(replicas))
3296 .await
3297 .map_err(|e| AgentError::CreateFailed {
3298 id: name.to_string(),
3299 reason: format!("cluster dispatch: {e}"),
3300 });
3301 }
3302
3303 // No cluster handle — single-node mode.
3304 self.scale_service_local(name, replicas).await
3305 }
3306
3307 /// Local (single-node) scale: directly creates/destroys containers on
3308 /// this node only. Called by:
3309 /// - `scale_service` in single-node mode (when `self.cluster` is None).
3310 /// - The `/api/v1/internal/scale` handler (which the leader's
3311 /// `Cluster::dispatch_scale` HTTP-POSTs to, bottoming out the
3312 /// recursive loop on each receiving node).
3313 /// - The cluster impls' `local_dispatch` closure (for the leader's own
3314 /// share — short-circuited to avoid a localhost round-trip).
3315 ///
3316 /// # Errors
3317 /// Returns an error if the service is not found or scaling fails.
3318 #[allow(clippy::cast_possible_truncation)]
3319 pub async fn scale_service_local(&self, name: &str, replicas: u32) -> Result<()> {
3320 tracing::info!(
3321 target: "zlayer::scale_distribute",
3322 service = name,
3323 replicas,
3324 "scale_service_local ENTER"
3325 );
3326 let _permit = self.scale_semaphore.acquire().await;
3327
3328 let services = self.services.read().await;
3329 let Some(instance) = services.get(name) else {
3330 // Draining a service this node never hosted is a no-op (e.g. the
3331 // leader fans out `count=0` to a node to drain it during a
3332 // scale-down, but that node never ran the service).
3333 if replicas == 0 {
3334 return Ok(());
3335 }
3336 return Err(AgentError::NotFound {
3337 container: name.to_string(),
3338 reason: "service not found".to_string(),
3339 });
3340 };
3341
3342 // Get current replica count before scaling
3343 let current_replicas = instance.replica_count().await as u32;
3344
3345 // Perform the scaling operation. `Box::pin` keeps the (large)
3346 // per-replica `scale_to` future off this funnel's stack frame so the
3347 // chain of callers above (`scale_service`, the autoscale evaluators,
3348 // `deploy_swarm_groups`) stays under clippy's `large_futures` threshold.
3349 Box::pin(instance.scale_to(replicas)).await?;
3350
3351 // After scaling, update proxy and stream backends for each endpoint.
3352 // Per-endpoint collection (rather than a single service-wide list)
3353 // is what makes `EndpointSpec.target_role` filtering possible:
3354 // each endpoint receives only the containers whose
3355 // `ContainerId.role` matches its declared role.
3356 if self.proxy_manager.is_some() {
3357 self.update_proxy_backends(instance).await;
3358 }
3359 if self.stream_registry.is_some() {
3360 self.update_stream_backends(instance).await;
3361 }
3362
3363 // Register new containers with supervisor for crash monitoring.
3364 //
3365 // Container ids here must match what `ServiceInstance::scale_to`
3366 // constructed — same role (derived from `replica_groups`) and same
3367 // local node id. Otherwise supervise/unsupervise miss the live entry
3368 // and crash-restart bookkeeping leaks across scale events.
3369 let local_node_id = self.cluster.as_ref().map_or(0, |c| c.node_id());
3370 if let Some(supervisor) = &self.container_supervisor {
3371 // For scale-up, register new containers
3372 if replicas > current_replicas {
3373 for i in current_replicas..replicas {
3374 let replica_idx = i + 1;
3375 let container_id = ContainerId::with_role_and_node(
3376 name.to_string(),
3377 replica_idx,
3378 instance.role_for_replica(replica_idx),
3379 local_node_id,
3380 );
3381 supervisor.supervise(&container_id, &instance.spec).await;
3382 }
3383 }
3384 // For scale-down, unregister removed containers
3385 if replicas < current_replicas {
3386 for i in replicas..current_replicas {
3387 let replica_idx = i + 1;
3388 let container_id = ContainerId::with_role_and_node(
3389 name.to_string(),
3390 replica_idx,
3391 instance.role_for_replica(replica_idx),
3392 local_node_id,
3393 );
3394 supervisor.unsupervise(&container_id).await;
3395 }
3396 }
3397 }
3398
3399 Ok(())
3400 }
3401
3402 /// Collect backend addresses for a single endpoint of a service.
3403 ///
3404 /// This queries the service instance's containers for their overlay
3405 /// network IP addresses and constructs backend addresses using the
3406 /// endpoint's container target port.
3407 ///
3408 /// Containers are filtered by `endpoint.target_role`:
3409 /// - `None` (default): all containers of the service are eligible
3410 /// (legacy behavior).
3411 /// - `Some(role)`: only containers whose `ContainerId.role` equals
3412 /// `role` are included. Implements
3413 /// [`zlayer_spec::EndpointSpec::target_role`].
3414 ///
3415 /// If a container has a `port_override` (e.g., macOS sandbox where all
3416 /// containers share the host network), that port is used instead of
3417 /// the spec-declared endpoint port. This allows multiple replicas on
3418 /// the same IP (`127.0.0.1`) to be distinguished by port.
3419 async fn collect_endpoint_backends(
3420 &self,
3421 instance: &ServiceInstance,
3422 endpoint: &zlayer_spec::EndpointSpec,
3423 ) -> Vec<SocketAddr> {
3424 let mut addrs = Vec::new();
3425 let endpoint_port = endpoint.target_port();
3426 let containers = instance.containers().read().await;
3427
3428 for (container_id, container) in containers.iter() {
3429 // target_role filter: skip containers whose role doesn't match.
3430 if let Some(required_role) = endpoint.target_role.as_ref() {
3431 if container_id.role != *required_role {
3432 continue;
3433 }
3434 }
3435 let Some(ip) = container.overlay_ip else {
3436 continue;
3437 };
3438 // Use the runtime-assigned port override if present (macOS
3439 // sandbox), otherwise fall back to the endpoint's declared
3440 // target port.
3441 let port = container.port_override.unwrap_or(endpoint_port);
3442 addrs.push(SocketAddr::new(ip, port));
3443 }
3444
3445 // If we expected backends but found none, log a hint so operators
3446 // can debug. Distinguish "no containers" from "role filter
3447 // excluded everything" from "no overlay IPs".
3448 if addrs.is_empty() && !containers.is_empty() {
3449 tracing::warn!(
3450 service = %instance.service_name,
3451 endpoint = %endpoint.name,
3452 target_role = ?endpoint.target_role,
3453 container_count = containers.len(),
3454 "no backends collected for endpoint - either no matching role, no overlay IPs, or filtering excluded all"
3455 );
3456 }
3457
3458 addrs
3459 }
3460
3461 /// Get service replica count
3462 ///
3463 /// # Errors
3464 /// Returns an error if the service is not found.
3465 pub async fn service_replica_count(&self, name: &str) -> Result<usize> {
3466 let services = self.services.read().await;
3467 let instance = services.get(name).ok_or_else(|| AgentError::NotFound {
3468 container: name.to_string(),
3469 reason: "service not found".to_string(),
3470 })?;
3471
3472 Ok(instance.replica_count().await)
3473 }
3474
3475 /// Remove a workload (service, job, or cron)
3476 ///
3477 /// This method handles cleanup for different resource types:
3478 /// - **Service**: Unregisters proxy routes, supervisor, and removes from service map
3479 /// - **Job**: Unregisters from job executor
3480 /// - **Cron**: Unregisters from cron scheduler
3481 ///
3482 /// # Errors
3483 /// Returns an error if the service cannot be removed or scale-down fails.
3484 pub async fn remove_service(&self, name: &str) -> Result<()> {
3485 // Try to unregister from cron scheduler first
3486 if let Some(scheduler) = &self.cron_scheduler {
3487 scheduler.unregister(name).await;
3488 }
3489
3490 // Try to unregister from job executor
3491 if let Some(executor) = &self.job_executor {
3492 executor.unregister_job(name).await;
3493 }
3494
3495 // Unregister stream routes (TCP/UDP) from the stream registry
3496 if let Some(stream_registry) = &self.stream_registry {
3497 // Need to get the service spec to know which ports to unregister
3498 let services = self.services.read().await;
3499 if let Some(instance) = services.get(name) {
3500 for endpoint in &instance.spec.endpoints {
3501 match endpoint.protocol {
3502 Protocol::Tcp => {
3503 let _ = stream_registry.unregister_tcp(endpoint.port);
3504 tracing::debug!(
3505 service = %name,
3506 port = endpoint.port,
3507 "Unregistered TCP stream route"
3508 );
3509 }
3510 Protocol::Udp => {
3511 let _ = stream_registry.unregister_udp(endpoint.port);
3512 tracing::debug!(
3513 service = %name,
3514 port = endpoint.port,
3515 "Unregistered UDP stream route"
3516 );
3517 }
3518 _ => {} // HTTP routes handled above
3519 }
3520 }
3521 }
3522 drop(services); // Release read lock
3523 }
3524
3525 // Unpublish node-loopback ports for every live replica of this
3526 // service so the loopback listeners are freed (mirror of the
3527 // per-replica unpublish in `ServiceInstance::scale_to`). Gated on the
3528 // spec's policy; recomputes each backend from the container's stored
3529 // runtime-resolved IP and port override.
3530 {
3531 let services = self.services.read().await;
3532 if let Some(instance) = services.get(name) {
3533 if instance.spec.publish_to_node_loopback() {
3534 if let Some(proxy) = instance.proxy_manager() {
3535 let containers = instance.containers().read().await;
3536 for container in containers.values() {
3537 if let Some(ip) = container.overlay_ip {
3538 proxy
3539 .unpublish_loopback_for_container(
3540 &instance.spec,
3541 ip,
3542 container.port_override,
3543 )
3544 .await;
3545 }
3546 }
3547 }
3548 }
3549 }
3550 drop(services); // Release read lock
3551 }
3552
3553 // Unpublish explicit port mappings for every live replica (mirror of the
3554 // loopback unpublish above). NOT gated on `publish_to_node_loopback()`.
3555 {
3556 let services = self.services.read().await;
3557 if let Some(instance) = services.get(name) {
3558 if let Some(proxy) = instance.proxy_manager() {
3559 let containers = instance.containers().read().await;
3560 for container in containers.values() {
3561 if let Some(ip) = container.overlay_ip {
3562 for mapping in &instance.spec.port_mappings {
3563 if let Some(hp) = mapping.host_port.filter(|p| *p != 0) {
3564 let pm_backend = SocketAddr::new(
3565 ip,
3566 container.port_override.unwrap_or(mapping.container_port),
3567 );
3568 proxy
3569 .unpublish_port_mapping(hp, mapping.protocol, pm_backend)
3570 .await;
3571 }
3572 }
3573 }
3574 }
3575 }
3576 }
3577 drop(services); // Release read lock
3578 }
3579
3580 // Unregister containers from the supervisor
3581 if let Some(supervisor) = &self.container_supervisor {
3582 let containers = self.get_service_containers(name).await;
3583 for container_id in containers {
3584 supervisor.unsupervise(&container_id).await;
3585 }
3586 tracing::debug!(service = %name, "Unregistered containers from supervisor");
3587 }
3588
3589 // Clean up DNS records for the service (bare name + FQDNs).
3590 self.cleanup_service_dns(name).await;
3591
3592 // Tear down the per-service overlay bridge THROUGH overlayd so it isn't
3593 // leaked when a service is removed via a path that doesn't go through the
3594 // API deployment handler (which does its own `teardown_service_overlay`
3595 // in its teardown_order loop). Routing through overlayd keeps its
3596 // in-memory state synced — that is the only safe way to delete a `zl-…-b`
3597 // bridge (a hand `ip link del` deletes the link out from under overlayd
3598 // and desyncs it, requiring an overlayd restart). Idempotent with the
3599 // handler's call: deleting an already-gone bridge by deterministic name
3600 // is a non-fatal no-op.
3601 if let Some(overlay) = &self.overlay_manager {
3602 overlay.read().await.teardown_service_overlay(name).await;
3603 }
3604
3605 // Remove from services map (may or may not exist depending on rtype)
3606 let mut services = self.services.write().await;
3607 if services.remove(name).is_some() {
3608 tracing::debug!(service = %name, "Removed service from manager");
3609 }
3610
3611 Ok(())
3612 }
3613
3614 /// Remove every DNS record this service registered on attach: the bare
3615 /// compose service name (`{service}`), the service-level FQDN
3616 /// (`{service}.service.local`), and each replica's FQDN
3617 /// (`{replica}.{service}.service.local`). Best-effort; failures are logged.
3618 async fn cleanup_service_dns(&self, name: &str) {
3619 let Some(dns) = &self.dns_server else {
3620 return;
3621 };
3622
3623 // Bare compose service-name record (compose discovery).
3624 if let Err(e) = dns.remove_record(name).await {
3625 tracing::warn!(
3626 hostname = %name,
3627 error = %e,
3628 "failed to remove bare service-name DNS record"
3629 );
3630 }
3631
3632 // Service-level FQDN.
3633 let service_hostname = format!("{name}.service.local");
3634 if let Err(e) = dns.remove_record(&service_hostname).await {
3635 tracing::warn!(
3636 hostname = %service_hostname,
3637 error = %e,
3638 "failed to remove service DNS record"
3639 );
3640 } else {
3641 tracing::debug!(hostname = %service_hostname, "removed service DNS record");
3642 }
3643
3644 // Any remaining replica-specific FQDNs.
3645 let services = self.services.read().await;
3646 if let Some(instance) = services.get(name) {
3647 let containers = instance.containers().read().await;
3648 for (id, _) in containers.iter() {
3649 let replica_hostname = format!("{}.{}.service.local", id.replica, name);
3650 if let Err(e) = dns.remove_record(&replica_hostname).await {
3651 tracing::warn!(
3652 hostname = %replica_hostname,
3653 error = %e,
3654 "failed to remove replica DNS record during service removal"
3655 );
3656 }
3657 }
3658 }
3659 }
3660
3661 /// Introspect service infrastructure wiring.
3662 /// Returns (`has_overlay`, `has_proxy`, `has_dns`), or None if service not found.
3663 pub async fn service_infrastructure(&self, name: &str) -> Option<(bool, bool, bool)> {
3664 let services = self.services.read().await;
3665 services.get(name).map(|i| {
3666 (
3667 i.has_overlay_manager(),
3668 i.has_proxy_manager(),
3669 i.has_dns_server(),
3670 )
3671 })
3672 }
3673
3674 /// List all services
3675 pub async fn list_services(&self) -> Vec<String> {
3676 self.services.read().await.keys().cloned().collect()
3677 }
3678
3679 /// Get logs for a service, aggregated from all container replicas.
3680 ///
3681 /// # Arguments
3682 /// * `service_name` - Name of the service to fetch logs for
3683 /// * `tail` - Number of lines to return per container (0 = all)
3684 /// * `instance` - Optional specific instance (container ID suffix like "1", "2")
3685 ///
3686 /// # Errors
3687 /// Returns an error if the service or instance is not found.
3688 ///
3689 /// # Returns
3690 /// Structured log entries from all (or specific) container replicas. Each
3691 /// entry has its `service` and `deployment` fields populated when available.
3692 pub async fn get_service_logs(
3693 &self,
3694 service_name: &str,
3695 tail: usize,
3696 instance: Option<&str>,
3697 ) -> Result<Vec<LogEntry>> {
3698 let container_ids = self.get_service_containers(service_name).await;
3699
3700 if container_ids.is_empty() {
3701 return Err(AgentError::NotFound {
3702 container: service_name.to_string(),
3703 reason: "no containers found for service".to_string(),
3704 });
3705 }
3706
3707 // If a specific instance is requested, filter to just that one
3708 let target_ids: Vec<&ContainerId> = if let Some(inst) = instance {
3709 if let Ok(replica_num) = inst.parse::<u32>() {
3710 container_ids
3711 .iter()
3712 .filter(|id| id.replica == replica_num)
3713 .collect()
3714 } else {
3715 // Try matching by full container ID string suffix
3716 container_ids
3717 .iter()
3718 .filter(|id| id.to_string().contains(inst))
3719 .collect()
3720 }
3721 } else {
3722 container_ids.iter().collect()
3723 };
3724
3725 if target_ids.is_empty() {
3726 return Err(AgentError::NotFound {
3727 container: format!("{}/{}", service_name, instance.unwrap_or("?")),
3728 reason: "instance not found".to_string(),
3729 });
3730 }
3731
3732 let mut all_entries: Vec<LogEntry> = Vec::new();
3733
3734 for id in &target_ids {
3735 match self.runtime.container_logs(id, tail).await {
3736 Ok(mut entries) => {
3737 // Populate service and deployment metadata on each entry
3738 for entry in &mut entries {
3739 if entry.service.is_none() {
3740 entry.service = Some(service_name.to_string());
3741 }
3742 if entry.deployment.is_none() {
3743 entry.deployment.clone_from(&self.deployment_name);
3744 }
3745 }
3746 all_entries.extend(entries);
3747 }
3748 Err(e) => {
3749 tracing::warn!(
3750 service = service_name,
3751 container = %id,
3752 error = %e,
3753 "Failed to read container logs"
3754 );
3755 }
3756 }
3757 }
3758
3759 Ok(all_entries)
3760 }
3761
3762 /// Get all container IDs for a specific service
3763 ///
3764 /// Returns an empty vector if the service doesn't exist.
3765 ///
3766 /// # Arguments
3767 /// * `service_name` - Name of the service to query
3768 ///
3769 /// # Returns
3770 /// Vector of `ContainerIds` for all replicas of the service
3771 pub async fn get_service_containers(&self, service_name: &str) -> Vec<ContainerId> {
3772 let services = self.services.read().await;
3773 if let Some(instance) = services.get(service_name) {
3774 instance.container_ids().await
3775 } else {
3776 Vec::new()
3777 }
3778 }
3779
3780 /// Get per-container info (id, image, real state, pid, overlay IP) for a
3781 /// specific service.
3782 ///
3783 /// Unlike [`get_service_containers`](Self::get_service_containers) (which
3784 /// returns ids only), this surfaces the REAL image reference and lifecycle
3785 /// state of each live container so the API/`ps` can report them accurately.
3786 ///
3787 /// Returns an empty vector if the service doesn't exist.
3788 pub async fn get_service_container_infos(&self, service_name: &str) -> Vec<ContainerInfo> {
3789 let services = self.services.read().await;
3790 if let Some(instance) = services.get(service_name) {
3791 instance.container_infos().await
3792 } else {
3793 Vec::new()
3794 }
3795 }
3796
3797 /// This node's **local** view of `service` (running replica count, health,
3798 /// containers), used for cluster-wide aggregation. Served by the internal
3799 /// `/api/v1/internal/services/{svc}/state` endpoint and used as the local
3800 /// part of [`Self::cluster_service_states`].
3801 pub async fn local_service_state(
3802 &self,
3803 service: &str,
3804 ) -> zlayer_types::cluster::NodeServiceState {
3805 use zlayer_types::cluster::{ClusterContainerSummary, NodeServiceState};
3806 let node_id = self.cluster.as_ref().map_or(0, |c| c.node_id());
3807 let infos = self.get_service_container_infos(service).await;
3808 #[allow(clippy::cast_possible_truncation)]
3809 let running = infos
3810 .iter()
3811 .filter(|i| i.state.eq_ignore_ascii_case("running"))
3812 .count() as u32;
3813 // A node running 0 replicas is trivially healthy (it can't drag the
3814 // cluster-wide aggregate). Otherwise require a Healthy health state.
3815 let healthy = if running == 0 {
3816 true
3817 } else {
3818 let states = self.health_states();
3819 let guard = states.read().await;
3820 matches!(guard.get(service), Some(HealthState::Healthy))
3821 };
3822 let containers = infos
3823 .into_iter()
3824 .map(|i| ClusterContainerSummary {
3825 node_id,
3826 id: i.id.to_string(),
3827 service: i.id.service.clone(),
3828 replica: i.id.replica,
3829 image: i.image,
3830 state: i.state,
3831 pid: i.pid,
3832 overlay_ip: i.overlay_ip,
3833 })
3834 .collect();
3835 NodeServiceState {
3836 node_id,
3837 running,
3838 healthy,
3839 containers,
3840 }
3841 }
3842
3843 /// Cluster-wide per-node states for `service`: this node's local view plus
3844 /// every other node's (fetched via the cluster handle's
3845 /// `fetch_remote_service_states`). When not clustered, returns just the
3846 /// local view. This is the source of truth for distributed-service replica
3847 /// counts, health, and the `ps` container listing on the leader.
3848 pub async fn cluster_service_states(
3849 &self,
3850 service: &str,
3851 ) -> Vec<zlayer_types::cluster::NodeServiceState> {
3852 let mut states = vec![self.local_service_state(service).await];
3853 if let Some(cluster) = &self.cluster {
3854 states.extend(cluster.fetch_remote_service_states(service).await);
3855 }
3856 states
3857 }
3858
3859 /// Execute a command inside a running container for a service
3860 ///
3861 /// Picks a specific replica if provided, otherwise uses the first available container.
3862 ///
3863 /// # Arguments
3864 /// * `service_name` - Name of the service
3865 /// * `replica` - Optional replica number to target
3866 /// * `cmd` - Command and arguments to execute
3867 ///
3868 /// # Errors
3869 /// Returns an error if the service or replica is not found, or if exec fails.
3870 ///
3871 /// # Panics
3872 /// Panics if no replica is specified and the container list is unexpectedly empty
3873 /// after the emptiness check (should not happen in practice).
3874 ///
3875 /// # Returns
3876 /// Tuple of (`exit_code`, stdout, stderr)
3877 pub async fn exec_in_container(
3878 &self,
3879 service_name: &str,
3880 replica: Option<u32>,
3881 cmd: &[String],
3882 ) -> Result<(i32, String, String)> {
3883 let container_ids = self.get_service_containers(service_name).await;
3884
3885 if container_ids.is_empty() {
3886 return Err(AgentError::NotFound {
3887 container: service_name.to_string(),
3888 reason: "no containers found for service".to_string(),
3889 });
3890 }
3891
3892 // Pick the target container
3893 let target = if let Some(rep) = replica {
3894 container_ids
3895 .into_iter()
3896 .find(|cid| cid.replica == rep)
3897 .ok_or_else(|| AgentError::NotFound {
3898 container: format!("{service_name}-rep-{rep}"),
3899 reason: format!("replica {rep} not found for service"),
3900 })?
3901 } else {
3902 // Use the first container (lowest replica number)
3903 container_ids.into_iter().next().unwrap()
3904 };
3905
3906 self.runtime.exec(&target, cmd).await
3907 }
3908
3909 /// List every live container across all services, enriched with the data a
3910 /// Docker `ps` row needs and the data the Docker-name resolver needs.
3911 ///
3912 /// For each running container this surfaces the deployment name, the service
3913 /// name, the concrete [`ContainerId`], the compose `container_name:` label
3914 /// (when set, the user-facing Docker name), the real image, the lifecycle
3915 /// state, and the service's published port mappings. Used by the unified
3916 /// container-name resolver and by `docker ps` so compose deployments show up
3917 /// and resolve by their `container_name`.
3918 pub async fn list_container_views(&self) -> Vec<DeploymentContainerView> {
3919 let deployment = self.deployment_name.clone();
3920 let services = self.services.read().await;
3921 let mut out = Vec::new();
3922 for (service_name, instance) in services.iter() {
3923 let container_name = instance
3924 .spec
3925 .labels
3926 .get("com.docker.compose.container_name")
3927 .cloned();
3928 let ports = instance.spec.port_mappings.clone();
3929 for info in instance.container_infos().await {
3930 out.push(DeploymentContainerView {
3931 deployment: deployment.clone(),
3932 service: service_name.clone(),
3933 container_id: info.id,
3934 container_name: container_name.clone(),
3935 image: info.image,
3936 state: info.state,
3937 pid: info.pid,
3938 ports: ports.clone(),
3939 });
3940 }
3941 }
3942 out
3943 }
3944
3945 /// Resolve a Docker-style container name/id to a live deployment
3946 /// [`ContainerId`].
3947 ///
3948 /// Matching precedence (first hit wins):
3949 /// 1. The compose `container_name:` label (e.g. `forgejo-e2e`).
3950 /// 2. The conventional compose names `{deployment}-{service}-{replica}` and
3951 /// `{deployment}_{service}_{replica}` (replica is 1-based, mirroring
3952 /// Docker Compose; `ContainerId.replica` is 0-based so we add 1).
3953 /// 3. The bare service name (`{service}`), targeting its first replica.
3954 /// 4. The [`ContainerId`] `Display` form.
3955 ///
3956 /// Returns `None` when nothing matches a *running* container.
3957 pub async fn resolve_container_name(&self, name: &str) -> Option<ContainerId> {
3958 let views = self.list_container_views().await;
3959 // 1. explicit container_name label.
3960 if let Some(v) = views
3961 .iter()
3962 .find(|v| v.container_name.as_deref() == Some(name))
3963 {
3964 return Some(v.container_id.clone());
3965 }
3966 // 2 & 3. conventional names + bare service name.
3967 for v in &views {
3968 let dep = v.deployment.as_deref().unwrap_or("");
3969 let svc = &v.service;
3970 let rep1 = v.container_id.replica + 1;
3971 let candidates = [
3972 format!("{dep}-{svc}-{rep1}"),
3973 format!("{dep}_{svc}_{rep1}"),
3974 svc.clone(),
3975 ];
3976 if candidates.iter().any(|c| c == name) {
3977 return Some(v.container_id.clone());
3978 }
3979 }
3980 // 4. ContainerId Display form.
3981 for v in &views {
3982 if v.container_id.to_string() == name {
3983 return Some(v.container_id.clone());
3984 }
3985 }
3986 None
3987 }
3988
3989 /// Execute a command in a specific deployment container (by its concrete
3990 /// [`ContainerId`]) honouring Docker `exec` options (`--user`, `-w`, `-e`).
3991 ///
3992 /// Routes through [`Runtime::exec_with_opts`] so runtimes that support
3993 /// dropping to a uid/gid + chdir + env injection (macOS VZ-Linux) apply
3994 /// them; others fall back to a plain buffered exec.
3995 ///
3996 /// # Errors
3997 /// Propagates the runtime's exec error.
3998 pub async fn exec_in_container_id_with_opts(
3999 &self,
4000 id: &ContainerId,
4001 opts: &crate::runtime::ExecOptions,
4002 ) -> Result<(i32, String, String)> {
4003 self.runtime.exec_with_opts(id, opts).await
4004 }
4005
4006 // ==================== Job Management ====================
4007
4008 /// Trigger a job execution
4009 ///
4010 /// # Arguments
4011 /// * `name` - Name of the registered job
4012 /// * `trigger` - How the job was triggered (endpoint, cli, etc.)
4013 ///
4014 /// # Returns
4015 /// The execution ID for tracking the job
4016 ///
4017 /// # Errors
4018 /// - Returns error if job executor is not configured
4019 /// - Returns error if the job is not registered
4020 pub async fn trigger_job(&self, name: &str, trigger: JobTrigger) -> Result<JobExecutionId> {
4021 let executor = self
4022 .job_executor
4023 .as_ref()
4024 .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
4025
4026 let spec = executor
4027 .get_job_spec(name)
4028 .await
4029 .ok_or_else(|| AgentError::NotFound {
4030 container: name.to_string(),
4031 reason: "job not registered".to_string(),
4032 })?;
4033
4034 executor.trigger(name, &spec, trigger).await
4035 }
4036
4037 /// Get the status of a job execution
4038 ///
4039 /// # Arguments
4040 /// * `id` - The execution ID returned from `trigger_job`
4041 ///
4042 /// # Returns
4043 /// The job execution details, or None if not found
4044 pub async fn get_job_execution(&self, id: &JobExecutionId) -> Option<JobExecution> {
4045 if let Some(executor) = &self.job_executor {
4046 executor.get_execution(id).await
4047 } else {
4048 None
4049 }
4050 }
4051
4052 /// List all executions for a specific job
4053 ///
4054 /// # Arguments
4055 /// * `name` - Name of the job
4056 ///
4057 /// # Returns
4058 /// Vector of job executions for the specified job
4059 pub async fn list_job_executions(&self, name: &str) -> Vec<JobExecution> {
4060 if let Some(executor) = &self.job_executor {
4061 executor.list_executions(name).await
4062 } else {
4063 Vec::new()
4064 }
4065 }
4066
4067 /// Cancel a running job execution
4068 ///
4069 /// # Arguments
4070 /// * `id` - The execution ID to cancel
4071 ///
4072 /// # Errors
4073 /// Returns error if job executor is not configured or if cancellation fails
4074 pub async fn cancel_job(&self, id: &JobExecutionId) -> Result<()> {
4075 let executor = self
4076 .job_executor
4077 .as_ref()
4078 .ok_or_else(|| AgentError::Configuration("Job executor not configured".to_string()))?;
4079
4080 executor.cancel(id).await
4081 }
4082
4083 // ==================== Cron Management ====================
4084
4085 /// Manually trigger a cron job (outside of its schedule)
4086 ///
4087 /// # Arguments
4088 /// * `name` - Name of the cron job
4089 ///
4090 /// # Returns
4091 /// The execution ID for tracking the triggered job
4092 ///
4093 /// # Errors
4094 /// Returns error if cron scheduler is not configured or job not found
4095 pub async fn trigger_cron(&self, name: &str) -> Result<JobExecutionId> {
4096 let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
4097 AgentError::Configuration("Cron scheduler not configured".to_string())
4098 })?;
4099
4100 scheduler.trigger_now(name).await
4101 }
4102
4103 /// Enable or disable a cron job
4104 ///
4105 /// # Arguments
4106 /// * `name` - Name of the cron job
4107 /// * `enabled` - Whether to enable or disable the job
4108 pub async fn set_cron_enabled(&self, name: &str, enabled: bool) {
4109 if let Some(scheduler) = &self.cron_scheduler {
4110 scheduler.set_enabled(name, enabled).await;
4111 }
4112 }
4113
4114 /// List all registered cron jobs
4115 pub async fn list_cron_jobs(&self) -> Vec<crate::cron_scheduler::CronJobInfo> {
4116 if let Some(scheduler) = &self.cron_scheduler {
4117 scheduler.list_jobs().await
4118 } else {
4119 Vec::new()
4120 }
4121 }
4122
4123 /// Start the cron scheduler background task
4124 ///
4125 /// This spawns a background task that checks for due cron jobs every second.
4126 /// Returns a `JoinHandle` that can be used to wait for the scheduler to stop.
4127 ///
4128 /// # Errors
4129 /// Returns error if cron scheduler is not configured
4130 pub fn start_cron_scheduler(&self) -> Result<tokio::task::JoinHandle<()>> {
4131 let scheduler = self.cron_scheduler.as_ref().ok_or_else(|| {
4132 AgentError::Configuration("Cron scheduler not configured".to_string())
4133 })?;
4134
4135 let scheduler: Arc<CronScheduler> = Arc::clone(scheduler);
4136 Ok(tokio::spawn(async move {
4137 scheduler.run_loop().await;
4138 }))
4139 }
4140
4141 /// Shutdown the cron scheduler
4142 pub fn shutdown_cron(&self) {
4143 if let Some(scheduler) = &self.cron_scheduler {
4144 scheduler.shutdown();
4145 }
4146 }
4147
4148 /// Snapshot every live service's name paired with its full [`ServiceSpec`].
4149 ///
4150 /// Used by the agent-local autoscale controller to self-discover services
4151 /// that should be registered for adaptive scaling (and to seed the
4152 /// rolling-restart template). Returns a cloned, point-in-time view so the
4153 /// caller never holds the services lock across an await.
4154 pub async fn service_specs(&self) -> Vec<(String, ServiceSpec)> {
4155 self.services
4156 .read()
4157 .await
4158 .iter()
4159 .map(|(name, instance)| (name.clone(), instance.spec.clone()))
4160 .collect()
4161 }
4162}
4163
4164/// Bridge the post-`Arc::try_unwrap` [`ServiceManager`] (which the daemon holds
4165/// as `Arc<RwLock<ServiceManager>>`) into the proxy's [`ScaleTrigger`] so the
4166/// scale-to-zero activator can wake an idle service on the next inbound request.
4167///
4168/// Implementing the trait on `RwLock<ServiceManager>` lets the daemon's
4169/// `Arc<RwLock<ServiceManager>>` coerce directly to `Arc<dyn ScaleTrigger>`
4170/// without any wrapper type. Each `scale_to` takes a short read guard and
4171/// forwards to [`ServiceManager::scale_service`] (which itself routes through
4172/// the cluster when one is configured).
4173#[async_trait::async_trait]
4174impl crate::proxy_manager::ScaleTrigger for RwLock<ServiceManager> {
4175 async fn scale_to(&self, service: &str, replicas: u32) -> std::result::Result<(), String> {
4176 self.read()
4177 .await
4178 .scale_service(service, replicas)
4179 .await
4180 .map_err(|e| e.to_string())
4181 }
4182}
4183
4184/// The swarm identity of a service spec, if it is a member of an inference
4185/// swarm: `spec.resources.gpu.sharding.swarm_id`. `None` for ordinary services.
4186///
4187/// This is the partition key that groups swarm members so they can be placed
4188/// together as a coordinated gang (see [`partition_swarm_groups`]).
4189#[must_use]
4190pub fn swarm_id_of(spec: &ServiceSpec) -> Option<String> {
4191 spec.resources
4192 .gpu
4193 .as_ref()
4194 .and_then(|g| g.sharding.as_ref())
4195 .map(|s| s.swarm_id.clone())
4196}
4197
4198/// Desired replica count for a single swarm member. Swarm members are
4199/// single-unit ring stages / coordinators, so the gang treats each as one
4200/// replica; we still honor an explicit `Fixed`/`Adaptive` intent (defaulting to
4201/// 1) so a spec that requests more is not silently clamped.
4202#[must_use]
4203pub fn swarm_member_replicas(spec: &ServiceSpec) -> u32 {
4204 match &spec.scale {
4205 zlayer_spec::ScaleSpec::Fixed { replicas } => (*replicas).max(1),
4206 zlayer_spec::ScaleSpec::Adaptive { min, .. } => (*min).max(1),
4207 zlayer_spec::ScaleSpec::Manual => 1,
4208 }
4209}
4210
4211/// Partition a deploy's services into swarm groups keyed by `swarm_id`.
4212///
4213/// Only services whose spec carries `gpu.sharding` (see [`swarm_id_of`]) are
4214/// included; ordinary services are omitted entirely (they stay on the normal
4215/// per-service deploy path). The returned map is `swarm_id -> [(name, spec)]`.
4216#[must_use]
4217pub(crate) fn partition_swarm_groups(
4218 services: &HashMap<String, ServiceSpec>,
4219) -> HashMap<String, Vec<(String, ServiceSpec)>> {
4220 let mut groups: HashMap<String, Vec<(String, ServiceSpec)>> = HashMap::new();
4221 for (name, spec) in services {
4222 if let Some(swarm_id) = swarm_id_of(spec) {
4223 groups
4224 .entry(swarm_id)
4225 .or_default()
4226 .push((name.clone(), spec.clone()));
4227 }
4228 }
4229 groups
4230}
4231
4232#[cfg(test)]
4233#[allow(deprecated)]
4234mod tests {
4235 use super::*;
4236 use crate::runtime::MockRuntime;
4237
4238 #[tokio::test]
4239 async fn test_service_manager() {
4240 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4241 let manager = ServiceManager::new(runtime);
4242
4243 // Add service
4244 let spec = mock_spec();
4245 Box::pin(manager.upsert_service("test".to_string(), spec))
4246 .await
4247 .unwrap();
4248
4249 // Scale up
4250 manager.scale_service("test", 3).await.unwrap();
4251
4252 // Check count
4253 let count = manager.service_replica_count("test").await.unwrap();
4254 assert_eq!(count, 3);
4255
4256 // List services
4257 let services = manager.list_services().await;
4258 assert_eq!(services, vec!["test".to_string()]);
4259 }
4260
4261 #[tokio::test]
4262 async fn test_service_manager_basic_lifecycle() {
4263 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4264 let manager = ServiceManager::new(runtime);
4265
4266 // Add service with HTTP endpoint
4267 let spec = mock_spec();
4268 Box::pin(manager.upsert_service("api".to_string(), spec))
4269 .await
4270 .unwrap();
4271
4272 // Scale up
4273 manager.scale_service("api", 2).await.unwrap();
4274
4275 // Check count
4276 let count = manager.service_replica_count("api").await.unwrap();
4277 assert_eq!(count, 2);
4278
4279 // Remove service
4280 manager.remove_service("api").await.unwrap();
4281
4282 // Verify service is gone
4283 let services = manager.list_services().await;
4284 assert!(!services.contains(&"api".to_string()));
4285 }
4286
4287 #[tokio::test]
4288 async fn test_service_manager_with_full_config() {
4289 use tokio::sync::RwLock;
4290
4291 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4292
4293 // Create a mock overlay manager (skip actual network setup)
4294 let overlay_manager = Arc::new(RwLock::new(
4295 OverlayManager::new("test-deployment".to_string(), "test".to_string())
4296 .await
4297 .unwrap(),
4298 ));
4299
4300 let manager =
4301 ServiceManager::with_full_config(runtime, overlay_manager, "prod".to_string());
4302
4303 // Add service
4304 let spec = mock_spec();
4305 Box::pin(manager.upsert_service("web".to_string(), spec))
4306 .await
4307 .unwrap();
4308
4309 // Verify service is registered
4310 let services = manager.list_services().await;
4311 assert!(services.contains(&"web".to_string()));
4312 }
4313
4314 #[test]
4315 fn test_container_state_as_str() {
4316 use crate::runtime::ContainerState;
4317 assert_eq!(ContainerState::Pending.as_str(), "pending");
4318 assert_eq!(ContainerState::Initializing.as_str(), "initializing");
4319 assert_eq!(ContainerState::Running.as_str(), "running");
4320 assert_eq!(ContainerState::Stopping.as_str(), "stopping");
4321 assert_eq!(ContainerState::Exited { code: 0 }.as_str(), "exited");
4322 assert_eq!(
4323 ContainerState::Failed {
4324 reason: "boom".to_string()
4325 }
4326 .as_str(),
4327 "failed"
4328 );
4329 // Display delegates to as_str.
4330 assert_eq!(ContainerState::Running.to_string(), "running");
4331 }
4332
4333 /// A container created from image X must report image X and its real
4334 /// lifecycle state through the new `container_infos` accessor, replacing
4335 /// the previously hardcoded `"running"` / empty-image behavior.
4336 #[tokio::test]
4337 async fn test_container_infos_surfaces_image_and_state() {
4338 use crate::runtime::{Container, ContainerState};
4339
4340 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4341 let manager = ServiceManager::new(runtime);
4342
4343 let spec = mock_spec(); // image name = "test:latest"
4344 let image = spec.image.name.to_string();
4345 Box::pin(manager.upsert_service("web".to_string(), spec))
4346 .await
4347 .unwrap();
4348
4349 // Inject containers directly with distinct states.
4350 {
4351 let services = manager.services.read().await;
4352 let instance = services.get("web").unwrap();
4353 let mut containers = instance.containers().write().await;
4354
4355 let running_id = ContainerId::new("web", 0);
4356 containers.insert(
4357 running_id.clone(),
4358 Container {
4359 id: running_id,
4360 image: image.clone(),
4361 state: ContainerState::Running,
4362 pid: Some(4242),
4363 task: None,
4364 overlay_ip: None,
4365 health_monitor: None,
4366 port_override: None,
4367 },
4368 );
4369
4370 let exited_id = ContainerId::new("web", 1);
4371 containers.insert(
4372 exited_id.clone(),
4373 Container {
4374 id: exited_id,
4375 image: image.clone(),
4376 state: ContainerState::Exited { code: 1 },
4377 pid: None,
4378 task: None,
4379 overlay_ip: None,
4380 health_monitor: None,
4381 port_override: None,
4382 },
4383 );
4384 }
4385
4386 let mut infos = manager.get_service_container_infos("web").await;
4387 infos.sort_by_key(|i| i.id.replica);
4388 assert_eq!(infos.len(), 2);
4389
4390 // Every container reports the real image it was created from.
4391 assert!(infos.iter().all(|i| i.image == image));
4392 assert!(infos.iter().all(|i| i.image == "test:latest"));
4393
4394 // Real per-container state is surfaced (not a hardcoded "running").
4395 assert_eq!(infos[0].state, "running");
4396 assert_eq!(infos[0].pid, Some(4242));
4397 assert_eq!(infos[1].state, "exited");
4398 assert_eq!(infos[1].pid, None);
4399
4400 // Unknown service yields an empty list.
4401 assert!(manager
4402 .get_service_container_infos("missing")
4403 .await
4404 .is_empty());
4405 }
4406
4407 /// Bug 2 (`cluster_upgrade`): a changed image *reference* (tag bump) under
4408 /// `if_not_present` must still recreate the local replicas. Previously the
4409 /// recreate only fired on digest drift under `Always`/`Newer`, so a tag
4410 /// change was silently ignored and containers stayed on the old image.
4411 #[tokio::test]
4412 async fn upsert_recreates_local_replicas_on_image_reference_change() {
4413 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4414 let manager = ServiceManager::new(runtime);
4415
4416 // Deploy v1 with the e2e's pull policy (if_not_present) and scale up.
4417 let mut spec = mock_spec();
4418 spec.image.name = "docker.io/library/nginx:1.28-alpine".parse().unwrap();
4419 spec.image.pull_policy = zlayer_spec::PullPolicy::IfNotPresent;
4420 Box::pin(manager.upsert_service("web".to_string(), spec.clone()))
4421 .await
4422 .unwrap();
4423 manager.scale_service_local("web", 2).await.unwrap();
4424
4425 let v1: Vec<String> = manager
4426 .get_service_container_infos("web")
4427 .await
4428 .into_iter()
4429 .map(|i| i.image)
4430 .collect();
4431 assert_eq!(v1.len(), 2);
4432 assert!(
4433 v1.iter().all(|img| img.contains("1.28")),
4434 "expected v1 images, got {v1:?}"
4435 );
4436
4437 // Upgrade to v2 under the SAME if_not_present policy.
4438 let mut spec_v2 = spec;
4439 spec_v2.image.name = "docker.io/library/nginx:1.29-alpine".parse().unwrap();
4440 Box::pin(manager.upsert_service("web".to_string(), spec_v2))
4441 .await
4442 .unwrap();
4443
4444 let v2: Vec<String> = manager
4445 .get_service_container_infos("web")
4446 .await
4447 .into_iter()
4448 .map(|i| i.image)
4449 .collect();
4450 assert_eq!(v2.len(), 2, "replica count preserved across upgrade");
4451 assert!(
4452 v2.iter().all(|img| img.contains("1.29")),
4453 "containers must be recreated on the new image, got {v2:?}"
4454 );
4455 }
4456
    /// Baseline `ServiceSpec` fixture shared by the tests in this module.
    ///
    /// Parses a one-service `DeploymentSpec` from inline YAML and returns that
    /// service: `rtype: service`, image `test:latest`, one HTTP endpoint on
    /// port 8080, fixed scale with a single replica.
    ///
    /// Panics if the YAML does not deserialize or the `test` service entry is
    /// missing.
    fn mock_spec() -> ServiceSpec {
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
    scale:
      mode: fixed
      replicas: 1
",
        )
        .unwrap()
        .services
        // Take ownership of the single service entry out of the parsed map.
        .remove("test")
        .unwrap()
    }
4481
    /// Build a swarm-member spec via YAML so it exercises the real deserialize
    /// path: a GPU block with a `sharding` entry tied to `swarm_id`.
    ///
    /// `role` is rendered into the YAML as `coordinator` or `stage`. The layer
    /// range (`0..12` of `36`) and the GPU count are fixed fixture values.
    ///
    /// Panics if the YAML does not deserialize or the `test` service entry is
    /// missing.
    fn swarm_member_spec(swarm_id: &str, role: zlayer_spec::SwarmRole) -> ServiceSpec {
        // Lowercase spelling of the role as written into the YAML document.
        let role_str = match role {
            zlayer_spec::SwarmRole::Coordinator => "coordinator",
            zlayer_spec::SwarmRole::Stage => "stage",
        };
        serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(&format!(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    resources:
      gpu:
        count: 1
        sharding:
          swarm_id: {swarm_id}
          layer_start: 0
          layer_end: 12
          layer_count: 36
          role: {role_str}
    scale:
      mode: fixed
      replicas: 1
"
        ))
        .unwrap()
        .services
        // Take ownership of the single service entry out of the parsed map.
        .remove("test")
        .unwrap()
    }
4517
4518 #[test]
4519 fn partition_swarm_groups_buckets_by_swarm_id_and_skips_plain_services() {
4520 let mut services: HashMap<String, ServiceSpec> = HashMap::new();
4521 services.insert("plain".to_string(), mock_spec()); // non-swarm: excluded
4522 services.insert(
4523 "stage-0".to_string(),
4524 swarm_member_spec("sw1", zlayer_spec::SwarmRole::Stage),
4525 );
4526 services.insert(
4527 "stage-1".to_string(),
4528 swarm_member_spec("sw1", zlayer_spec::SwarmRole::Stage),
4529 );
4530 services.insert(
4531 "other-stage".to_string(),
4532 swarm_member_spec("sw2", zlayer_spec::SwarmRole::Stage),
4533 );
4534
4535 let groups = partition_swarm_groups(&services);
4536
4537 // Only swarm members are bucketed; "plain" is omitted entirely.
4538 assert_eq!(groups.len(), 2, "two distinct swarm_ids");
4539 assert!(!groups.contains_key("plain"));
4540 let mut sw1: Vec<&str> = groups
4541 .get("sw1")
4542 .expect("sw1 group")
4543 .iter()
4544 .map(|(n, _)| n.as_str())
4545 .collect();
4546 sw1.sort_unstable();
4547 assert_eq!(sw1, vec!["stage-0", "stage-1"]);
4548 assert_eq!(groups.get("sw2").expect("sw2 group").len(), 1);
4549 }
4550
4551 #[test]
4552 fn swarm_id_of_is_none_for_plain_service() {
4553 assert_eq!(swarm_id_of(&mock_spec()), None);
4554 assert_eq!(
4555 swarm_id_of(&swarm_member_spec(
4556 "sw9",
4557 zlayer_spec::SwarmRole::Coordinator
4558 )),
4559 Some("sw9".to_string())
4560 );
4561 }
4562
4563 #[test]
4564 fn swarm_member_replicas_defaults_to_one() {
4565 // mock_spec uses Fixed{replicas:1}.
4566 assert_eq!(swarm_member_replicas(&mock_spec()), 1);
4567 let mut spec = mock_spec();
4568 spec.scale = zlayer_spec::ScaleSpec::Manual;
4569 assert_eq!(swarm_member_replicas(&spec), 1);
4570 }
4571
4572 #[test]
4573 fn test_set_container_dns_injects_when_empty() {
4574 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4575 let spec = mock_spec(); // spec.dns defaults to empty, host_network false
4576 let mut instance =
4577 ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
4578 instance.set_container_dns("10.42.0.1".parse().unwrap());
4579 assert_eq!(instance.spec.dns, vec!["10.42.0.1".to_string()]);
4580 }
4581
4582 #[test]
4583 fn test_set_container_dns_skips_host_network() {
4584 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4585 let mut spec = mock_spec();
4586 spec.host_network = true;
4587 let mut instance =
4588 ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
4589 instance.set_container_dns("10.42.0.1".parse().unwrap());
4590 assert!(
4591 instance.spec.dns.is_empty(),
4592 "host_network containers must inherit the host resolv.conf"
4593 );
4594 }
4595
4596 /// REGRESSION: a `host_network` container must NOT be attached to the
4597 /// overlay even when an overlay manager IS present. Attaching by the
4598 /// host-netns PID wires a stray `zl-*` veth into the host stack (and
4599 /// host-network containers have no private netns / must not get an overlay
4600 /// IP). The `start_container` attach is gated on `should_attach_overlay()`.
4601 #[tokio::test]
4602 async fn should_attach_overlay_skips_host_network_even_with_overlay_present() {
4603 use crate::overlay_manager::OverlayManager;
4604 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4605 let om = Arc::new(RwLock::new(
4606 OverlayManager::new("test".to_string(), "i0".to_string())
4607 .await
4608 .unwrap(),
4609 ));
4610
4611 // Normal service with the overlay present -> attaches.
4612 let normal = ServiceInstance::new(
4613 "web".to_string(),
4614 mock_spec(),
4615 Arc::clone(&runtime),
4616 Some(Arc::clone(&om)),
4617 );
4618 assert!(
4619 normal.should_attach_overlay(),
4620 "a non-host-network service must attach to the overlay"
4621 );
4622
4623 // Host-network service, overlay STILL present -> must skip attach.
4624 let mut hn_spec = mock_spec();
4625 hn_spec.host_network = true;
4626 let host_net =
4627 ServiceInstance::new("web".to_string(), hn_spec, Arc::clone(&runtime), Some(om));
4628 assert!(
4629 !host_net.should_attach_overlay(),
4630 "host_network containers must never attach to the overlay (stray zl-* link / no private netns)"
4631 );
4632 }
4633
    /// A lifecycle-recording runtime that captures the ORDER of the container
    /// lifecycle calls the scale-up path makes, and (like youki) makes the init
    /// PID available in the "created" state — at `create_container`, BEFORE
    /// `start_container`.
    ///
    /// Used to assert that the overlay-attach window opens before start for
    /// host-process runtimes, and that an init/start failure detaches and
    /// removes the container (no leaked veth/container).
    struct LifecycleRecordingRuntime {
        /// Wrapped mock runtime that the `Runtime` impl delegates to.
        inner: MockRuntime,
        /// Ordered lifecycle events: "create" | "pid" | "start" | "remove".
        /// Behind a mutex because the `Runtime` methods only receive `&self`.
        events: std::sync::Mutex<Vec<&'static str>>,
        /// When true, `start_container` fails (exercises the start-failure
        /// cleanup path).
        fail_start: bool,
    }
4648
4649 impl std::fmt::Debug for LifecycleRecordingRuntime {
4650 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4651 f.debug_struct("LifecycleRecordingRuntime")
4652 .field("fail_start", &self.fail_start)
4653 .finish_non_exhaustive()
4654 }
4655 }
4656
4657 impl LifecycleRecordingRuntime {
4658 fn new(fail_start: bool) -> Self {
4659 Self {
4660 inner: MockRuntime::new(),
4661 events: std::sync::Mutex::new(Vec::new()),
4662 fail_start,
4663 }
4664 }
4665
4666 fn events(&self) -> Vec<&'static str> {
4667 self.events.lock().unwrap().clone()
4668 }
4669
4670 fn record(&self, ev: &'static str) {
4671 self.events.lock().unwrap().push(ev);
4672 }
4673 }
4674
    /// `Runtime` implementation that records "create", "start", "remove" and
    /// "pid" events and otherwise delegates to the wrapped `MockRuntime`.
    /// `start_container` can be forced to fail via `fail_start`, and
    /// `get_container_pid` always answers with the test process's own PID.
    #[async_trait::async_trait]
    impl Runtime for LifecycleRecordingRuntime {
        // Image pulls are not part of the recorded lifecycle: pure delegation.
        async fn pull_image(&self, image: &str) -> Result<()> {
            self.inner.pull_image(image).await
        }
        async fn pull_image_with_policy(
            &self,
            image: &str,
            policy: PullPolicy,
            auth: Option<&zlayer_spec::RegistryAuth>,
            source: zlayer_spec::SourcePolicy,
        ) -> Result<()> {
            self.inner
                .pull_image_with_policy(image, policy, auth, source)
                .await
        }
        // Recorded as "create" before delegating.
        async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
            self.record("create");
            self.inner.create_container(id, spec).await
        }
        // Recorded as "start" even when the injected failure fires, so tests
        // can see that a start was attempted.
        async fn start_container(&self, id: &ContainerId) -> Result<()> {
            self.record("start");
            if self.fail_start {
                return Err(AgentError::StartFailed {
                    id: id.to_string(),
                    reason: "injected start failure".to_string(),
                });
            }
            self.inner.start_container(id).await
        }
        // Stop is not recorded: pure delegation.
        async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
            self.inner.stop_container(id, timeout).await
        }
        // Recorded as "remove" before delegating.
        async fn remove_container(&self, id: &ContainerId) -> Result<()> {
            self.record("remove");
            self.inner.remove_container(id).await
        }
        // Everything from here to `get_logs` is unrecorded delegation.
        async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
            self.inner.container_state(id).await
        }
        async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
            self.inner.container_logs(id, tail).await
        }
        async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
            self.inner.exec(id, cmd).await
        }
        async fn get_container_stats(
            &self,
            id: &ContainerId,
        ) -> Result<crate::cgroups_stats::ContainerStats> {
            self.inner.get_container_stats(id).await
        }
        async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
            self.inner.wait_container(id).await
        }
        async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
            self.inner.get_logs(id).await
        }
        // Recorded as "pid"; does NOT delegate to the inner runtime.
        async fn get_container_pid(&self, _id: &ContainerId) -> Result<Option<u32>> {
            // youki writes the init pid-file at create time, so the PID is
            // available in the "created" state — even before start. Mirror that
            // here so the host overlay attach (which the scale-up path now runs
            // between create and start) sees a PID.
            self.record("pid");
            Ok(Some(std::process::id()))
        }
        async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
            self.inner.get_container_ip(id).await
        }
    }
4745
4746 /// REGRESSION: for a host-process (Linux youki) runtime, the scale-up path
4747 /// must capture the container PID and open the overlay-attach window in the
4748 /// "created" state — AFTER `create_container` and BEFORE `start_container`.
4749 /// Before this fix the PID was captured (and the overlay attached) only
4750 /// AFTER start, by which point the entrypoint had execve'd + dropped to a
4751 /// non-root user (non-dumpable netns → overlayd EACCES) or a one-shot had
4752 /// already exited (ENOENT). We assert ordering on the runtime calls the
4753 /// new flow makes: create → pid → start.
4754 #[tokio::test]
4755 async fn scale_up_captures_pid_before_start_for_host_runtime() {
4756 let runtime = Arc::new(LifecycleRecordingRuntime::new(false));
4757 let runtime_dyn: Arc<dyn Runtime + Send + Sync> = runtime.clone();
4758
4759 // No overlay manager: the attach itself is best-effort; this test pins
4760 // the ordering invariant (PID captured in the created state) that makes
4761 // a host overlay attach BEFORE start possible. The host runtime kind is
4762 // the default (`HostNetns`).
4763 let instance = ServiceInstance::new("web".to_string(), mock_spec(), runtime_dyn, None);
4764 instance.scale_to(1).await.unwrap();
4765
4766 let events = runtime.events();
4767 let create = events.iter().position(|e| *e == "create");
4768 let pid = events.iter().position(|e| *e == "pid");
4769 let start = events.iter().position(|e| *e == "start");
4770
4771 assert!(
4772 create.is_some(),
4773 "create_container must be called: {events:?}"
4774 );
4775 assert!(
4776 pid.is_some(),
4777 "the PID must be captured (get_container_pid called): {events:?}"
4778 );
4779 assert!(
4780 start.is_some(),
4781 "start_container must be called: {events:?}"
4782 );
4783 assert!(
4784 create < pid,
4785 "PID must be captured AFTER create_container: {events:?}"
4786 );
4787 assert!(
4788 pid < start,
4789 "PID must be captured (overlay-attach window opened) BEFORE \
4790 start_container for the host runtime kind: {events:?}"
4791 );
4792 }
4793
4794 /// REGRESSION: with the overlay attach moved BEFORE start, a
4795 /// `start_container` failure must not strand a half-created container — the
4796 /// cleanup path must `remove_container` (and, when attached, detach the
4797 /// overlay) before returning the error. Mirrors the job path.
4798 #[tokio::test]
4799 async fn scale_up_start_failure_removes_container() {
4800 let runtime = Arc::new(LifecycleRecordingRuntime::new(true));
4801 let runtime_dyn: Arc<dyn Runtime + Send + Sync> = runtime.clone();
4802
4803 let instance = ServiceInstance::new("web".to_string(), mock_spec(), runtime_dyn, None);
4804 let err = instance
4805 .scale_to(1)
4806 .await
4807 .expect_err("a start failure must propagate as an error");
4808 assert!(
4809 matches!(err, AgentError::StartFailed { .. }),
4810 "expected StartFailed, got {err:?}"
4811 );
4812
4813 let events = runtime.events();
4814 assert!(
4815 events.contains(&"start"),
4816 "start_container must have been attempted: {events:?}"
4817 );
4818 assert!(
4819 events.contains(&"remove"),
4820 "a failed start must remove the half-created container (no leak): {events:?}"
4821 );
4822 let start = events.iter().position(|e| *e == "start");
4823 let remove = events.iter().position(|e| *e == "remove");
4824 assert!(
4825 start < remove,
4826 "remove must run AFTER the failed start (cleanup): {events:?}"
4827 );
4828 }
4829
4830 /// REGRESSION: an init-action failure (now AFTER create + host overlay
4831 /// attach, BEFORE start) must also clean up — `remove_container` runs and
4832 /// the container is NEVER started.
4833 #[tokio::test]
4834 async fn scale_up_init_failure_removes_container_and_never_starts() {
4835 let runtime = Arc::new(LifecycleRecordingRuntime::new(false));
4836 let runtime_dyn: Arc<dyn Runtime + Send + Sync> = runtime.clone();
4837
4838 // An unknown init action fails immediately (UnknownAction →
4839 // InitActionFailed); the default error policy is `Fail` (no retries).
4840 let mut spec = mock_spec();
4841 spec.init = zlayer_spec::InitSpec {
4842 steps: vec![zlayer_spec::InitStep {
4843 id: "boom".to_string(),
4844 uses: "init.does_not_exist".to_string(),
4845 with: zlayer_spec::InitParams::default(),
4846 retry: None,
4847 timeout: None,
4848 on_failure: zlayer_spec::FailureAction::Fail,
4849 }],
4850 };
4851
4852 let instance = ServiceInstance::new("web".to_string(), spec, runtime_dyn, None);
4853 let err = instance
4854 .scale_to(1)
4855 .await
4856 .expect_err("an init-action failure must propagate as an error");
4857 assert!(
4858 matches!(err, AgentError::InitActionFailed { .. }),
4859 "expected InitActionFailed, got {err:?}"
4860 );
4861
4862 let events = runtime.events();
4863 assert!(
4864 events.contains(&"create"),
4865 "create_container must have run: {events:?}"
4866 );
4867 assert!(
4868 events.contains(&"remove"),
4869 "a failed init must remove the half-created container (no leak): {events:?}"
4870 );
4871 assert!(
4872 !events.contains(&"start"),
4873 "the container must NEVER be started when init fails: {events:?}"
4874 );
4875 }
4876
4877 #[test]
4878 fn test_set_container_dns_preserves_user_dns() {
4879 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4880 let mut spec = mock_spec();
4881 spec.dns = vec!["1.1.1.1".to_string()];
4882 let mut instance =
4883 ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
4884 instance.set_container_dns("10.42.0.1".parse().unwrap());
4885 assert_eq!(
4886 instance.spec.dns,
4887 vec!["1.1.1.1".to_string()],
4888 "user-supplied spec.dns must win over the overlay resolver"
4889 );
4890 }
4891
4892 /// Helper to create a `ServiceSpec` with dependencies
4893 fn mock_spec_with_deps(deps: Vec<DependsSpec>) -> ServiceSpec {
4894 let mut spec = mock_spec();
4895 spec.depends = deps;
4896 spec
4897 }
4898
4899 /// Helper to create a `DependsSpec`
4900 fn dep(
4901 service: &str,
4902 condition: zlayer_spec::DependencyCondition,
4903 timeout_ms: u64,
4904 on_timeout: zlayer_spec::TimeoutAction,
4905 ) -> DependsSpec {
4906 DependsSpec {
4907 service: service.to_string(),
4908 condition,
4909 timeout: Some(Duration::from_millis(timeout_ms)),
4910 on_timeout,
4911 }
4912 }
4913
4914 #[tokio::test]
4915 async fn test_deploy_with_dependencies_no_deps() {
4916 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4917 let manager = ServiceManager::new(runtime);
4918
4919 // Services with no dependencies
4920 let mut services = HashMap::new();
4921 services.insert("a".to_string(), mock_spec());
4922 services.insert("b".to_string(), mock_spec());
4923
4924 // Should deploy both without issue
4925 Box::pin(manager.deploy_with_dependencies(services))
4926 .await
4927 .unwrap();
4928
4929 // Both services should be registered
4930 let service_list = manager.list_services().await;
4931 assert_eq!(service_list.len(), 2);
4932 }
4933
4934 #[tokio::test]
4935 async fn test_deploy_with_dependencies_linear() {
4936 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4937 let manager = ServiceManager::new(runtime);
4938
4939 // A -> B -> C (A depends on B, B depends on C)
4940 // All use "started" condition which is satisfied when container is running
4941 let mut services = HashMap::new();
4942 services.insert("c".to_string(), mock_spec());
4943 services.insert(
4944 "b".to_string(),
4945 mock_spec_with_deps(vec![dep(
4946 "c",
4947 zlayer_spec::DependencyCondition::Started,
4948 5000,
4949 zlayer_spec::TimeoutAction::Fail,
4950 )]),
4951 );
4952 services.insert(
4953 "a".to_string(),
4954 mock_spec_with_deps(vec![dep(
4955 "b",
4956 zlayer_spec::DependencyCondition::Started,
4957 5000,
4958 zlayer_spec::TimeoutAction::Fail,
4959 )]),
4960 );
4961
4962 // Should deploy in order: c, b, a
4963 Box::pin(manager.deploy_with_dependencies(services))
4964 .await
4965 .unwrap();
4966
4967 // All services should be registered
4968 let service_list = manager.list_services().await;
4969 assert_eq!(service_list.len(), 3);
4970 }
4971
4972 #[tokio::test]
4973 async fn test_deploy_with_dependencies_cycle_detection() {
4974 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
4975 let manager = ServiceManager::new(runtime);
4976
4977 // A -> B -> A (cycle)
4978 let mut services = HashMap::new();
4979 services.insert(
4980 "a".to_string(),
4981 mock_spec_with_deps(vec![dep(
4982 "b",
4983 zlayer_spec::DependencyCondition::Started,
4984 5000,
4985 zlayer_spec::TimeoutAction::Fail,
4986 )]),
4987 );
4988 services.insert(
4989 "b".to_string(),
4990 mock_spec_with_deps(vec![dep(
4991 "a",
4992 zlayer_spec::DependencyCondition::Started,
4993 5000,
4994 zlayer_spec::TimeoutAction::Fail,
4995 )]),
4996 );
4997
4998 // Should fail with cycle detection
4999 let result = Box::pin(manager.deploy_with_dependencies(services)).await;
5000 assert!(result.is_err());
5001 let err = result.unwrap_err().to_string();
5002 assert!(err.contains("Cyclic dependency"));
5003 }
5004
5005 #[tokio::test]
5006 async fn test_deploy_with_dependencies_timeout_continue() {
5007 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5008 let manager = ServiceManager::new(runtime);
5009
5010 // A depends on B (healthy), but B never becomes healthy
5011 // Using continue action, so it should proceed anyway
5012 let mut services = HashMap::new();
5013 services.insert("b".to_string(), mock_spec());
5014 services.insert(
5015 "a".to_string(),
5016 mock_spec_with_deps(vec![dep(
5017 "b",
5018 zlayer_spec::DependencyCondition::Healthy, // B won't pass healthy check
5019 100, // Short timeout
5020 zlayer_spec::TimeoutAction::Continue, // But continue anyway
5021 )]),
5022 );
5023
5024 // Should deploy both despite timeout
5025 Box::pin(manager.deploy_with_dependencies(services))
5026 .await
5027 .unwrap();
5028
5029 let service_list = manager.list_services().await;
5030 assert_eq!(service_list.len(), 2);
5031 }
5032
5033 #[tokio::test]
5034 async fn test_deploy_with_dependencies_timeout_warn() {
5035 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5036 let manager = ServiceManager::new(runtime);
5037
5038 // A depends on B (healthy), but B never becomes healthy
5039 // Using warn action, so it should proceed with a warning
5040 let mut services = HashMap::new();
5041 services.insert("b".to_string(), mock_spec());
5042 services.insert(
5043 "a".to_string(),
5044 mock_spec_with_deps(vec![dep(
5045 "b",
5046 zlayer_spec::DependencyCondition::Healthy,
5047 100,
5048 zlayer_spec::TimeoutAction::Warn,
5049 )]),
5050 );
5051
5052 // Should deploy both despite timeout (with warning)
5053 Box::pin(manager.deploy_with_dependencies(services))
5054 .await
5055 .unwrap();
5056
5057 let service_list = manager.list_services().await;
5058 assert_eq!(service_list.len(), 2);
5059 }
5060
5061 #[tokio::test]
5062 async fn test_deploy_with_dependencies_timeout_fail() {
5063 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5064 let manager = ServiceManager::new(runtime);
5065
5066 // A depends on B (healthy), but B never becomes healthy
5067 // Using fail action, so deployment should fail
5068 let mut services = HashMap::new();
5069 services.insert("b".to_string(), mock_spec());
5070 services.insert(
5071 "a".to_string(),
5072 mock_spec_with_deps(vec![dep(
5073 "b",
5074 zlayer_spec::DependencyCondition::Healthy,
5075 100,
5076 zlayer_spec::TimeoutAction::Fail,
5077 )]),
5078 );
5079
5080 // Should fail after B is started but doesn't become healthy
5081 let result = Box::pin(manager.deploy_with_dependencies(services)).await;
5082 assert!(result.is_err());
5083
5084 // B should be started (it has no deps), but A should fail
5085 let err = result.unwrap_err().to_string();
5086 assert!(err.contains("Dependency timeout"));
5087 }
5088
5089 #[tokio::test]
5090 async fn test_check_dependencies_all_satisfied() {
5091 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5092 let manager = ServiceManager::new(runtime);
5093
5094 // Mark a service as healthy
5095 manager
5096 .update_health_state("db", HealthState::Healthy)
5097 .await;
5098
5099 let deps = vec![DependsSpec {
5100 service: "db".to_string(),
5101 condition: zlayer_spec::DependencyCondition::Healthy,
5102 timeout: Some(Duration::from_secs(60)),
5103 on_timeout: zlayer_spec::TimeoutAction::Fail,
5104 }];
5105
5106 let satisfied = manager.check_dependencies(&deps).await.unwrap();
5107 assert!(satisfied);
5108 }
5109
5110 #[tokio::test]
5111 async fn test_check_dependencies_not_satisfied() {
5112 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5113 let manager = ServiceManager::new(runtime);
5114
5115 // Service not healthy (no state set = Unknown)
5116 let deps = vec![DependsSpec {
5117 service: "db".to_string(),
5118 condition: zlayer_spec::DependencyCondition::Healthy,
5119 timeout: Some(Duration::from_secs(60)),
5120 on_timeout: zlayer_spec::TimeoutAction::Fail,
5121 }];
5122
5123 let satisfied = manager.check_dependencies(&deps).await.unwrap();
5124 assert!(!satisfied);
5125 }
5126
5127 #[tokio::test]
5128 async fn test_health_state_tracking() {
5129 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5130 let manager = ServiceManager::new(runtime);
5131
5132 // Update health states
5133 manager
5134 .update_health_state("db", HealthState::Healthy)
5135 .await;
5136 manager
5137 .update_health_state("cache", HealthState::Unknown)
5138 .await;
5139
5140 // Verify states
5141 let states = manager.health_states();
5142 let states_read = states.read().await;
5143
5144 assert!(matches!(states_read.get("db"), Some(HealthState::Healthy)));
5145 assert!(matches!(
5146 states_read.get("cache"),
5147 Some(HealthState::Unknown)
5148 ));
5149 }
5150
5151 /// Regression test for the stabilization timeout that blocked the raft-e2e
5152 /// `cluster_scaling` / `cluster_upgrade` suites.
5153 ///
5154 /// Previously the callback that bridges a container's health result into the
5155 /// `ServiceManager` `health_states` map was only attached when BOTH a proxy
5156 /// manager AND a reachable overlay IP existed. In degraded-overlay / no-proxy
5157 /// deployments that `if let` was false, so `health_states` was never written,
5158 /// the service stayed `healthy=false` forever, and stabilization timed out
5159 /// even though the container was running and its health check passing.
5160 ///
5161 /// This test drives the real `scale_to` create path with:
5162 /// * NO `proxy_manager` (so `proxy_backend` resolves to None), and
5163 /// * a `Command { command: "true" }` health check (always passes host-side),
5164 /// then asserts the shared `health_states` map receives `Healthy` for the
5165 /// service — proving the bridge fires unconditionally.
5166 ///
5167 /// Gated to `#[cfg(unix)]` because `HealthCheck::Command` is executed via
5168 /// `sh -c <command>` in `crate::health::HealthChecker::check_command`. On
5169 /// Windows hosts without `sh` on PATH (the default Windows CI image), no
5170 /// Command-based health check can ever pass, so the test would fail for
5171 /// reasons unrelated to the bridge it is regression-testing. The bridge
5172 /// behavior under test is platform-agnostic; only the test fixture's
5173 /// "always-passes command" needs a Unix shell.
5174 #[cfg(unix)]
5175 #[tokio::test]
5176 async fn test_health_states_bridge_fires_without_proxy() {
5177 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5178
5179 // Service spec with a host-side command health check that always passes.
5180 // Zero start-grace + a short interval keep the test fast.
5181 let mut spec = mock_spec();
5182 spec.health = zlayer_spec::HealthSpec {
5183 start_grace: Some(Duration::from_millis(0)),
5184 interval: Some(Duration::from_millis(50)),
5185 timeout: Some(Duration::from_secs(5)),
5186 retries: 1,
5187 check: HealthCheck::Command {
5188 command: "true".to_string(),
5189 },
5190 };
5191
5192 // Build a ServiceInstance with NO proxy_manager and NO overlay_manager,
5193 // then wire in the shared health_states map exactly as ServiceManager does.
5194 let mut instance =
5195 ServiceInstance::new("web".to_string(), spec, Arc::clone(&runtime), None);
5196 let health_states: Arc<RwLock<HashMap<String, HealthState>>> =
5197 Arc::new(RwLock::new(HashMap::new()));
5198 instance.set_health_states(Arc::clone(&health_states));
5199
5200 // Drive the real create path (no proxy, MockRuntime IP present but proxy
5201 // absent => proxy_backend is None, hitting the previously-broken branch).
5202 instance.scale_to(1).await.unwrap();
5203
5204 // Poll for the bridged Healthy state (the monitor checks asynchronously
5205 // after its start grace). Bounded so a regression fails fast.
5206 let mut bridged = false;
5207 for _ in 0..100 {
5208 if matches!(
5209 health_states.read().await.get("web"),
5210 Some(HealthState::Healthy)
5211 ) {
5212 bridged = true;
5213 break;
5214 }
5215 tokio::time::sleep(Duration::from_millis(50)).await;
5216 }
5217
5218 assert!(
5219 bridged,
5220 "health_states must receive Healthy for the service even without a \
5221 proxy or overlay IP; the bridge regressed and stabilization would time out"
5222 );
5223 }
5224
5225 // ==================== Job/Cron Integration Tests ====================
5226
5227 fn mock_job_spec() -> ServiceSpec {
5228 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
5229 r"
5230version: v1
5231deployment: test
5232services:
5233 backup:
5234 rtype: job
5235 image:
5236 name: backup:latest
5237",
5238 )
5239 .unwrap()
5240 .services
5241 .remove("backup")
5242 .unwrap()
5243 }
5244
5245 fn mock_cron_spec() -> ServiceSpec {
5246 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
5247 r#"
5248version: v1
5249deployment: test
5250services:
5251 cleanup:
5252 rtype: cron
5253 schedule: "0 0 * * * * *"
5254 image:
5255 name: cleanup:latest
5256"#,
5257 )
5258 .unwrap()
5259 .services
5260 .remove("cleanup")
5261 .unwrap()
5262 }
5263
5264 #[tokio::test]
5265 async fn test_service_manager_with_job_executor() {
5266 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5267 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
5268
5269 let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
5270
5271 // Register job
5272 let job_spec = mock_job_spec();
5273 Box::pin(manager.upsert_service("backup".to_string(), job_spec))
5274 .await
5275 .unwrap();
5276
5277 // Trigger job
5278 let exec_id = manager
5279 .trigger_job("backup", JobTrigger::Cli)
5280 .await
5281 .unwrap();
5282
5283 // Give job time to start
5284 tokio::time::sleep(Duration::from_millis(50)).await;
5285
5286 // Check execution exists
5287 let execution = manager.get_job_execution(&exec_id).await;
5288 assert!(execution.is_some());
5289 assert_eq!(execution.unwrap().job_name, "backup");
5290 }
5291
5292 #[tokio::test]
5293 async fn test_service_manager_with_cron_scheduler() {
5294 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5295 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
5296 let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
5297
5298 let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
5299
5300 // Register cron job
5301 let cron_spec = mock_cron_spec();
5302 Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
5303 .await
5304 .unwrap();
5305
5306 // List cron jobs
5307 let cron_jobs = manager.list_cron_jobs().await;
5308 assert_eq!(cron_jobs.len(), 1);
5309 assert_eq!(cron_jobs[0].name, "cleanup");
5310 assert!(cron_jobs[0].enabled);
5311 }
5312
5313 #[tokio::test]
5314 async fn test_service_manager_trigger_cron() {
5315 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5316 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
5317 let cron_scheduler = Arc::new(CronScheduler::new(job_executor.clone()));
5318
5319 let manager = ServiceManager::new(runtime)
5320 .with_job_executor(job_executor)
5321 .with_cron_scheduler(cron_scheduler);
5322
5323 // Register cron job
5324 let cron_spec = mock_cron_spec();
5325 Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
5326 .await
5327 .unwrap();
5328
5329 // Manually trigger the cron job
5330 let exec_id = manager.trigger_cron("cleanup").await.unwrap();
5331 assert!(!exec_id.0.is_empty());
5332 }
5333
5334 #[tokio::test]
5335 async fn test_service_manager_enable_disable_cron() {
5336 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5337 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
5338 let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
5339
5340 let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler);
5341
5342 // Register cron job
5343 let cron_spec = mock_cron_spec();
5344 Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
5345 .await
5346 .unwrap();
5347
5348 // Initially enabled
5349 let cron_jobs = manager.list_cron_jobs().await;
5350 assert!(cron_jobs[0].enabled);
5351
5352 // Disable
5353 manager.set_cron_enabled("cleanup", false).await;
5354 let cron_jobs = manager.list_cron_jobs().await;
5355 assert!(!cron_jobs[0].enabled);
5356
5357 // Re-enable
5358 manager.set_cron_enabled("cleanup", true).await;
5359 let cron_jobs = manager.list_cron_jobs().await;
5360 assert!(cron_jobs[0].enabled);
5361 }
5362
5363 #[tokio::test]
5364 async fn test_service_manager_remove_cleans_up_job() {
5365 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5366 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
5367
5368 let manager = ServiceManager::new(runtime).with_job_executor(job_executor.clone());
5369
5370 // Register job
5371 let job_spec = mock_job_spec();
5372 Box::pin(manager.upsert_service("backup".to_string(), job_spec))
5373 .await
5374 .unwrap();
5375
5376 // Verify job is registered
5377 let spec = job_executor.get_job_spec("backup").await;
5378 assert!(spec.is_some());
5379
5380 // Remove job
5381 manager.remove_service("backup").await.unwrap();
5382
5383 // Verify job is unregistered
5384 let spec = job_executor.get_job_spec("backup").await;
5385 assert!(spec.is_none());
5386 }
5387
5388 #[tokio::test]
5389 async fn test_service_manager_remove_cleans_up_cron() {
5390 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5391 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
5392 let cron_scheduler = Arc::new(CronScheduler::new(job_executor));
5393
5394 let manager = ServiceManager::new(runtime).with_cron_scheduler(cron_scheduler.clone());
5395
5396 // Register cron job
5397 let cron_spec = mock_cron_spec();
5398 Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec))
5399 .await
5400 .unwrap();
5401
5402 // Verify cron job is registered
5403 assert_eq!(cron_scheduler.job_count().await, 1);
5404
5405 // Remove cron job
5406 manager.remove_service("cleanup").await.unwrap();
5407
5408 // Verify cron job is unregistered
5409 assert_eq!(cron_scheduler.job_count().await, 0);
5410 }
5411
5412 #[tokio::test]
5413 async fn test_service_manager_job_without_executor() {
5414 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5415 let manager = ServiceManager::new(runtime);
5416
5417 // Try to trigger job without executor configured
5418 let result = manager.trigger_job("nonexistent", JobTrigger::Cli).await;
5419 assert!(result.is_err());
5420 assert!(result.unwrap_err().to_string().contains("not configured"));
5421 }
5422
5423 #[tokio::test]
5424 async fn test_service_manager_cron_without_scheduler() {
5425 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5426 let manager = ServiceManager::new(runtime);
5427
5428 // Try to register cron job without scheduler configured
5429 let cron_spec = mock_cron_spec();
5430 let result = Box::pin(manager.upsert_service("cleanup".to_string(), cron_spec)).await;
5431 assert!(result.is_err());
5432 assert!(result.unwrap_err().to_string().contains("not configured"));
5433 }
5434
5435 #[tokio::test]
5436 async fn test_service_manager_list_job_executions() {
5437 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5438 let job_executor = Arc::new(JobExecutor::new(runtime.clone()));
5439
5440 let manager = ServiceManager::new(runtime).with_job_executor(job_executor);
5441
5442 // Register job
5443 let job_spec = mock_job_spec();
5444 Box::pin(manager.upsert_service("backup".to_string(), job_spec))
5445 .await
5446 .unwrap();
5447
5448 // Trigger job twice
5449 manager
5450 .trigger_job("backup", JobTrigger::Cli)
5451 .await
5452 .unwrap();
5453 manager
5454 .trigger_job("backup", JobTrigger::Scheduler)
5455 .await
5456 .unwrap();
5457
5458 // Give jobs time to start
5459 tokio::time::sleep(Duration::from_millis(50)).await;
5460
5461 // List executions
5462 let executions = manager.list_job_executions("backup").await;
5463 assert_eq!(executions.len(), 2);
5464 }
5465
5466 // ==================== Container Supervisor Integration Tests ====================
5467
5468 #[tokio::test]
5469 async fn test_service_manager_with_supervisor() {
5470 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5471 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
5472
5473 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
5474
5475 // Add service
5476 let spec = mock_spec();
5477 Box::pin(manager.upsert_service("api".to_string(), spec))
5478 .await
5479 .unwrap();
5480
5481 // Scale up - containers should be registered with supervisor
5482 manager.scale_service("api", 2).await.unwrap();
5483
5484 // Verify containers are supervised
5485 assert_eq!(supervisor.supervised_count().await, 2);
5486
5487 // Scale down - containers should be unregistered
5488 manager.scale_service("api", 1).await.unwrap();
5489 assert_eq!(supervisor.supervised_count().await, 1);
5490
5491 // Remove service - remaining containers should be unregistered
5492 manager.remove_service("api").await.unwrap();
5493 assert_eq!(supervisor.supervised_count().await, 0);
5494 }
5495
5496 #[tokio::test]
5497 async fn test_service_manager_supervisor_state() {
5498 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5499 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
5500
5501 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor);
5502
5503 // Add and scale service
5504 let spec = mock_spec();
5505 Box::pin(manager.upsert_service("web".to_string(), spec))
5506 .await
5507 .unwrap();
5508 manager.scale_service("web", 1).await.unwrap();
5509
5510 // Check supervised state
5511 let container_id = ContainerId::new("web".to_string(), 1);
5512 let state = manager.get_container_supervised_state(&container_id).await;
5513 assert_eq!(state, Some(SupervisedState::Running));
5514 }
5515
5516 #[tokio::test]
5517 async fn test_service_manager_start_supervisor() {
5518 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5519 let supervisor = Arc::new(ContainerSupervisor::new(runtime.clone()));
5520
5521 let manager = ServiceManager::new(runtime).with_container_supervisor(supervisor.clone());
5522
5523 // Start the supervisor
5524 let handle = manager.start_container_supervisor().unwrap();
5525
5526 // Give it time to start
5527 tokio::time::sleep(Duration::from_millis(50)).await;
5528 assert!(supervisor.is_running());
5529
5530 // Shutdown
5531 manager.shutdown_container_supervisor();
5532
5533 // Wait for it to stop
5534 tokio::time::timeout(Duration::from_secs(1), handle)
5535 .await
5536 .unwrap()
5537 .unwrap();
5538
5539 assert!(!supervisor.is_running());
5540 }
5541
5542 #[tokio::test]
5543 async fn test_service_manager_supervisor_not_configured() {
5544 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5545 let manager = ServiceManager::new(runtime);
5546
5547 // Try to start supervisor without configuring it
5548 let result = manager.start_container_supervisor();
5549 assert!(result.is_err());
5550 assert!(result.unwrap_err().to_string().contains("not configured"));
5551 }
5552
5553 // ==================== Stream Registry Integration Tests ====================
5554
5555 fn mock_tcp_spec() -> ServiceSpec {
5556 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
5557 r"
5558version: v1
5559deployment: test
5560services:
5561 database:
5562 rtype: service
5563 image:
5564 name: postgres:latest
5565 endpoints:
5566 - name: postgresql
5567 protocol: tcp
5568 port: 5432
5569 scale:
5570 mode: fixed
5571 replicas: 1
5572",
5573 )
5574 .unwrap()
5575 .services
5576 .remove("database")
5577 .unwrap()
5578 }
5579
5580 fn mock_udp_spec() -> ServiceSpec {
5581 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
5582 r"
5583version: v1
5584deployment: test
5585services:
5586 dns:
5587 rtype: service
5588 image:
5589 name: dns:latest
5590 endpoints:
5591 - name: dns
5592 protocol: udp
5593 port: 53
5594 scale:
5595 mode: fixed
5596 replicas: 1
5597",
5598 )
5599 .unwrap()
5600 .services
5601 .remove("dns")
5602 .unwrap()
5603 }
5604
5605 fn mock_mixed_spec() -> ServiceSpec {
5606 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(
5607 r"
5608version: v1
5609deployment: test
5610services:
5611 mixed:
5612 rtype: service
5613 image:
5614 name: mixed:latest
5615 endpoints:
5616 - name: http
5617 protocol: http
5618 port: 8080
5619 - name: grpc
5620 protocol: tcp
5621 port: 9000
5622 - name: metrics
5623 protocol: udp
5624 port: 8125
5625 scale:
5626 mode: fixed
5627 replicas: 1
5628",
5629 )
5630 .unwrap()
5631 .services
5632 .remove("mixed")
5633 .unwrap()
5634 }
5635
5636 #[tokio::test]
5637 async fn test_service_manager_with_stream_registry_tcp() {
5638 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5639 let stream_registry = Arc::new(StreamRegistry::new());
5640
5641 let mut manager = ServiceManager::new(runtime);
5642 manager.set_stream_registry(stream_registry.clone());
5643 manager.set_deployment_name("test".to_string());
5644
5645 // Add TCP-only service
5646 let spec = mock_tcp_spec();
5647 Box::pin(manager.upsert_service("database".to_string(), spec))
5648 .await
5649 .unwrap();
5650
5651 // Verify TCP route was registered
5652 assert_eq!(stream_registry.tcp_count(), 1);
5653 assert!(stream_registry.tcp_ports().contains(&5432));
5654
5655 // Remove service and verify cleanup
5656 manager.remove_service("database").await.unwrap();
5657 assert_eq!(stream_registry.tcp_count(), 0);
5658 }
5659
5660 #[tokio::test]
5661 async fn test_service_manager_with_stream_registry_udp() {
5662 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5663 let stream_registry = Arc::new(StreamRegistry::new());
5664
5665 let mut manager = ServiceManager::new(runtime);
5666 manager.set_stream_registry(stream_registry.clone());
5667 manager.set_deployment_name("test".to_string());
5668
5669 // Add UDP-only service
5670 let spec = mock_udp_spec();
5671 Box::pin(manager.upsert_service("dns".to_string(), spec))
5672 .await
5673 .unwrap();
5674
5675 // Verify UDP route was registered
5676 assert_eq!(stream_registry.udp_count(), 1);
5677 assert!(stream_registry.udp_ports().contains(&53));
5678
5679 // Remove service and verify cleanup
5680 manager.remove_service("dns").await.unwrap();
5681 assert_eq!(stream_registry.udp_count(), 0);
5682 }
5683
5684 #[tokio::test]
5685 async fn test_service_manager_with_stream_registry_mixed() {
5686 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5687 let stream_registry = Arc::new(StreamRegistry::new());
5688
5689 let mut manager = ServiceManager::new(runtime);
5690 manager.set_stream_registry(stream_registry.clone());
5691 manager.set_deployment_name("test".to_string());
5692
5693 // Add mixed service (HTTP + TCP + UDP)
5694 let spec = mock_mixed_spec();
5695 Box::pin(manager.upsert_service("mixed".to_string(), spec))
5696 .await
5697 .unwrap();
5698
5699 // Verify stream routes were registered
5700 assert_eq!(stream_registry.tcp_count(), 1); // TCP: 9000
5701 assert_eq!(stream_registry.udp_count(), 1); // UDP: 8125
5702
5703 assert!(stream_registry.tcp_ports().contains(&9000));
5704 assert!(stream_registry.udp_ports().contains(&8125));
5705
5706 // Remove service and verify stream cleanup
5707 manager.remove_service("mixed").await.unwrap();
5708 assert_eq!(stream_registry.tcp_count(), 0);
5709 assert_eq!(stream_registry.udp_count(), 0);
5710 }
5711
5712 #[tokio::test]
5713 async fn test_service_manager_stream_registry_builder() {
5714 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5715 let stream_registry = Arc::new(StreamRegistry::new());
5716
5717 // Test builder pattern
5718 let manager = ServiceManager::new(runtime).with_stream_registry(stream_registry.clone());
5719
5720 // Verify stream registry is accessible
5721 assert!(manager.stream_registry().is_some());
5722 }
5723
5724 #[tokio::test]
5725 async fn test_tcp_service_without_stream_registry() {
5726 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5727
5728 // Manager without stream registry
5729 let mut manager = ServiceManager::new(runtime);
5730 manager.set_deployment_name("test".to_string());
5731
5732 // Add TCP service - should log warning but not fail
5733 let spec = mock_tcp_spec();
5734 Box::pin(manager.upsert_service("database".to_string(), spec))
5735 .await
5736 .unwrap();
5737
5738 // No stream registry to check, but service should be tracked
5739 let services = manager.list_services().await;
5740 assert!(services.contains(&"database".to_string()));
5741 }
5742
5743 /// Verify `collect_endpoint_backends` filters containers by
5744 /// `EndpointSpec.target_role`.
5745 ///
5746 /// Given two replica groups (`primary` × 1, `read` × 2) and two
5747 /// endpoints — one with `target_role: primary` and one with
5748 /// `target_role: read` — each endpoint should receive only the
5749 /// matching containers' overlay addresses. The legacy no-filter
5750 /// endpoint (`target_role: None`) should receive all of them.
5751 #[tokio::test]
5752 #[allow(clippy::too_many_lines)]
5753 async fn test_collect_endpoint_backends_respects_target_role() {
5754 use crate::runtime::Container;
5755 use std::collections::HashMap as StdHashMap;
5756 use std::net::{IpAddr, Ipv4Addr};
5757 use zlayer_spec::{
5758 EndpointSpec, ExposeType, GroupAffinity, Protocol, ReplicaGroup, ScaleSpec,
5759 };
5760
5761 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
5762 let manager = ServiceManager::new(runtime.clone());
5763
5764 // Build a spec with replica_groups and three endpoints:
5765 // - "write" targets role "primary"
5766 // - "read" targets role "read"
5767 // - "any" has no target_role (legacy)
5768 let mut spec = mock_spec();
5769 spec.replica_groups = Some(vec![
5770 ReplicaGroup {
5771 role: "primary".to_string(),
5772 count: 1,
5773 image: None,
5774 env: StdHashMap::new(),
5775 command: None,
5776 resources: None,
5777 affinity: GroupAffinity::default(),
5778 },
5779 ReplicaGroup {
5780 role: "read".to_string(),
5781 count: 2,
5782 image: None,
5783 env: StdHashMap::new(),
5784 command: None,
5785 resources: None,
5786 affinity: GroupAffinity::default(),
5787 },
5788 ]);
5789 spec.scale = ScaleSpec::Fixed { replicas: 3 };
5790 spec.endpoints = vec![
5791 EndpointSpec {
5792 name: "write".to_string(),
5793 protocol: Protocol::Tcp,
5794 port: 5432,
5795 target_port: Some(5432),
5796 path: None,
5797 host: None,
5798 expose: ExposeType::Internal,
5799 stream: None,
5800 tunnel: None,
5801 target_role: Some("primary".to_string()),
5802 },
5803 EndpointSpec {
5804 name: "read".to_string(),
5805 protocol: Protocol::Tcp,
5806 port: 5433,
5807 target_port: Some(5432),
5808 path: None,
5809 host: None,
5810 expose: ExposeType::Internal,
5811 stream: None,
5812 tunnel: None,
5813 target_role: Some("read".to_string()),
5814 },
5815 EndpointSpec {
5816 name: "any".to_string(),
5817 protocol: Protocol::Tcp,
5818 port: 5434,
5819 target_port: Some(5432),
5820 path: None,
5821 host: None,
5822 expose: ExposeType::Internal,
5823 stream: None,
5824 tunnel: None,
5825 target_role: None,
5826 },
5827 ];
5828
5829 let instance = ServiceInstance::new(
5830 "postgres".to_string(),
5831 spec.clone(),
5832 runtime,
5833 None, // overlay_manager — not exercised by this test
5834 );
5835
5836 // Inject three containers directly: one primary, two read replicas.
5837 let cid_primary = ContainerId::with_role_and_node("postgres", 1, "primary", 0);
5838 let cid_first_read = ContainerId::with_role_and_node("postgres", 2, "read", 0);
5839 let cid_second_read = ContainerId::with_role_and_node("postgres", 3, "read", 0);
5840
5841 let ip_primary = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 1));
5842 let ip_first_read = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 2));
5843 let ip_second_read = IpAddr::V4(Ipv4Addr::new(10, 200, 0, 3));
5844
5845 {
5846 let mut containers = instance.containers().write().await;
5847 containers.insert(
5848 cid_primary.clone(),
5849 Container {
5850 id: cid_primary,
5851 image: spec.image.name.to_string(),
5852 state: crate::runtime::ContainerState::Running,
5853 pid: None,
5854 task: None,
5855 overlay_ip: Some(ip_primary),
5856 health_monitor: None,
5857 port_override: None,
5858 },
5859 );
5860 containers.insert(
5861 cid_first_read.clone(),
5862 Container {
5863 id: cid_first_read,
5864 image: spec.image.name.to_string(),
5865 state: crate::runtime::ContainerState::Running,
5866 pid: None,
5867 task: None,
5868 overlay_ip: Some(ip_first_read),
5869 health_monitor: None,
5870 port_override: None,
5871 },
5872 );
5873 containers.insert(
5874 cid_second_read.clone(),
5875 Container {
5876 id: cid_second_read,
5877 image: spec.image.name.to_string(),
5878 state: crate::runtime::ContainerState::Running,
5879 pid: None,
5880 task: None,
5881 overlay_ip: Some(ip_second_read),
5882 health_monitor: None,
5883 port_override: None,
5884 },
5885 );
5886 }
5887
5888 let write_ep = &spec.endpoints[0];
5889 let read_ep = &spec.endpoints[1];
5890 let any_ep = &spec.endpoints[2];
5891
5892 let write_backends = manager.collect_endpoint_backends(&instance, write_ep).await;
5893 let read_backends = manager.collect_endpoint_backends(&instance, read_ep).await;
5894 let any_backends = manager.collect_endpoint_backends(&instance, any_ep).await;
5895
5896 // write endpoint -> only the primary container
5897 assert_eq!(write_backends.len(), 1, "write should match only primary");
5898 assert!(
5899 write_backends.iter().any(|a| a.ip() == ip_primary),
5900 "write backends missing primary IP: {write_backends:?}"
5901 );
5902
5903 // read endpoint -> both read containers, no primary
5904 assert_eq!(
5905 read_backends.len(),
5906 2,
5907 "read should match both read replicas"
5908 );
5909 assert!(read_backends.iter().any(|a| a.ip() == ip_first_read));
5910 assert!(read_backends.iter().any(|a| a.ip() == ip_second_read));
5911 assert!(
5912 !read_backends.iter().any(|a| a.ip() == ip_primary),
5913 "read backends must not contain primary: {read_backends:?}"
5914 );
5915
5916 // legacy endpoint (target_role = None) -> every container
5917 assert_eq!(
5918 any_backends.len(),
5919 3,
5920 "any-role endpoint should see all containers"
5921 );
5922 }
5923
5924 /// Build a `ServiceInstance` whose spec carries a deployment, so the
5925 /// instance's deployment-scoping helpers (`dns_hostnames` /
5926 /// `dns_search_domain`) are exercised through the real construction path
5927 /// (the constructors capture `spec.deployment`).
5928 fn instance_in_deployment(service: &str, deployment: &str) -> ServiceInstance {
5929 let mut spec = ServiceSpec::minimal(service, "postgres:16-alpine");
5930 spec.deployment = Some(deployment.to_string());
5931 ServiceInstance::new(
5932 service.to_string(),
5933 spec,
5934 Arc::new(MockRuntime::new()),
5935 None,
5936 )
5937 }
5938
5939 /// Register every hostname an instance would register into a real
5940 /// [`DnsServer`]'s authority, then resolve a name through that authority and
5941 /// assert the answer. Uses the DNS handle's direct authority lookup (no UDP
5942 /// roundtrip — a blocking sync DNS client inside a tokio runtime would
5943 /// deadlock the current-thread executor), so the test exercises the actual
5944 /// record store deterministically.
5945 async fn resolve_through_authority(
5946 handle: &zlayer_overlay::DnsHandle,
5947 fqdn: &str,
5948 ) -> Option<IpAddr> {
5949 handle.lookup_a(fqdn).await
5950 }
5951
5952 /// Bug 6 (the CRITICAL leak): two DIFFERENT deployments each with a service
5953 /// named `postgres`, registered into ONE daemon-global DNS authority, must
5954 /// resolve to their OWN instance's IP — no last-writer-wins clobber across
5955 /// deployments. The deployment-scoped FQDNs are what a guest's
5956 /// `search <D>.zlayer.local` expands the bare `postgres` / `postgres.service`
5957 /// queries into, so distinct scoped keys are exactly what prevents the leak.
5958 #[tokio::test]
5959 async fn deployment_scoped_dns_no_cross_deployment_clobber() {
5960 use zlayer_overlay::DnsServer;
5961
5962 let bind = SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
5963 let server = DnsServer::new(bind, "zlayer.local.").expect("build DNS server");
5964 let handle = server.handle();
5965
5966 let inst_a = instance_in_deployment("postgres", "deploy-a");
5967 let inst_b = instance_in_deployment("postgres", "deploy-b");
5968
5969 let cid = ContainerId::with_role_and_node("postgres", 1, "default", 0);
5970 let ip_a = IpAddr::V4(std::net::Ipv4Addr::new(10, 200, 0, 11));
5971 let ip_b = IpAddr::V4(std::net::Ipv4Addr::new(10, 200, 0, 22));
5972
5973 // Register every name each instance would register for this container,
5974 // in deployment order, into the SAME authority.
5975 for name in inst_a.dns_hostnames(&cid) {
5976 handle.add_record(&name, ip_a).await.expect("add a record");
5977 }
5978 for name in inst_b.dns_hostnames(&cid) {
5979 handle.add_record(&name, ip_b).await.expect("add b record");
5980 }
5981
5982 // The deployment-scoped FQDNs stay distinct and resolve to their OWN IP.
5983 assert_eq!(
5984 resolve_through_authority(&handle, "postgres.deploy-a.zlayer.local").await,
5985 Some(ip_a),
5986 "deploy-a's postgres must resolve to deploy-a's IP"
5987 );
5988 assert_eq!(
5989 resolve_through_authority(&handle, "postgres.deploy-b.zlayer.local").await,
5990 Some(ip_b),
5991 "deploy-b's postgres must resolve to deploy-b's IP (no clobber)"
5992 );
5993 // `<svc>.service` form, scoped per deployment.
5994 assert_eq!(
5995 resolve_through_authority(&handle, "postgres.service.deploy-a.zlayer.local").await,
5996 Some(ip_a),
5997 );
5998 assert_eq!(
5999 resolve_through_authority(&handle, "postgres.service.deploy-b.zlayer.local").await,
6000 Some(ip_b),
6001 );
6002 }
6003
6004 /// Bug 6 part (a): two services in the SAME deployment resolve each other by
6005 /// their deployment-scoped names.
6006 #[tokio::test]
6007 async fn deployment_scoped_dns_same_deployment_siblings_resolve() {
6008 use zlayer_overlay::DnsServer;
6009
6010 let bind = SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
6011 let server = DnsServer::new(bind, "zlayer.local.").expect("build DNS server");
6012 let handle = server.handle();
6013
6014 let db = instance_in_deployment("db", "myapp");
6015 let cache = instance_in_deployment("cache", "myapp");
6016 let cid = ContainerId::with_role_and_node("x", 1, "default", 0);
6017 let ip_db = IpAddr::V4(std::net::Ipv4Addr::new(10, 200, 1, 1));
6018 let ip_cache = IpAddr::V4(std::net::Ipv4Addr::new(10, 200, 1, 2));
6019
6020 for name in db.dns_hostnames(&cid) {
6021 handle.add_record(&name, ip_db).await.expect("add db");
6022 }
6023 for name in cache.dns_hostnames(&cid) {
6024 handle.add_record(&name, ip_cache).await.expect("add cache");
6025 }
6026
6027 // Within deployment `myapp`, the search-expanded sibling FQDNs resolve
6028 // to the same deployment's instances.
6029 assert_eq!(
6030 resolve_through_authority(&handle, "db.myapp.zlayer.local").await,
6031 Some(ip_db)
6032 );
6033 assert_eq!(
6034 resolve_through_authority(&handle, "cache.myapp.zlayer.local").await,
6035 Some(ip_cache)
6036 );
6037 }
6038
/// Bug 6 part (b): `dns_search_domain` yields the per-deployment `search`
/// value — the deployment scope first, then the bare zone — regardless of a
/// trailing dot on the zone, and yields `None` for an instance whose spec
/// has no deployment.
#[test]
fn dns_search_domain_is_deployment_scoped() {
    let scoped = instance_in_deployment("api", "deploy-a");
    let expected = Some("deploy-a.zlayer.local zlayer.local".to_string());

    // The second zone carries a trailing dot, which must be normalized away.
    for zone in ["zlayer.local", "zlayer.local."] {
        assert_eq!(scoped.dns_search_domain(zone), expected);
    }

    // Without a deployment there is no override; the caller falls back to
    // the global zone.
    let bare_spec = ServiceSpec::minimal("api", "nginx:latest");
    let unscoped = ServiceInstance::new(
        "api".to_string(),
        bare_spec,
        Arc::new(MockRuntime::new()),
        None,
    );
    assert_eq!(unscoped.dns_search_domain("zlayer.local"), None);
}
6064
/// `dns_hostnames` must produce both families of names for a container:
/// the deployment-scoped ones (`<svc>.<D>`, `<svc>.service.<D>`, …) and the
/// legacy unscoped ones kept for native / compose back-compat.
#[test]
fn dns_hostnames_emit_scoped_and_legacy_families() {
    let inst = instance_in_deployment("postgres", "myapp");
    let cid = ContainerId::with_role_and_node("postgres", 2, "default", 0);
    let names = inst.dns_hostnames(&cid);

    let deployment_scoped = [
        "postgres.myapp",
        "postgres.service.myapp",
        "postgres.myapp.service",
        "2.postgres.service.myapp",
    ];
    let legacy_unscoped = [
        "postgres",
        "postgres.service.local",
        "2.postgres.service.local",
    ];

    // Scoped family first, then the back-compat family.
    for expected in deployment_scoped.into_iter().chain(legacy_unscoped) {
        assert!(
            names.contains(&expected.to_string()),
            "missing hostname `{expected}` in {names:?}"
        );
    }
}
6085
6086 /// Minimal [`Cluster`] stub for the external-domain DNS tests: only
6087 /// `select_ingress_overlay_ip` carries behavior (returns the configured
6088 /// peer overlay IP); every other method is an unreachable stub since these
6089 /// tests never exercise scaling/placement.
6090 struct IngressPeerCluster {
6091 ingress_overlay_ip: Option<String>,
6092 }
6093
6094 #[async_trait::async_trait]
6095 impl zlayer_scheduler::cluster::Cluster for IngressPeerCluster {
6096 fn node_id(&self) -> u64 {
6097 2
6098 }
6099 async fn select_ingress_overlay_ip(&self) -> Option<String> {
6100 self.ingress_overlay_ip.clone()
6101 }
6102 async fn is_leader(&self) -> bool {
6103 false
6104 }
6105 async fn leader_addr(&self) -> Option<std::net::SocketAddr> {
6106 None
6107 }
6108 async fn list_nodes(&self) -> Vec<zlayer_scheduler::cluster::NodeRecord> {
6109 Vec::new()
6110 }
6111 async fn dispatch_scale(
6112 &self,
6113 _target: u64,
6114 _req: zlayer_scheduler::cluster::InternalScaleRequest,
6115 ) -> Result<(), zlayer_scheduler::cluster::ClusterError> {
6116 unreachable!("not exercised by external-domain DNS tests")
6117 }
6118 async fn forward_scale(
6119 &self,
6120 _req: zlayer_scheduler::cluster::InternalScaleRequest,
6121 ) -> Result<(), zlayer_scheduler::cluster::ClusterError> {
6122 unreachable!("not exercised by external-domain DNS tests")
6123 }
6124 async fn place_container(
6125 &self,
6126 _spec: &zlayer_spec::ServiceSpec,
6127 ) -> Result<
6128 Option<zlayer_scheduler::cluster::ContainerPlacement>,
6129 zlayer_scheduler::cluster::ClusterError,
6130 > {
6131 unreachable!("not exercised by external-domain DNS tests")
6132 }
6133 }
6134
6135 /// A `ServiceSpec` whose single endpoint carries an external vhost domain.
6136 fn mock_spec_with_host(host: &str) -> ServiceSpec {
6137 serde_yaml::from_str::<zlayer_spec::DeploymentSpec>(&format!(
6138 r"
6139version: v1
6140deployment: test
6141services:
6142 web:
6143 rtype: service
6144 image:
6145 name: test:latest
6146 endpoints:
6147 - name: http
6148 protocol: http
6149 port: 8080
6150 host: {host}
6151 scale:
6152 mode: fixed
6153 replicas: 1
6154"
6155 ))
6156 .unwrap()
6157 .services
6158 .remove("web")
6159 .unwrap()
6160 }
6161
6162 #[tokio::test]
6163 async fn external_domain_registers_host_to_ingress_peer_ip() {
6164 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
6165 let dns = Arc::new(
6166 zlayer_overlay::DnsServer::new("127.0.0.1:0".parse().unwrap(), "zlayer.local").unwrap(),
6167 );
6168 let spec = mock_spec_with_host("app.example.com");
6169 let mut instance = ServiceInstance::new("web".to_string(), spec, runtime, None);
6170 instance.set_dns_server(Arc::clone(&dns));
6171 // THIS node is not the ingress; the cluster names a peer's overlay IP.
6172 instance.set_ingress_enabled(false);
6173 instance.set_cluster(Arc::new(IngressPeerCluster {
6174 ingress_overlay_ip: Some("10.200.0.7".to_string()),
6175 }));
6176
6177 instance.register_external_domains(&dns).await;
6178
6179 // The external domain resolves to the ingress peer's overlay IP.
6180 let handle = dns.handle();
6181 let ip = handle.lookup_a("app.example.com.").await;
6182 assert_eq!(ip, Some("10.200.0.7".parse().unwrap()));
6183 }
6184
6185 #[tokio::test]
6186 async fn external_domain_skips_when_no_ingress_node() {
6187 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
6188 let dns = Arc::new(
6189 zlayer_overlay::DnsServer::new("127.0.0.1:0".parse().unwrap(), "zlayer.local").unwrap(),
6190 );
6191 let spec = mock_spec_with_host("app.example.com");
6192 let mut instance = ServiceInstance::new("web".to_string(), spec, runtime, None);
6193 instance.set_dns_server(Arc::clone(&dns));
6194 // No ingress locally and the cluster has no ingress node → WARN + skip,
6195 // never error, and no record is registered.
6196 instance.set_ingress_enabled(false);
6197 instance.set_cluster(Arc::new(IngressPeerCluster {
6198 ingress_overlay_ip: None,
6199 }));
6200
6201 instance.register_external_domains(&dns).await;
6202
6203 let handle = dns.handle();
6204 assert_eq!(handle.lookup_a("app.example.com.").await, None);
6205 }
6206
6207 #[tokio::test]
6208 async fn external_domain_skips_wildcard_host_patterns() {
6209 let runtime: Arc<dyn Runtime + Send + Sync> = Arc::new(MockRuntime::new());
6210 let dns = Arc::new(
6211 zlayer_overlay::DnsServer::new("127.0.0.1:0".parse().unwrap(), "zlayer.local").unwrap(),
6212 );
6213 // A wildcard host is a routing matcher, not a resolvable name.
6214 let spec = mock_spec_with_host("'*.example.com'");
6215 let mut instance = ServiceInstance::new("web".to_string(), spec, runtime, None);
6216 instance.set_dns_server(Arc::clone(&dns));
6217 instance.set_ingress_enabled(false);
6218 instance.set_cluster(Arc::new(IngressPeerCluster {
6219 ingress_overlay_ip: Some("10.200.0.7".to_string()),
6220 }));
6221
6222 instance.register_external_domains(&dns).await;
6223
6224 let handle = dns.handle();
6225 // Wildcard skipped → nothing registered under the literal pattern.
6226 assert_eq!(handle.lookup_a("*.example.com.").await, None);
6227 }
6228
6229 // -----------------------------------------------------------------------
6230 // Digest persistence + recreate-from-local
6231 // -----------------------------------------------------------------------
6232
6233 /// A `Runtime` test double that records every `source` policy passed to
6234 /// `pull_image_with_policy` and lets a test simulate "image is / isn't
6235 /// present locally" by failing the `LocalOnly` probe. `list_images` reports
6236 /// a fixed digest so `pull_and_refresh_digest` has something to record.
6237 ///
6238 /// Container-lifecycle methods delegate to an inner `MockRuntime` so the
6239 /// double is usable beyond the pull path; only `pull_image_with_policy` and
6240 /// `list_images` carry the recording/branching behaviour under test.
6241 struct RecordingRuntime {
6242 inner: MockRuntime,
6243 /// Every `(image, source_policy)` seen by `pull_image_with_policy`.
6244 pulls: std::sync::Mutex<Vec<(String, zlayer_spec::SourcePolicy)>>,
6245 /// When `false`, a `LocalOnly` pull FAILS (simulating a local miss);
6246 /// any non-`LocalOnly` (remote-capable) pull always succeeds.
6247 local_present: bool,
6248 /// Digest reported by `list_images` for the `test:latest` reference.
6249 digest: Option<String>,
6250 }
6251
6252 impl std::fmt::Debug for RecordingRuntime {
6253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
6254 f.debug_struct("RecordingRuntime")
6255 .field("local_present", &self.local_present)
6256 .field("digest", &self.digest)
6257 .finish_non_exhaustive()
6258 }
6259 }
6260
6261 impl RecordingRuntime {
6262 fn new(local_present: bool, digest: Option<String>) -> Self {
6263 Self {
6264 inner: MockRuntime::new(),
6265 pulls: std::sync::Mutex::new(Vec::new()),
6266 local_present,
6267 digest,
6268 }
6269 }
6270
6271 fn recorded_pulls(&self) -> Vec<(String, zlayer_spec::SourcePolicy)> {
6272 self.pulls.lock().unwrap().clone()
6273 }
6274 }
6275
6276 #[async_trait::async_trait]
6277 impl Runtime for RecordingRuntime {
6278 async fn pull_image(&self, image: &str) -> Result<()> {
6279 self.pull_image_with_policy(
6280 image,
6281 PullPolicy::IfNotPresent,
6282 None,
6283 zlayer_spec::SourcePolicy::default(),
6284 )
6285 .await
6286 }
6287
6288 async fn pull_image_with_policy(
6289 &self,
6290 image: &str,
6291 _policy: PullPolicy,
6292 _auth: Option<&zlayer_spec::RegistryAuth>,
6293 source: zlayer_spec::SourcePolicy,
6294 ) -> Result<()> {
6295 self.pulls.lock().unwrap().push((image.to_string(), source));
6296 // A LocalOnly pull only succeeds when the image is present locally;
6297 // otherwise it errors cleanly (mirroring the registry chain's
6298 // local-miss behaviour). Remote-capable policies always succeed.
6299 if matches!(source, zlayer_spec::SourcePolicy::LocalOnly) && !self.local_present {
6300 return Err(AgentError::PullFailed {
6301 image: image.to_string(),
6302 reason: "source_policy=local_only: not present in any local source".to_string(),
6303 });
6304 }
6305 Ok(())
6306 }
6307
6308 async fn list_images(&self) -> Result<Vec<crate::runtime::ImageInfo>> {
6309 Ok(vec![crate::runtime::ImageInfo {
6310 reference: "test:latest".to_string(),
6311 digest: self.digest.clone(),
6312 size_bytes: None,
6313 }])
6314 }
6315
6316 async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
6317 self.inner.create_container(id, spec).await
6318 }
6319 async fn start_container(&self, id: &ContainerId) -> Result<()> {
6320 self.inner.start_container(id).await
6321 }
6322 async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
6323 self.inner.stop_container(id, timeout).await
6324 }
6325 async fn remove_container(&self, id: &ContainerId) -> Result<()> {
6326 self.inner.remove_container(id).await
6327 }
6328 async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
6329 self.inner.container_state(id).await
6330 }
6331 async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
6332 self.inner.container_logs(id, tail).await
6333 }
6334 async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
6335 self.inner.exec(id, cmd).await
6336 }
6337 async fn get_container_stats(
6338 &self,
6339 id: &ContainerId,
6340 ) -> Result<crate::cgroups_stats::ContainerStats> {
6341 self.inner.get_container_stats(id).await
6342 }
6343 async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
6344 self.inner.wait_container(id).await
6345 }
6346 async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
6347 self.inner.get_logs(id).await
6348 }
6349 async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
6350 self.inner.get_container_pid(id).await
6351 }
6352 async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
6353 self.inner.get_container_ip(id).await
6354 }
6355 }
6356
6357 /// A recording [`crate::auth::DeploymentDigestSink`] that captures every
6358 /// `(deployment, service, digest)` tuple persisted after a pull.
6359 #[derive(Debug, Default)]
6360 struct RecordingDigestSink {
6361 records: std::sync::Mutex<Vec<(String, String, String)>>,
6362 }
6363
6364 impl RecordingDigestSink {
6365 fn records(&self) -> Vec<(String, String, String)> {
6366 self.records.lock().unwrap().clone()
6367 }
6368 }
6369
6370 #[async_trait::async_trait]
6371 impl crate::auth::DeploymentDigestSink for RecordingDigestSink {
6372 async fn record(&self, deployment: &str, service: &str, digest: &str) {
6373 self.records.lock().unwrap().push((
6374 deployment.to_string(),
6375 service.to_string(),
6376 digest.to_string(),
6377 ));
6378 }
6379 }
6380
6381 /// (3a) A successful pull records the resolved digest into the deployment
6382 /// store via the digest sink, keyed by `(deployment, service)`.
6383 #[tokio::test]
6384 async fn successful_pull_records_digest_into_sink() {
6385 let runtime = Arc::new(RecordingRuntime::new(
6386 true,
6387 Some("sha256:deadbeef".to_string()),
6388 ));
6389 let runtime_dyn: Arc<dyn Runtime + Send + Sync> = runtime.clone();
6390
6391 // The owning deployment name must be stamped on the per-service spec for
6392 // the sink to receive it (the deploy/restore paths stamp this; the
6393 // top-level DeploymentSpec `deployment` is not copied onto each service).
6394 let mut spec = mock_spec();
6395 spec.deployment = Some("test".to_string());
6396 let mut instance = ServiceInstance::new("test".to_string(), spec, runtime_dyn, None);
6397
6398 let sink = Arc::new(RecordingDigestSink::default());
6399 instance.set_digest_sink(Some(sink.clone()));
6400
6401 let digest = instance.pull_and_refresh_digest().await.unwrap();
6402 assert_eq!(digest.as_deref(), Some("sha256:deadbeef"));
6403
6404 let records = sink.records();
6405 assert_eq!(
6406 records,
6407 vec![(
6408 "test".to_string(),
6409 "test".to_string(),
6410 "sha256:deadbeef".to_string()
6411 )],
6412 "a successful pull must persist the resolved digest via the sink"
6413 );
6414 }
6415
6416 /// (3b) When a service carries a restore pin AND its image is present
6417 /// locally, the pull resolves STRICTLY locally (`SourcePolicy::LocalOnly`)
6418 /// and NEVER falls through to a remote/S3-capable policy.
6419 #[tokio::test]
6420 async fn restore_with_local_image_resolves_locally_only() {
6421 // local_present = true: the LocalOnly probe succeeds.
6422 let runtime = Arc::new(RecordingRuntime::new(
6423 true,
6424 Some("sha256:pinned".to_string()),
6425 ));
6426 let runtime_dyn: Arc<dyn Runtime + Send + Sync> = runtime.clone();
6427
6428 let spec = mock_spec();
6429 let instance = ServiceInstance::new("test".to_string(), spec, runtime_dyn, None);
6430
6431 // Pin the digest as the restore path does.
6432 instance
6433 .set_restore_pin(Some("sha256:pinned".to_string()))
6434 .await;
6435
6436 instance.pull_and_refresh_digest().await.unwrap();
6437
6438 let pulls = runtime.recorded_pulls();
6439 assert_eq!(
6440 pulls.len(),
6441 1,
6442 "a locally-present pinned image must be pulled exactly once"
6443 );
6444 assert!(
6445 matches!(pulls[0].1, zlayer_spec::SourcePolicy::LocalOnly),
6446 "restore-from-local must resolve via LocalOnly, got {:?}",
6447 pulls[0].1
6448 );
6449 assert!(
6450 !pulls
6451 .iter()
6452 .any(|(_, src)| !matches!(src, zlayer_spec::SourcePolicy::LocalOnly)),
6453 "no remote/S3-capable pull may occur when the image is local: {pulls:?}"
6454 );
6455 }
6456
6457 /// Restore counterpart: when the pinned image is NOT present locally, the
6458 /// pull falls back to the spec's (remote-capable) source policy so a
6459 /// genuinely-absent image still resolves normally.
6460 #[tokio::test]
6461 async fn restore_with_missing_local_image_falls_back_to_remote() {
6462 // local_present = false: the LocalOnly probe fails (local miss).
6463 let runtime = Arc::new(RecordingRuntime::new(
6464 false,
6465 Some("sha256:fresh".to_string()),
6466 ));
6467 let runtime_dyn: Arc<dyn Runtime + Send + Sync> = runtime.clone();
6468
6469 let spec = mock_spec();
6470 let instance = ServiceInstance::new("test".to_string(), spec, runtime_dyn, None);
6471 instance
6472 .set_restore_pin(Some("sha256:pinned".to_string()))
6473 .await;
6474
6475 instance.pull_and_refresh_digest().await.unwrap();
6476
6477 let pulls = runtime.recorded_pulls();
6478 assert_eq!(
6479 pulls.len(),
6480 2,
6481 "a local miss must try LocalOnly then fall back: {pulls:?}"
6482 );
6483 assert!(
6484 matches!(pulls[0].1, zlayer_spec::SourcePolicy::LocalOnly),
6485 "the first probe must be LocalOnly"
6486 );
6487 // mock_spec sets no source_policy → default (LocalFirst), which is
6488 // remote-capable.
6489 assert!(
6490 !matches!(pulls[1].1, zlayer_spec::SourcePolicy::LocalOnly),
6491 "the fallback must use the remote-capable spec policy, got {:?}",
6492 pulls[1].1
6493 );
6494 }
6495}