zlayer_agent/stabilization.rs
1//! Deployment stabilization polling.
2//!
3//! Provides a reusable function that waits for all services in a deployment to
4//! reach their desired replica count and pass health checks, or time out.
5//!
6//! This module lives in `zlayer-agent` (a library crate) so that both the
7//! runtime binary and the API server can share the same stabilization logic
8//! instead of duplicating it.
9
10use std::time::{Duration, Instant};
11
12use crate::service::ServiceManager;
13use zlayer_spec::{DeploymentSpec, Protocol, ScaleSpec};
14
15/// Per-service health summary returned by stabilization polling.
16#[derive(Debug, Clone, serde::Serialize)]
17pub struct ServiceHealthSummary {
18 /// Service name
19 pub name: String,
20 /// Running replica count
21 pub running: u32,
22 /// Desired replica count from the spec
23 pub desired: u32,
24 /// Whether health checks are passing for all running replicas
25 pub healthy: bool,
26 /// Endpoint URLs for this service (e.g. "<http://localhost:8080>")
27 pub endpoints: Vec<String>,
28}
29
/// Final verdict of [`wait_for_stabilization`].
///
/// Deliberately independent of `DeploymentStatus` (defined in `zlayer-api`)
/// so that this crate does not create a circular dependency on the API crate;
/// callers translate this value into their own status types.
#[derive(Clone, Debug)]
pub enum StabilizationOutcome {
    /// Every service reached its desired state before the deadline.
    Ready,
    /// The deadline passed while at least one service was still not ready.
    TimedOut {
        /// Human-readable explanation naming the services that were not ready.
        message: String,
    },
}
44}
45
46/// Result of waiting for a deployment to stabilize.
47#[derive(Debug, Clone)]
48pub struct StabilizationResult {
49 /// Whether stabilization succeeded or timed out
50 pub outcome: StabilizationOutcome,
51 /// Per-service health summaries (always populated regardless of outcome)
52 pub services: Vec<ServiceHealthSummary>,
53}
54
55/// Wait for all services in a deployment to reach their desired replica count
56/// and pass health checks, or time out.
57///
58/// Polls every 500ms for up to `timeout`. Returns [`StabilizationOutcome::Ready`]
59/// if all services reach their desired state, or [`StabilizationOutcome::TimedOut`]
60/// if the timeout expires.
61pub async fn wait_for_stabilization(
62 manager: &ServiceManager,
63 spec: &DeploymentSpec,
64 timeout: Duration,
65) -> StabilizationResult {
66 let poll_interval = Duration::from_millis(500);
67 let start = Instant::now();
68
69 loop {
70 let mut all_ready = true;
71 let mut summaries = Vec::with_capacity(spec.services.len());
72
73 for (name, service_spec) in &spec.services {
74 let desired = match &service_spec.scale {
75 ScaleSpec::Fixed { replicas } => *replicas,
76 ScaleSpec::Adaptive { min, .. } => *min,
77 ScaleSpec::Manual => 0,
78 };
79
80 // Cluster-wide: sum running replicas + health across every node the
81 // service is placed on (distributed scaling places replicas on
82 // remote nodes; a leader-local count would read 0 and never
83 // stabilize). Falls back to the local view when not clustered.
84 let node_states = manager.cluster_service_states(name).await;
85 #[allow(clippy::cast_possible_truncation)]
86 let running: u32 = node_states.iter().map(|s| s.running).sum();
87 let healthy = if desired == 0 {
88 true // Manual scaling / 0 replicas is trivially healthy.
89 } else {
90 // Every node with replicas must report healthy; nodes running
91 // none are trivially healthy and don't drag the aggregate.
92 node_states.iter().all(|s| s.healthy)
93 };
94
95 let service_ready = running == desired && healthy;
96 if !service_ready && desired > 0 {
97 all_ready = false;
98 }
99
100 // Build endpoint URLs from the spec
101 let endpoints: Vec<String> = service_spec
102 .endpoints
103 .iter()
104 .map(|ep| {
105 let proto = match ep.protocol {
106 Protocol::Http => "http",
107 Protocol::Https => "https",
108 Protocol::Tcp => "tcp",
109 Protocol::Udp => "udp",
110 Protocol::Websocket => "ws",
111 };
112 format!("{}://localhost:{}", proto, ep.port)
113 })
114 .collect();
115
116 summaries.push(ServiceHealthSummary {
117 name: name.clone(),
118 running,
119 desired,
120 healthy,
121 endpoints,
122 });
123 }
124
125 if all_ready {
126 return StabilizationResult {
127 outcome: StabilizationOutcome::Ready,
128 services: summaries,
129 };
130 }
131
132 if start.elapsed() >= timeout {
133 // Build a failure message from unhealthy services, including
134 // the tail of each failing service's container logs so the
135 // user sees the real cause (e.g. "GLIBC_2.38 not found",
136 // "failed to prepare rootfs", "panicked at ...") instead of
137 // just "1/1 replicas, healthy=false".
138 let failing: Vec<&ServiceHealthSummary> = summaries
139 .iter()
140 .filter(|s| (s.running != s.desired || !s.healthy) && s.desired > 0)
141 .collect();
142
143 let mut parts: Vec<String> = Vec::with_capacity(failing.len());
144 for s in &failing {
145 let header = format!(
146 "{}: {}/{} replicas, healthy={}",
147 s.name, s.running, s.desired, s.healthy
148 );
149 // Try to fetch the last 20 log lines from this service's
150 // replicas. A miss (no containers, runtime error) falls
151 // back to just the header so we never block the error
152 // on log retrieval.
153 match manager.get_service_logs(&s.name, 20, None).await {
154 Ok(entries) if !entries.is_empty() => {
155 let body = entries
156 .iter()
157 .map(|e| format!(" {}", e.message))
158 .collect::<Vec<_>>()
159 .join("\n");
160 parts.push(format!("{header}\n logs:\n{body}"));
161 }
162 _ => parts.push(header),
163 }
164 }
165
166 let message = if parts.is_empty() {
167 "Stabilization timed out".to_string()
168 } else {
169 format!("Stabilization timed out:\n {}", parts.join("\n "))
170 };
171
172 return StabilizationResult {
173 outcome: StabilizationOutcome::TimedOut { message },
174 services: summaries,
175 };
176 }
177
178 tokio::time::sleep(poll_interval).await;
179 }
180}