// zlayer_agent/stabilization.rs

//! Deployment stabilization polling.
//!
//! Provides a reusable function that waits for all services in a deployment to
//! reach their desired replica count and pass health checks, or time out.
//!
//! This module lives in `zlayer-agent` (a library crate) so that both the
//! runtime binary and the API server can share the same stabilization logic
//! instead of duplicating it.

use std::time::{Duration, Instant};

use crate::health::HealthState;
use crate::service::ServiceManager;
use zlayer_spec::{DeploymentSpec, Protocol, ScaleSpec};

16/// Per-service health summary returned by stabilization polling.
17#[derive(Debug, Clone, serde::Serialize)]
18pub struct ServiceHealthSummary {
19    /// Service name
20    pub name: String,
21    /// Running replica count
22    pub running: u32,
23    /// Desired replica count from the spec
24    pub desired: u32,
25    /// Whether health checks are passing for all running replicas
26    pub healthy: bool,
27    /// Endpoint URLs for this service (e.g. "<http://localhost:8080>")
28    pub endpoints: Vec<String>,
29}
30
/// How the stabilization wait concluded.
///
/// Deliberately independent of `DeploymentStatus` (which lives in
/// `zlayer-api`) so this crate carries no circular dependency; callers
/// translate this into their own status representation.
#[derive(Debug, Clone)]
pub enum StabilizationOutcome {
    /// Every service hit its desired state before the deadline.
    Ready,
    /// The deadline passed with at least one service still unready.
    TimedOut {
        /// Human-readable summary of the services that never became ready.
        message: String,
    },
}
46
47/// Result of waiting for a deployment to stabilize.
48#[derive(Debug, Clone)]
49pub struct StabilizationResult {
50    /// Whether stabilization succeeded or timed out
51    pub outcome: StabilizationOutcome,
52    /// Per-service health summaries (always populated regardless of outcome)
53    pub services: Vec<ServiceHealthSummary>,
54}
55
56/// Wait for all services in a deployment to reach their desired replica count
57/// and pass health checks, or time out.
58///
59/// Polls every 500ms for up to `timeout`. Returns [`StabilizationOutcome::Ready`]
60/// if all services reach their desired state, or [`StabilizationOutcome::TimedOut`]
61/// if the timeout expires.
62pub async fn wait_for_stabilization(
63    manager: &ServiceManager,
64    spec: &DeploymentSpec,
65    timeout: Duration,
66) -> StabilizationResult {
67    let poll_interval = Duration::from_millis(500);
68    let start = Instant::now();
69
70    loop {
71        let mut all_ready = true;
72        let mut summaries = Vec::with_capacity(spec.services.len());
73
74        for (name, service_spec) in &spec.services {
75            let desired = match &service_spec.scale {
76                ScaleSpec::Fixed { replicas } => *replicas,
77                ScaleSpec::Adaptive { min, .. } => *min,
78                ScaleSpec::Manual => 0,
79            };
80
81            #[allow(clippy::cast_possible_truncation)]
82            let running = match manager.service_replica_count(name).await {
83                Ok(count) => count as u32,
84                Err(_) => 0,
85            };
86
87            // Check health states from the manager
88            let health_states = manager.health_states();
89            let states = health_states.read().await;
90            let healthy = match states.get(name) {
91                Some(HealthState::Healthy) => true,
92                _ if desired == 0 => true, // Manual scaling / 0 replicas is trivially healthy
93                _ => false,
94            };
95            drop(states);
96
97            let service_ready = running == desired && healthy;
98            if !service_ready && desired > 0 {
99                all_ready = false;
100            }
101
102            // Build endpoint URLs from the spec
103            let endpoints: Vec<String> = service_spec
104                .endpoints
105                .iter()
106                .map(|ep| {
107                    let proto = match ep.protocol {
108                        Protocol::Http => "http",
109                        Protocol::Https => "https",
110                        Protocol::Tcp => "tcp",
111                        Protocol::Udp => "udp",
112                        Protocol::Websocket => "ws",
113                    };
114                    format!("{}://localhost:{}", proto, ep.port)
115                })
116                .collect();
117
118            summaries.push(ServiceHealthSummary {
119                name: name.clone(),
120                running,
121                desired,
122                healthy,
123                endpoints,
124            });
125        }
126
127        if all_ready {
128            return StabilizationResult {
129                outcome: StabilizationOutcome::Ready,
130                services: summaries,
131            };
132        }
133
134        if start.elapsed() >= timeout {
135            // Build a failure message from unhealthy services, including
136            // the tail of each failing service's container logs so the
137            // user sees the real cause (e.g. "GLIBC_2.38 not found",
138            // "failed to prepare rootfs", "panicked at ...") instead of
139            // just "1/1 replicas, healthy=false".
140            let failing: Vec<&ServiceHealthSummary> = summaries
141                .iter()
142                .filter(|s| (s.running != s.desired || !s.healthy) && s.desired > 0)
143                .collect();
144
145            let mut parts: Vec<String> = Vec::with_capacity(failing.len());
146            for s in &failing {
147                let header = format!(
148                    "{}: {}/{} replicas, healthy={}",
149                    s.name, s.running, s.desired, s.healthy
150                );
151                // Try to fetch the last 20 log lines from this service's
152                // replicas. A miss (no containers, runtime error) falls
153                // back to just the header so we never block the error
154                // on log retrieval.
155                match manager.get_service_logs(&s.name, 20, None).await {
156                    Ok(entries) if !entries.is_empty() => {
157                        let body = entries
158                            .iter()
159                            .map(|e| format!("    {}", e.message))
160                            .collect::<Vec<_>>()
161                            .join("\n");
162                        parts.push(format!("{header}\n  logs:\n{body}"));
163                    }
164                    _ => parts.push(header),
165                }
166            }
167
168            let message = if parts.is_empty() {
169                "Stabilization timed out".to_string()
170            } else {
171                format!("Stabilization timed out:\n  {}", parts.join("\n  "))
172            };
173
174            return StabilizationResult {
175                outcome: StabilizationOutcome::TimedOut { message },
176                services: summaries,
177            };
178        }
179
180        tokio::time::sleep(poll_interval).await;
181    }
182}