Skip to main content

zlayer_agent/
stabilization.rs

//! Deployment stabilization polling.
//!
//! Provides a reusable function that waits for every service in a deployment
//! to reach its desired replica count and pass health checks, or times out.
//!
//! This module lives in `zlayer-agent` (a library crate) so that both the
//! runtime binary and the API server can share the same stabilization logic
//! instead of duplicating it.
9
10use std::time::{Duration, Instant};
11
12use crate::service::ServiceManager;
13use zlayer_spec::{DeploymentSpec, Protocol, ScaleSpec};
14
15/// Per-service health summary returned by stabilization polling.
16#[derive(Debug, Clone, serde::Serialize)]
17pub struct ServiceHealthSummary {
18    /// Service name
19    pub name: String,
20    /// Running replica count
21    pub running: u32,
22    /// Desired replica count from the spec
23    pub desired: u32,
24    /// Whether health checks are passing for all running replicas
25    pub healthy: bool,
26    /// Endpoint URLs for this service (e.g. "<http://localhost:8080>")
27    pub endpoints: Vec<String>,
28}
29
/// How a stabilization wait ended.
///
/// Kept independent of `DeploymentStatus` (defined in `zlayer-api`) so this
/// crate does not depend on the API crate and no dependency cycle forms;
/// callers translate it into their own status type.
#[derive(Clone, Debug)]
pub enum StabilizationOutcome {
    /// Every service reached its desired state before the deadline.
    Ready,
    /// The deadline passed while at least one service was still unready.
    TimedOut {
        /// Human-readable report naming the services that were not ready.
        message: String,
    },
}
45
46/// Result of waiting for a deployment to stabilize.
47#[derive(Debug, Clone)]
48pub struct StabilizationResult {
49    /// Whether stabilization succeeded or timed out
50    pub outcome: StabilizationOutcome,
51    /// Per-service health summaries (always populated regardless of outcome)
52    pub services: Vec<ServiceHealthSummary>,
53}
54
55/// Wait for all services in a deployment to reach their desired replica count
56/// and pass health checks, or time out.
57///
58/// Polls every 500ms for up to `timeout`. Returns [`StabilizationOutcome::Ready`]
59/// if all services reach their desired state, or [`StabilizationOutcome::TimedOut`]
60/// if the timeout expires.
61pub async fn wait_for_stabilization(
62    manager: &ServiceManager,
63    spec: &DeploymentSpec,
64    timeout: Duration,
65) -> StabilizationResult {
66    let poll_interval = Duration::from_millis(500);
67    let start = Instant::now();
68
69    loop {
70        let mut all_ready = true;
71        let mut summaries = Vec::with_capacity(spec.services.len());
72
73        for (name, service_spec) in &spec.services {
74            let desired = match &service_spec.scale {
75                ScaleSpec::Fixed { replicas } => *replicas,
76                ScaleSpec::Adaptive { min, .. } => *min,
77                ScaleSpec::Manual => 0,
78            };
79
80            // Cluster-wide: sum running replicas + health across every node the
81            // service is placed on (distributed scaling places replicas on
82            // remote nodes; a leader-local count would read 0 and never
83            // stabilize). Falls back to the local view when not clustered.
84            let node_states = manager.cluster_service_states(name).await;
85            #[allow(clippy::cast_possible_truncation)]
86            let running: u32 = node_states.iter().map(|s| s.running).sum();
87            let healthy = if desired == 0 {
88                true // Manual scaling / 0 replicas is trivially healthy.
89            } else {
90                // Every node with replicas must report healthy; nodes running
91                // none are trivially healthy and don't drag the aggregate.
92                node_states.iter().all(|s| s.healthy)
93            };
94
95            let service_ready = running == desired && healthy;
96            if !service_ready && desired > 0 {
97                all_ready = false;
98            }
99
100            // Build endpoint URLs from the spec
101            let endpoints: Vec<String> = service_spec
102                .endpoints
103                .iter()
104                .map(|ep| {
105                    let proto = match ep.protocol {
106                        Protocol::Http => "http",
107                        Protocol::Https => "https",
108                        Protocol::Tcp => "tcp",
109                        Protocol::Udp => "udp",
110                        Protocol::Websocket => "ws",
111                    };
112                    format!("{}://localhost:{}", proto, ep.port)
113                })
114                .collect();
115
116            summaries.push(ServiceHealthSummary {
117                name: name.clone(),
118                running,
119                desired,
120                healthy,
121                endpoints,
122            });
123        }
124
125        if all_ready {
126            return StabilizationResult {
127                outcome: StabilizationOutcome::Ready,
128                services: summaries,
129            };
130        }
131
132        if start.elapsed() >= timeout {
133            // Build a failure message from unhealthy services, including
134            // the tail of each failing service's container logs so the
135            // user sees the real cause (e.g. "GLIBC_2.38 not found",
136            // "failed to prepare rootfs", "panicked at ...") instead of
137            // just "1/1 replicas, healthy=false".
138            let failing: Vec<&ServiceHealthSummary> = summaries
139                .iter()
140                .filter(|s| (s.running != s.desired || !s.healthy) && s.desired > 0)
141                .collect();
142
143            let mut parts: Vec<String> = Vec::with_capacity(failing.len());
144            for s in &failing {
145                let header = format!(
146                    "{}: {}/{} replicas, healthy={}",
147                    s.name, s.running, s.desired, s.healthy
148                );
149                // Try to fetch the last 20 log lines from this service's
150                // replicas. A miss (no containers, runtime error) falls
151                // back to just the header so we never block the error
152                // on log retrieval.
153                match manager.get_service_logs(&s.name, 20, None).await {
154                    Ok(entries) if !entries.is_empty() => {
155                        let body = entries
156                            .iter()
157                            .map(|e| format!("    {}", e.message))
158                            .collect::<Vec<_>>()
159                            .join("\n");
160                        parts.push(format!("{header}\n  logs:\n{body}"));
161                    }
162                    _ => parts.push(header),
163                }
164            }
165
166            let message = if parts.is_empty() {
167                "Stabilization timed out".to_string()
168            } else {
169                format!("Stabilization timed out:\n  {}", parts.join("\n  "))
170            };
171
172            return StabilizationResult {
173                outcome: StabilizationOutcome::TimedOut { message },
174                services: summaries,
175            };
176        }
177
178        tokio::time::sleep(poll_interval).await;
179    }
180}