use std::time::{Duration, Instant};
use crate::health::HealthState;
use crate::service::ServiceManager;
use zlayer_spec::{DeploymentSpec, Protocol, ScaleSpec};
/// Snapshot of one service's replica and health status taken during a
/// stabilization poll.
#[derive(Clone, Debug, serde::Serialize)]
pub struct ServiceHealthSummary {
    /// Name of the service, as keyed in the deployment spec.
    pub name: String,
    /// Number of replicas reported as running when the snapshot was taken.
    pub running: u32,
    /// Number of replicas the service is expected to run.
    pub desired: u32,
    /// Whether the service was considered healthy when the snapshot was taken.
    pub healthy: bool,
    /// Endpoint URLs rendered for display, one per declared endpoint.
    pub endpoints: Vec<String>,
}
/// Final verdict of a stabilization wait.
#[derive(Clone, Debug)]
pub enum StabilizationOutcome {
    /// Every service that was being waited on became ready.
    Ready,
    /// The wait gave up before every service became ready.
    TimedOut {
        /// Human-readable description of what was still failing.
        message: String,
    },
}
/// Result of a stabilization wait: the overall verdict together with the
/// per-service snapshots it was based on.
#[derive(Clone, Debug)]
pub struct StabilizationResult {
    /// Whether the wait ended in readiness or in a timeout.
    pub outcome: StabilizationOutcome,
    /// One summary per service, taken during the final poll.
    pub services: Vec<ServiceHealthSummary>,
}
/// Polls the deployment's services until all of them are ready or `timeout`
/// elapses.
///
/// On every poll each service in `spec` is summarised as follows:
///
/// * `desired` is the fixed replica count, the adaptive minimum, or `0` for
///   manually scaled services.
/// * `running` is the replica count reported by `manager`; a failed lookup is
///   counted as `0` so polling simply continues.
/// * `healthy` is `true` when the recorded health state is
///   [`HealthState::Healthy`], or when `desired` is `0`.
///
/// A service with `desired > 0` blocks stabilization until it is healthy and
/// its replica count is acceptable: exactly `desired` for fixed scaling, and
/// at least `desired` for adaptive scaling (where `desired` is only the lower
/// bound of the scale range). Services with `desired == 0` never block.
///
/// # Returns
///
/// A [`StabilizationResult`] carrying the summaries from the final poll and
/// either [`StabilizationOutcome::Ready`] or
/// [`StabilizationOutcome::TimedOut`]. The timeout message lists every service
/// that was still blocking, with its service logs appended when available.
/// At least one poll is always performed, even when `timeout` is zero.
pub async fn wait_for_stabilization(
    manager: &ServiceManager,
    spec: &DeploymentSpec,
    timeout: Duration,
) -> StabilizationResult {
    const POLL_INTERVAL: Duration = Duration::from_millis(500);
    let start = Instant::now();

    loop {
        let mut summaries = Vec::with_capacity(spec.services.len());
        // Indices into `summaries` of the services that still block readiness.
        let mut blocking: Vec<usize> = Vec::new();

        for (name, service_spec) in &spec.services {
            // `surplus_allowed` marks scale modes where `desired` is only a
            // lower bound rather than an exact target.
            let (desired, surplus_allowed) = match &service_spec.scale {
                ScaleSpec::Fixed { replicas } => (*replicas, false),
                ScaleSpec::Adaptive { min, .. } => (*min, true),
                ScaleSpec::Manual => (0, false),
            };

            // Best effort: a failed lookup is reported as zero replicas.
            // Counts that do not fit in `u32` saturate instead of wrapping.
            let running = manager
                .service_replica_count(name)
                .await
                .map_or(0, |count| u32::try_from(count).unwrap_or(u32::MAX));

            let health_states = manager.health_states();
            let states = health_states.read().await;
            // With nothing desired there is nothing to health-check, so a
            // missing or non-healthy state is not held against the service.
            let healthy = desired == 0 || matches!(states.get(name), Some(HealthState::Healthy));
            // Release the read guard before the next `.await`.
            drop(states);

            let replicas_ok = if surplus_allowed {
                running >= desired
            } else {
                running == desired
            };
            if desired > 0 && !(replicas_ok && healthy) {
                blocking.push(summaries.len());
            }

            let endpoints: Vec<String> = service_spec
                .endpoints
                .iter()
                .map(|ep| {
                    let scheme = match ep.protocol {
                        Protocol::Http => "http",
                        Protocol::Https => "https",
                        Protocol::Tcp => "tcp",
                        Protocol::Udp => "udp",
                        Protocol::Websocket => "ws",
                    };
                    format!("{scheme}://localhost:{}", ep.port)
                })
                .collect();

            summaries.push(ServiceHealthSummary {
                name: name.clone(),
                running,
                desired,
                healthy,
                endpoints,
            });
        }

        if blocking.is_empty() {
            return StabilizationResult {
                outcome: StabilizationOutcome::Ready,
                services: summaries,
            };
        }

        let elapsed = start.elapsed();
        if elapsed >= timeout {
            let failing: Vec<&ServiceHealthSummary> = blocking
                .iter()
                .filter_map(|&index| summaries.get(index))
                .collect();
            let message = build_timeout_message(manager, &failing).await;
            return StabilizationResult {
                outcome: StabilizationOutcome::TimedOut { message },
                services: summaries,
            };
        }

        // Never sleep past the deadline, so a timeout shorter than the poll
        // interval is still honoured and the last poll lands on the deadline.
        let remaining = timeout.saturating_sub(elapsed);
        tokio::time::sleep(POLL_INTERVAL.min(remaining)).await;
    }
}

/// Renders the timeout message for the services in `failing`, appending each
/// service's logs when `manager` returns any.
async fn build_timeout_message(
    manager: &ServiceManager,
    failing: &[&ServiceHealthSummary],
) -> String {
    let mut parts: Vec<String> = Vec::with_capacity(failing.len());
    for s in failing {
        let header = format!(
            "{}: {}/{} replicas, healthy={}",
            s.name, s.running, s.desired, s.healthy
        );
        // Logs are optional context: on error or empty output only the
        // status header is reported.
        match manager.get_service_logs(&s.name, 20, None).await {
            Ok(entries) if !entries.is_empty() => {
                let body = entries
                    .iter()
                    .map(|e| format!(" {}", e.message))
                    .collect::<Vec<_>>()
                    .join("\n");
                parts.push(format!("{header}\n logs:\n{body}"));
            }
            _ => parts.push(header),
        }
    }

    if parts.is_empty() {
        "Stabilization timed out".to_string()
    } else {
        format!("Stabilization timed out:\n {}", parts.join("\n "))
    }
}