use crate::error::{AgentError, Result};
use crate::health::HealthState;
use crate::runtime::{ContainerId, ContainerState, Runtime};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use zlayer_proxy::ServiceRegistry;
use zlayer_spec::{DependencyCondition, DependsSpec, ServiceSpec, TimeoutAction};
/// Validation failures that can be reported while building a [`DependencyGraph`].
#[derive(Debug, Clone)]
pub enum DependencyError {
    /// The declared dependencies form a loop; `cycle` lists the services along it.
    CyclicDependency { cycle: Vec<String> },
    /// `service` depends on `missing`, which is not among the known services.
    MissingService { service: String, missing: String },
    /// `service` lists itself as one of its own dependencies.
    SelfDependency { service: String },
}

impl std::fmt::Display for DependencyError {
    /// Renders a human-readable, single-line description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::SelfDependency { service } => {
                write!(f, "Service '{service}' has a self-dependency")
            }
            Self::MissingService { service, missing } => {
                write!(
                    f,
                    "Service '{service}' depends on non-existent service '{missing}'"
                )
            }
            Self::CyclicDependency { cycle } => {
                // Services are rendered in path order, separated by arrows.
                let path = cycle.join(" -> ");
                write!(f, "Cyclic dependency detected: {}", path)
            }
        }
    }
}

impl std::error::Error for DependencyError {}
impl From<DependencyError> for AgentError {
fn from(err: DependencyError) -> Self {
AgentError::InvalidSpec(err.to_string())
}
}
/// A single service in the dependency graph together with its declared dependencies.
#[derive(Debug, Clone)]
pub struct DependencyNode {
/// Name of the service this node represents (the key it was registered under).
pub service_name: String,
/// Dependency declarations copied from the service's spec.
pub depends_on: Vec<DependsSpec>,
}
/// Validated service dependency graph with a precomputed startup order.
#[derive(Debug)]
pub struct DependencyGraph {
/// Every known service, keyed by service name.
nodes: HashMap<String, DependencyNode>,
/// Topological order computed when the graph is built: dependencies come before dependents.
startup_order: Vec<String>,
/// Service name -> names of the services it depends on.
adjacency: HashMap<String, Vec<String>>,
/// Service name -> names of the services that depend on it.
reverse_adjacency: HashMap<String, Vec<String>>,
}
impl DependencyGraph {
/// Builds and validates a dependency graph from the given services.
///
/// Each service becomes a node; each distinct `depends` target becomes one edge.
/// A service may list the same target several times (for example with different
/// conditions): all declarations are kept on the node, but only a single edge is
/// recorded so that [`DependencyGraph::dependents`] never reports duplicates.
///
/// # Errors
///
/// Returns an error (converted from [`DependencyError`]) when a service depends on
/// itself, depends on a service that is not in `services`, or when the
/// dependencies form a cycle.
pub fn build(services: &HashMap<String, ServiceSpec>) -> Result<Self> {
    let mut nodes = HashMap::with_capacity(services.len());
    let mut adjacency: HashMap<String, Vec<String>> = HashMap::with_capacity(services.len());
    let mut reverse_adjacency: HashMap<String, Vec<String>> =
        HashMap::with_capacity(services.len());

    // First pass: register every service so edge insertion below can rely on
    // both maps containing an entry for every known name.
    for (name, spec) in services {
        nodes.insert(
            name.clone(),
            DependencyNode {
                service_name: name.clone(),
                depends_on: spec.depends.clone(),
            },
        );
        adjacency.insert(name.clone(), Vec::new());
        reverse_adjacency.insert(name.clone(), Vec::new());
    }

    // Second pass: validate each declaration and record the edges.
    for (name, spec) in services {
        for dep in &spec.depends {
            if dep.service == *name {
                return Err(DependencyError::SelfDependency {
                    service: name.clone(),
                }
                .into());
            }
            if !services.contains_key(&dep.service) {
                return Err(DependencyError::MissingService {
                    service: name.clone(),
                    missing: dep.service.clone(),
                }
                .into());
            }

            let outgoing = adjacency
                .get_mut(name)
                .expect("every service is registered in the first pass");
            if outgoing.contains(&dep.service) {
                // Repeated declaration of the same target: the edge already exists.
                continue;
            }
            outgoing.push(dep.service.clone());
            reverse_adjacency
                .get_mut(&dep.service)
                .expect("dependency target was verified to be a known service")
                .push(name.clone());
        }
    }

    let mut graph = Self {
        nodes,
        startup_order: Vec::new(),
        adjacency,
        reverse_adjacency,
    };
    if let Some(cycle) = graph.detect_cycle() {
        return Err(DependencyError::CyclicDependency { cycle }.into());
    }
    graph.startup_order = graph.topological_sort()?;
    Ok(graph)
}
/// Searches the graph for a dependency cycle.
///
/// Runs a depth-first search from every node that has not been visited yet and
/// returns the first cycle found as a list of service names, or `None` when the
/// graph is acyclic.
#[must_use]
pub fn detect_cycle(&self) -> Option<Vec<String>> {
    // Colour 0 marks a node that has not been visited yet.
    let mut color: HashMap<&String, u8> = self.nodes.keys().map(|name| (name, 0)).collect();
    let mut parent: HashMap<&String, Option<&String>> =
        self.nodes.keys().map(|name| (name, None)).collect();

    for start in self.nodes.keys() {
        if color[start] != 0 {
            continue;
        }
        let found = self.dfs_cycle_detect(start, &mut color, &mut parent);
        if found.is_some() {
            return found;
        }
    }
    None
}
/// Depth-first search step used by cycle detection.
///
/// `color` tracks visit state per node (0 = unvisited, 1 = currently on the DFS
/// path, 2 = fully explored) and `parent` records the node from which each node
/// was first reached. When an edge leads back to a node that is still on the
/// current path, the cycle is rebuilt by walking `parent` links and returned with
/// the repeated node at both ends.
fn dfs_cycle_detect<'a>(
    &'a self,
    node: &'a String,
    color: &mut HashMap<&'a String, u8>,
    parent: &mut HashMap<&'a String, Option<&'a String>>,
) -> Option<Vec<String>> {
    color.insert(node, 1);

    for dep in self.adjacency.get(node).into_iter().flatten() {
        match color.get(dep).copied() {
            Some(0) => {
                parent.insert(dep, Some(node));
                let found = self.dfs_cycle_detect(dep, color, parent);
                if found.is_some() {
                    return found;
                }
            }
            Some(1) => {
                // Back edge: walk from `node` up the parent chain until `dep` is reached.
                let mut trail = vec![dep.clone()];
                let mut cursor = node;
                while cursor != dep {
                    trail.push(cursor.clone());
                    match parent.get(cursor).copied().flatten() {
                        Some(previous) => cursor = previous,
                        None => break,
                    }
                }
                trail.push(dep.clone());
                // The trail was collected child-to-ancestor; flip it into path order.
                trail.reverse();
                return Some(trail);
            }
            _ => {}
        }
    }

    color.insert(node, 2);
    None
}
/// Returns an owned copy of the precomputed startup order.
#[must_use]
pub fn topological_order(&self) -> Vec<String> {
    self.startup_order.to_vec()
}
/// Computes a startup order in which every service appears after all of the
/// services it depends on (Kahn's algorithm).
///
/// Ready services are emitted in sorted name order, so the result does not
/// depend on `HashMap` iteration order and identical inputs always yield the
/// same startup order.
///
/// # Errors
///
/// Returns [`AgentError::InvalidSpec`] when not every node could be ordered,
/// which means the graph still contains a cycle.
fn topological_sort(&self) -> Result<Vec<String>> {
    let mut in_degree: HashMap<&String, usize> = HashMap::with_capacity(self.nodes.len());
    let mut roots: Vec<&String> = Vec::new();
    for name in self.nodes.keys() {
        // A node's in-degree is the number of dependencies it still waits for.
        let degree = self.adjacency.get(name).map_or(0, Vec::len);
        in_degree.insert(name, degree);
        if degree == 0 {
            roots.push(name);
        }
    }
    // Sort the seeds so the traversal starts from a deterministic state.
    roots.sort_unstable();

    let mut queue: VecDeque<&String> = VecDeque::from(roots);
    let mut result = Vec::with_capacity(self.nodes.len());
    let mut newly_ready: Vec<&String> = Vec::new();

    while let Some(node) = queue.pop_front() {
        result.push(node.clone());
        if let Some(dependents) = self.reverse_adjacency.get(node) {
            for dependent in dependents {
                if let Some(degree) = in_degree.get_mut(dependent) {
                    *degree -= 1;
                    if *degree == 0 {
                        newly_ready.push(dependent);
                    }
                }
            }
        }
        // Enqueue the services unblocked by `node` in name order, independent of
        // the order in which edges were inserted.
        newly_ready.sort_unstable();
        queue.extend(newly_ready.drain(..));
    }

    if result.len() != self.nodes.len() {
        return Err(AgentError::InvalidSpec(
            "Dependency graph has unresolved cycles".to_string(),
        ));
    }
    Ok(result)
}
/// Borrows the precomputed startup order (dependencies before dependents).
#[must_use]
pub fn startup_order(&self) -> &[String] {
    self.startup_order.as_slice()
}
/// Returns the dependency declarations of `service`, or `None` when the service
/// is not part of the graph.
#[must_use]
pub fn dependencies(&self, service: &str) -> Option<&[DependsSpec]> {
    let node = self.nodes.get(service)?;
    Some(node.depends_on.as_slice())
}
#[must_use]
pub fn len(&self) -> usize {
self.nodes.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.nodes.is_empty()
}
/// Returns `true` when service `a` depends on service `b`, directly or
/// transitively. A service is never reported as depending on itself.
#[must_use]
pub fn depends_on(&self, a: &str, b: &str) -> bool {
    if a == b {
        return false;
    }

    let mut seen: HashSet<&str> = HashSet::new();
    let mut pending: Vec<&str> = vec![a];

    while let Some(current) = pending.pop() {
        // `insert` returns false when the node was already expanded.
        if !seen.insert(current) {
            continue;
        }
        for dep in self.adjacency.get(current).into_iter().flatten() {
            if dep.as_str() == b {
                return true;
            }
            if !seen.contains(dep.as_str()) {
                pending.push(dep.as_str());
            }
        }
    }
    false
}
/// Returns the names of the services that directly depend on `service`.
///
/// Unknown services yield an empty list.
#[must_use]
pub fn dependents(&self, service: &str) -> Vec<String> {
    match self.reverse_adjacency.get(service) {
        Some(names) => names.clone(),
        None => Vec::new(),
    }
}
}
/// Evaluates whether a dependency's condition (`Started`, `Healthy` or `Ready`)
/// currently holds.
pub struct DependencyConditionChecker {
/// Container runtime queried for container state by the `Started` check.
runtime: Arc<dyn Runtime + Send + Sync>,
/// Shared per-service health states read by the `Healthy` check.
health_states: Arc<RwLock<HashMap<String, HealthState>>>,
/// Optional proxy service registry used by the `Ready` check; when absent,
/// `Ready` falls back to the `Healthy` check.
service_registry: Option<Arc<ServiceRegistry>>,
}
impl DependencyConditionChecker {
/// Creates a checker from the runtime, the shared health-state map and an
/// optional proxy service registry.
pub fn new(
    runtime: Arc<dyn Runtime + Send + Sync>,
    health_states: Arc<RwLock<HashMap<String, HealthState>>>,
    service_registry: Option<Arc<ServiceRegistry>>,
) -> Self {
    Self {
        service_registry,
        health_states,
        runtime,
    }
}
/// Evaluates the condition declared by `dep` against its target service by
/// dispatching to the matching `check_*` method.
///
/// # Errors
///
/// Propagates any error returned by the underlying check.
pub async fn check(&self, dep: &DependsSpec) -> Result<bool> {
    let service = dep.service.as_str();
    match dep.condition {
        DependencyCondition::Ready => self.check_ready(service).await,
        DependencyCondition::Healthy => self.check_healthy(service).await,
        DependencyCondition::Started => self.check_started(service).await,
    }
}
/// Returns `true` when replica 1 of `service` is in the `Running` state.
///
/// Any other state, or a container the runtime reports as not found, counts
/// as "not started".
///
/// # Errors
///
/// Returns runtime errors other than `AgentError::NotFound`.
pub async fn check_started(&self, service: &str) -> Result<bool> {
    // Only the first replica is inspected.
    let first_replica = ContainerId {
        service: service.to_string(),
        replica: 1,
    };
    let state = self.runtime.container_state(&first_replica).await;
    match state {
        Err(AgentError::NotFound { .. }) => Ok(false),
        Err(other) => Err(other),
        Ok(current) => Ok(matches!(current, ContainerState::Running)),
    }
}
/// Returns `true` only when the recorded health state of `service` is
/// `HealthState::Healthy`; any other state or a missing entry yields `false`.
///
/// # Errors
///
/// This check itself never fails; the `Result` matches the other checks.
pub async fn check_healthy(&self, service: &str) -> Result<bool> {
    let states = self.health_states.read().await;
    let healthy = matches!(states.get(service), Some(HealthState::Healthy));
    Ok(healthy)
}
/// Returns `true` when `service` is registered with the proxy and the route for
/// host `"<service>.default"` at path `/` resolves to at least one backend.
///
/// Without a service registry the check logs a warning and falls back to
/// [`Self::check_healthy`].
///
/// # Errors
///
/// Propagates errors from the fallback health check.
pub async fn check_ready(&self, service: &str) -> Result<bool> {
    let registry = match self.service_registry.as_ref() {
        Some(registry) => registry,
        None => {
            tracing::warn!(
                service = %service,
                "No proxy configured for 'ready' condition check, falling back to 'healthy'"
            );
            return self.check_healthy(service).await;
        }
    };

    let services = registry.list_services().await;
    if !services.contains(&service.to_string()) {
        return Ok(false);
    }

    let host = format!("{service}.default");
    let has_backends = match registry.resolve(Some(&host), "/").await {
        Some(resolved) => !resolved.backends.is_empty(),
        None => false,
    };
    Ok(has_backends)
}
}
/// Outcome of waiting for a single dependency condition.
#[derive(Debug, Clone)]
pub enum WaitResult {
/// The condition was observed to hold before the timeout elapsed.
Satisfied,
/// The wait timed out and the dependency's timeout action is `Continue`.
TimedOutContinue,
/// The wait timed out and the dependency's timeout action is `Warn`.
TimedOutWarn {
service: String,
condition: DependencyCondition,
},
/// The wait timed out and the dependency's timeout action is `Fail`.
TimedOutFail {
service: String,
condition: DependencyCondition,
timeout: Duration,
},
}
impl WaitResult {
    /// `true` only for [`WaitResult::Satisfied`].
    #[must_use]
    pub fn is_satisfied(&self) -> bool {
        match self {
            WaitResult::Satisfied => true,
            WaitResult::TimedOutContinue
            | WaitResult::TimedOutWarn { .. }
            | WaitResult::TimedOutFail { .. } => false,
        }
    }

    /// `true` when startup may proceed: the condition was satisfied, or the
    /// timeout action was `Continue` or `Warn`.
    #[must_use]
    pub fn should_continue(&self) -> bool {
        match self {
            WaitResult::TimedOutFail { .. } => false,
            WaitResult::Satisfied
            | WaitResult::TimedOutContinue
            | WaitResult::TimedOutWarn { .. } => true,
        }
    }

    /// `true` only for [`WaitResult::TimedOutFail`].
    #[must_use]
    pub fn is_failure(&self) -> bool {
        match self {
            WaitResult::TimedOutFail { .. } => true,
            WaitResult::Satisfied
            | WaitResult::TimedOutContinue
            | WaitResult::TimedOutWarn { .. } => false,
        }
    }
}
/// Polls dependency conditions until they are satisfied or time out.
pub struct DependencyWaiter {
/// Checker used to evaluate each dependency condition.
condition_checker: DependencyConditionChecker,
/// Delay between consecutive condition checks.
poll_interval: Duration,
}
impl DependencyWaiter {
/// Creates a waiter that polls the given checker once per second.
#[must_use]
pub fn new(condition_checker: DependencyConditionChecker) -> Self {
    let poll_interval = Duration::from_secs(1);
    Self {
        poll_interval,
        condition_checker,
    }
}
#[must_use]
pub fn with_poll_interval(mut self, interval: Duration) -> Self {
self.poll_interval = interval;
self
}
#[must_use]
pub fn poll_interval(&self) -> Duration {
self.poll_interval
}
/// Polls the dependency's condition until it is satisfied or its timeout elapses.
///
/// The timeout is `dep.timeout`, defaulting to 300 seconds. The condition is
/// checked immediately and then after every poll interval. The sleep between
/// checks is clamped to the time remaining, so the final check happens at the
/// deadline instead of up to one full poll interval after it.
///
/// Errors from the condition check are logged and treated as "not yet
/// satisfied"; polling continues.
///
/// # Errors
///
/// Currently always returns `Ok`; a timeout is reported through the returned
/// [`WaitResult`] according to `dep.on_timeout`.
pub async fn wait_for_dependency(&self, dep: &DependsSpec) -> Result<WaitResult> {
    /// Timeout applied when the dependency does not declare one.
    const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);

    let timeout = dep.timeout.unwrap_or(DEFAULT_TIMEOUT);
    let start = std::time::Instant::now();
    tracing::info!(
        service = %dep.service,
        condition = ?dep.condition,
        timeout = ?timeout,
        "Waiting for dependency"
    );
    loop {
        match self.condition_checker.check(dep).await {
            Ok(true) => {
                tracing::info!(
                    service = %dep.service,
                    condition = ?dep.condition,
                    elapsed = ?start.elapsed(),
                    "Dependency condition satisfied"
                );
                return Ok(WaitResult::Satisfied);
            }
            Ok(false) => {
                tracing::debug!(
                    service = %dep.service,
                    condition = ?dep.condition,
                    elapsed = ?start.elapsed(),
                    "Dependency condition not yet satisfied"
                );
            }
            Err(e) => {
                // Best effort: a failing check is retried until the timeout.
                tracing::warn!(
                    service = %dep.service,
                    condition = ?dep.condition,
                    error = %e,
                    "Error checking dependency condition"
                );
            }
        }

        let remaining = timeout.saturating_sub(start.elapsed());
        if remaining.is_zero() {
            return Ok(self.handle_timeout(dep, timeout));
        }
        // Never sleep past the deadline.
        tokio::time::sleep(self.poll_interval.min(remaining)).await;
    }
}
/// Logs a timed-out wait and maps the dependency's `on_timeout` action to the
/// corresponding [`WaitResult`].
#[allow(clippy::unused_self)]
fn handle_timeout(&self, dep: &DependsSpec, timeout: Duration) -> WaitResult {
    let service = &dep.service;
    let condition = dep.condition;

    match dep.on_timeout {
        TimeoutAction::Continue => {
            tracing::info!(
                service = %service,
                condition = ?condition,
                timeout = ?timeout,
                "Dependency timeout - continuing anyway"
            );
            WaitResult::TimedOutContinue
        }
        TimeoutAction::Warn => {
            tracing::warn!(
                service = %service,
                condition = ?condition,
                timeout = ?timeout,
                "Dependency timeout - continuing with warning"
            );
            WaitResult::TimedOutWarn {
                service: service.clone(),
                condition,
            }
        }
        TimeoutAction::Fail => {
            tracing::error!(
                service = %service,
                condition = ?condition,
                timeout = ?timeout,
                "Dependency timeout - failing startup"
            );
            WaitResult::TimedOutFail {
                service: service.clone(),
                condition,
                timeout,
            }
        }
    }
}
/// Waits for each dependency in order and collects the outcomes.
///
/// Waiting stops after the first failing outcome, which is included as the last
/// element of the returned list; later dependencies are not waited for.
///
/// # Errors
///
/// Propagates any error returned by [`Self::wait_for_dependency`].
pub async fn wait_for_all(&self, deps: &[DependsSpec]) -> Result<Vec<WaitResult>> {
    let mut results = Vec::with_capacity(deps.len());
    for dep in deps {
        let outcome = self.wait_for_dependency(dep).await?;
        let failed = outcome.is_failure();
        results.push(outcome);
        if failed {
            break;
        }
    }
    Ok(results)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::runtime::MockRuntime;
use zlayer_spec::{DependencyCondition, DependsSpec, TimeoutAction};
/// Test helper: parses a small single-service deployment from inline YAML, takes
/// its `test` service spec and replaces that spec's `depends` list with `depends`.
fn minimal_spec(depends: Vec<DependsSpec>) -> ServiceSpec {
use zlayer_spec::*;
// NOTE(review): YAML nesting is indentation-sensitive — confirm the literal's
// indentation is intact in the checked-in file before editing it.
let yaml = r"
version: v1
deployment: test
services:
test:
rtype: service
image:
name: test:latest
endpoints:
- name: http
protocol: http
port: 8080
";
let mut spec = serde_yaml::from_str::<DeploymentSpec>(yaml)
.unwrap()
.services
.remove("test")
.unwrap();
spec.depends = depends;
spec
}
/// Test helper: a dependency on `service` with the given condition, a 60 second
/// timeout and the `Fail` timeout action.
fn dep(service: &str, condition: DependencyCondition) -> DependsSpec {
    let timeout = Some(std::time::Duration::from_secs(60));
    DependsSpec {
        on_timeout: TimeoutAction::Fail,
        timeout,
        condition,
        service: service.to_string(),
    }
}
/// An empty service map builds into an empty graph with no startup order.
#[test]
fn test_build_empty_graph() {
    let no_services = HashMap::<String, ServiceSpec>::new();
    let graph = DependencyGraph::build(&no_services).unwrap();
    assert!(graph.is_empty());
    assert!(graph.startup_order().is_empty());
}
/// Independent services all appear in the startup order.
#[test]
fn test_build_no_dependencies() {
    let names = ["a", "b", "c"];
    let services: HashMap<String, ServiceSpec> = names
        .iter()
        .map(|name| ((*name).to_string(), minimal_spec(vec![])))
        .collect();

    let graph = DependencyGraph::build(&services).unwrap();
    assert_eq!(graph.len(), 3);

    let order = graph.startup_order();
    assert_eq!(order.len(), 3);
    for expected in names {
        assert!(order.contains(&expected.to_string()));
    }
}
/// For the chain a -> b -> c, the startup order is c, then b, then a.
#[test]
fn test_build_linear_dependencies() {
    let services = HashMap::from([
        ("c".to_string(), minimal_spec(vec![])),
        (
            "b".to_string(),
            minimal_spec(vec![dep("c", DependencyCondition::Started)]),
        ),
        (
            "a".to_string(),
            minimal_spec(vec![dep("b", DependencyCondition::Started)]),
        ),
    ]);
    let graph = DependencyGraph::build(&services).unwrap();
    let order = graph.startup_order();
    let index_of = |name: &str| order.iter().position(|entry| entry == name).unwrap();

    assert!(index_of("c") < index_of("b"));
    assert!(index_of("b") < index_of("a"));
}
/// Diamond shape: a depends on b and c, which both depend on d.
#[test]
fn test_build_diamond_dependencies() {
    let started = |target: &str| dep(target, DependencyCondition::Started);
    let services = HashMap::from([
        ("d".to_string(), minimal_spec(vec![])),
        ("b".to_string(), minimal_spec(vec![started("d")])),
        ("c".to_string(), minimal_spec(vec![started("d")])),
        (
            "a".to_string(),
            minimal_spec(vec![started("b"), started("c")]),
        ),
    ]);
    let graph = DependencyGraph::build(&services).unwrap();
    let order = graph.startup_order();
    let index_of = |name: &str| order.iter().position(|entry| entry == name).unwrap();

    let (pos_a, pos_b, pos_c, pos_d) =
        (index_of("a"), index_of("b"), index_of("c"), index_of("d"));
    assert!(pos_d < pos_b);
    assert!(pos_d < pos_c);
    assert!(pos_b < pos_a);
    assert!(pos_c < pos_a);
}
/// A service that depends on itself is rejected.
#[test]
fn test_detect_self_dependency() {
    let services = HashMap::from([(
        "a".to_string(),
        minimal_spec(vec![dep("a", DependencyCondition::Started)]),
    )]);
    let outcome = DependencyGraph::build(&services);
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("self-dependency"));
}
/// A two-service loop (a <-> b) is reported as a cyclic dependency.
#[test]
fn test_detect_simple_cycle() {
    let services = HashMap::from([
        (
            "a".to_string(),
            minimal_spec(vec![dep("b", DependencyCondition::Started)]),
        ),
        (
            "b".to_string(),
            minimal_spec(vec![dep("a", DependencyCondition::Started)]),
        ),
    ]);
    let outcome = DependencyGraph::build(&services);
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Cyclic dependency"));
}
/// A cycle that is not reachable as a loop through the entry node
/// (a -> b -> c -> d -> b) is still detected.
#[test]
fn test_detect_complex_cycle() {
    let edges = [("a", "b"), ("b", "c"), ("c", "d"), ("d", "b")];
    let services: HashMap<String, ServiceSpec> = edges
        .iter()
        .map(|(from, to)| {
            (
                (*from).to_string(),
                minimal_spec(vec![dep(to, DependencyCondition::Started)]),
            )
        })
        .collect();
    let outcome = DependencyGraph::build(&services);
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Cyclic dependency"));
}
/// Depending on an unknown service is rejected and the message names it.
#[test]
fn test_detect_missing_dependency() {
    let services = HashMap::from([(
        "a".to_string(),
        minimal_spec(vec![dep("nonexistent", DependencyCondition::Started)]),
    )]);
    let outcome = DependencyGraph::build(&services);
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    for fragment in ["non-existent", "nonexistent"] {
        assert!(message.contains(fragment));
    }
}
/// `depends_on` follows transitive edges, is directional and is never reflexive.
#[test]
fn test_depends_on_transitive() {
    let services = HashMap::from([
        ("c".to_string(), minimal_spec(vec![])),
        (
            "b".to_string(),
            minimal_spec(vec![dep("c", DependencyCondition::Started)]),
        ),
        (
            "a".to_string(),
            minimal_spec(vec![dep("b", DependencyCondition::Started)]),
        ),
    ]);
    let graph = DependencyGraph::build(&services).unwrap();

    for (from, to) in [("a", "b"), ("b", "c"), ("a", "c")] {
        assert!(graph.depends_on(from, to));
    }
    for (from, to) in [("c", "a"), ("b", "a"), ("c", "b"), ("a", "a")] {
        assert!(!graph.depends_on(from, to));
    }
}
/// `dependencies` returns the declared specs per service and `None` for unknown names.
#[test]
fn test_get_dependencies() {
    let services = HashMap::from([
        ("c".to_string(), minimal_spec(vec![])),
        (
            "b".to_string(),
            minimal_spec(vec![dep("c", DependencyCondition::Healthy)]),
        ),
        (
            "a".to_string(),
            minimal_spec(vec![
                dep("b", DependencyCondition::Started),
                dep("c", DependencyCondition::Ready),
            ]),
        ),
    ]);
    let graph = DependencyGraph::build(&services).unwrap();

    assert_eq!(graph.dependencies("a").unwrap().len(), 2);

    let of_b = graph.dependencies("b").unwrap();
    assert_eq!(of_b.len(), 1);
    let only = &of_b[0];
    assert_eq!(only.service, "c");
    assert_eq!(only.condition, DependencyCondition::Healthy);

    assert!(graph.dependencies("c").unwrap().is_empty());
    assert!(graph.dependencies("nonexistent").is_none());
}
/// `dependents` lists the services that directly depend on a given service.
#[test]
fn test_dependents() {
    let services = HashMap::from([
        ("c".to_string(), minimal_spec(vec![])),
        (
            "b".to_string(),
            minimal_spec(vec![dep("c", DependencyCondition::Started)]),
        ),
        (
            "a".to_string(),
            minimal_spec(vec![dep("c", DependencyCondition::Started)]),
        ),
    ]);
    let graph = DependencyGraph::build(&services).unwrap();

    let of_c = graph.dependents("c");
    assert_eq!(of_c.len(), 2);
    for expected in ["a", "b"] {
        assert!(of_c.contains(&expected.to_string()));
    }
    for leaf in ["a", "b"] {
        assert!(graph.dependents(leaf).is_empty());
    }
}
/// A created and started container satisfies the `started` check.
#[tokio::test]
async fn test_check_started_running() {
    let runtime = Arc::new(MockRuntime::new());
    let container = ContainerId {
        service: "test".to_string(),
        replica: 1,
    };
    let spec = minimal_spec(vec![]);
    runtime.create_container(&container, &spec).await.unwrap();
    runtime.start_container(&container).await.unwrap();

    let health_states = Arc::new(RwLock::new(HashMap::new()));
    let checker = DependencyConditionChecker::new(runtime.clone(), health_states, None);
    let started = checker.check_started("test").await.unwrap();
    assert!(started);
}
/// A container that was created but never started does not satisfy `started`.
#[tokio::test]
async fn test_check_started_not_running() {
    let runtime = Arc::new(MockRuntime::new());
    let container = ContainerId {
        service: "test".to_string(),
        replica: 1,
    };
    let spec = minimal_spec(vec![]);
    runtime.create_container(&container, &spec).await.unwrap();

    let health_states = Arc::new(RwLock::new(HashMap::new()));
    let checker = DependencyConditionChecker::new(runtime.clone(), health_states, None);
    let started = checker.check_started("test").await.unwrap();
    assert!(!started);
}
/// A service without any container is reported as not started (not as an error).
#[tokio::test]
async fn test_check_started_no_container() {
    let health_states = Arc::new(RwLock::new(HashMap::new()));
    let runtime = Arc::new(MockRuntime::new());
    let checker = DependencyConditionChecker::new(runtime, health_states, None);
    let started = checker.check_started("nonexistent").await.unwrap();
    assert!(!started);
}
/// A service recorded as `Healthy` satisfies the `healthy` check.
#[tokio::test]
async fn test_check_healthy() {
    let runtime = Arc::new(MockRuntime::new());
    let initial = HashMap::from([("test".to_string(), HealthState::Healthy)]);
    let health_states = Arc::new(RwLock::new(initial));
    let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);
    let healthy = checker.check_healthy("test").await.unwrap();
    assert!(healthy);
}
/// A service recorded as `Unhealthy` does not satisfy the `healthy` check.
#[tokio::test]
async fn test_check_healthy_unhealthy() {
    let runtime = Arc::new(MockRuntime::new());
    let unhealthy = HealthState::Unhealthy {
        failures: 3,
        reason: "connection refused".to_string(),
    };
    let initial = HashMap::from([("test".to_string(), unhealthy)]);
    let health_states = Arc::new(RwLock::new(initial));
    let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);
    let healthy = checker.check_healthy("test").await.unwrap();
    assert!(!healthy);
}
/// A service whose health is `Unknown` does not satisfy the `healthy` check.
#[tokio::test]
async fn test_check_healthy_unknown() {
    let runtime = Arc::new(MockRuntime::new());
    let initial = HashMap::from([("test".to_string(), HealthState::Unknown)]);
    let health_states = Arc::new(RwLock::new(initial));
    let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);
    let healthy = checker.check_healthy("test").await.unwrap();
    assert!(!healthy);
}
/// A service with no recorded health state does not satisfy the `healthy` check.
#[tokio::test]
async fn test_check_healthy_no_state() {
    let health_states = Arc::new(RwLock::new(HashMap::new()));
    let runtime = Arc::new(MockRuntime::new());
    let checker = DependencyConditionChecker::new(runtime, health_states, None);
    let healthy = checker.check_healthy("test").await.unwrap();
    assert!(!healthy);
}
/// Without a service registry, `ready` falls back to the health state.
#[tokio::test]
async fn test_check_ready_no_registry() {
    let runtime = Arc::new(MockRuntime::new());
    let initial = HashMap::from([("test".to_string(), HealthState::Healthy)]);
    let health_states = Arc::new(RwLock::new(initial));
    let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);
    let ready = checker.check_ready("test").await.unwrap();
    assert!(ready);
}
/// A registered route with at least one backend satisfies the `ready` check.
#[tokio::test]
async fn test_check_ready_with_registry() {
    use std::net::SocketAddr;
    use zlayer_proxy::RouteEntry;

    let backend = "127.0.0.1:8080".parse::<SocketAddr>().unwrap();
    let resolved = zlayer_proxy::ResolvedService {
        name: "test".to_string(),
        backends: vec![backend],
        use_tls: false,
        sni_hostname: "test.local".to_string(),
        expose: zlayer_spec::ExposeType::Public,
        protocol: zlayer_spec::Protocol::Http,
        strip_prefix: false,
        path_prefix: "/".to_string(),
        target_port: 8080,
    };
    let route = RouteEntry {
        service_name: "test".to_string(),
        endpoint_name: "http".to_string(),
        host: Some("test.default".to_string()),
        path_prefix: "/".to_string(),
        resolved,
    };

    let registry = Arc::new(ServiceRegistry::new());
    registry.register(route).await;

    let runtime = Arc::new(MockRuntime::new());
    let health_states = Arc::new(RwLock::new(HashMap::new()));
    let checker = DependencyConditionChecker::new(runtime, health_states, Some(registry));
    let ready = checker.check_ready("test").await.unwrap();
    assert!(ready);
}
/// A registered route without any backend does not satisfy the `ready` check.
#[tokio::test]
async fn test_check_ready_no_backends() {
    use zlayer_proxy::RouteEntry;

    let resolved = zlayer_proxy::ResolvedService {
        name: "test".to_string(),
        backends: vec![],
        use_tls: false,
        sni_hostname: "test.local".to_string(),
        expose: zlayer_spec::ExposeType::Public,
        protocol: zlayer_spec::Protocol::Http,
        strip_prefix: false,
        path_prefix: "/".to_string(),
        target_port: 8080,
    };
    let route = RouteEntry {
        service_name: "test".to_string(),
        endpoint_name: "http".to_string(),
        host: Some("test.default".to_string()),
        path_prefix: "/".to_string(),
        resolved,
    };

    let registry = Arc::new(ServiceRegistry::new());
    registry.register(route).await;

    let runtime = Arc::new(MockRuntime::new());
    let health_states = Arc::new(RwLock::new(HashMap::new()));
    let checker = DependencyConditionChecker::new(runtime, health_states, Some(registry));
    let ready = checker.check_ready("test").await.unwrap();
    assert!(!ready);
}
/// `check` routes each condition to its checker; with a running, healthy service
/// and no registry, all three conditions hold.
#[tokio::test]
async fn test_check_condition_dispatches_correctly() {
    let runtime = Arc::new(MockRuntime::new());
    let initial = HashMap::from([("test".to_string(), HealthState::Healthy)]);
    let health_states = Arc::new(RwLock::new(initial));

    let container = ContainerId {
        service: "test".to_string(),
        replica: 1,
    };
    let spec = minimal_spec(vec![]);
    runtime.create_container(&container, &spec).await.unwrap();
    runtime.start_container(&container).await.unwrap();

    let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);
    for condition in [
        DependencyCondition::Started,
        DependencyCondition::Healthy,
        DependencyCondition::Ready,
    ] {
        let declared = dep("test", condition);
        assert!(checker.check(&declared).await.unwrap());
    }
}
/// Test helper: a dependency on `service` with an explicit timeout and timeout action.
fn dep_with_timeout(
    service: &str,
    condition: DependencyCondition,
    timeout: Duration,
    on_timeout: TimeoutAction,
) -> DependsSpec {
    let service = service.to_string();
    let timeout = Some(timeout);
    DependsSpec {
        service,
        condition,
        timeout,
        on_timeout,
    }
}
/// A dependency that is already healthy is reported as satisfied.
#[tokio::test]
async fn test_wait_satisfied_immediately() {
    let runtime = Arc::new(MockRuntime::new());
    let initial = HashMap::from([("db".to_string(), HealthState::Healthy)]);
    let health_states = Arc::new(RwLock::new(initial));
    let checker = DependencyConditionChecker::new(runtime, health_states, None);
    let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));

    let awaited = dep_with_timeout(
        "db",
        DependencyCondition::Healthy,
        Duration::from_secs(5),
        TimeoutAction::Fail,
    );
    let outcome = waiter.wait_for_dependency(&awaited).await.unwrap();
    assert!(outcome.is_satisfied());
}
/// A dependency that becomes healthy while being polled is reported as satisfied.
#[tokio::test]
async fn test_wait_satisfied_after_delay() {
    let runtime = Arc::new(MockRuntime::new());
    let initial = HashMap::from([("db".to_string(), HealthState::Unknown)]);
    let health_states = Arc::new(RwLock::new(initial));

    // Flip the state to healthy in the background after a few poll intervals.
    let writer = Arc::clone(&health_states);
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(150)).await;
        writer
            .write()
            .await
            .insert("db".to_string(), HealthState::Healthy);
    });

    let checker = DependencyConditionChecker::new(runtime, health_states, None);
    let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
    let awaited = dep_with_timeout(
        "db",
        DependencyCondition::Healthy,
        Duration::from_secs(5),
        TimeoutAction::Fail,
    );
    let outcome = waiter.wait_for_dependency(&awaited).await.unwrap();
    assert!(outcome.is_satisfied());
}
/// With the `Fail` action, a timeout yields `TimedOutFail` carrying the
/// service, condition and timeout.
#[tokio::test]
async fn test_wait_timeout_fail() {
    let runtime = Arc::new(MockRuntime::new());
    let initial = HashMap::from([("db".to_string(), HealthState::Unknown)]);
    let health_states = Arc::new(RwLock::new(initial));
    let checker = DependencyConditionChecker::new(runtime, health_states, None);
    let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));

    let awaited = dep_with_timeout(
        "db",
        DependencyCondition::Healthy,
        Duration::from_millis(200),
        TimeoutAction::Fail,
    );
    let outcome = waiter.wait_for_dependency(&awaited).await.unwrap();
    assert!(outcome.is_failure());

    if let WaitResult::TimedOutFail {
        service,
        condition,
        timeout,
    } = outcome
    {
        assert_eq!(service, "db");
        assert_eq!(condition, DependencyCondition::Healthy);
        assert_eq!(timeout, Duration::from_millis(200));
    } else {
        panic!("Expected TimedOutFail");
    }
}
/// With the `Warn` action, a timeout yields `TimedOutWarn` and startup may continue.
#[tokio::test]
async fn test_wait_timeout_warn() {
    let health_states = Arc::new(RwLock::new(HashMap::new()));
    let runtime = Arc::new(MockRuntime::new());
    let checker = DependencyConditionChecker::new(runtime, health_states, None);
    let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));

    let awaited = dep_with_timeout(
        "db",
        DependencyCondition::Healthy,
        Duration::from_millis(100),
        TimeoutAction::Warn,
    );
    let outcome = waiter.wait_for_dependency(&awaited).await.unwrap();
    assert!(outcome.should_continue());
    assert!(!outcome.is_satisfied());

    if let WaitResult::TimedOutWarn { service, condition } = outcome {
        assert_eq!(service, "db");
        assert_eq!(condition, DependencyCondition::Healthy);
    } else {
        panic!("Expected TimedOutWarn");
    }
}
/// With the `Continue` action, a timeout yields `TimedOutContinue`.
#[tokio::test]
async fn test_wait_timeout_continue() {
    let health_states = Arc::new(RwLock::new(HashMap::new()));
    let runtime = Arc::new(MockRuntime::new());
    let checker = DependencyConditionChecker::new(runtime, health_states, None);
    let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));

    let awaited = dep_with_timeout(
        "db",
        DependencyCondition::Healthy,
        Duration::from_millis(100),
        TimeoutAction::Continue,
    );
    let outcome = waiter.wait_for_dependency(&awaited).await.unwrap();
    assert!(outcome.should_continue());
    assert!(!outcome.is_satisfied());
    assert!(matches!(outcome, WaitResult::TimedOutContinue));
}
#[tokio::test]
async fn test_wait_for_all_success() {
let runtime = Arc::new(MockRuntime::new());
let health_states = Arc::new(RwLock::new(HashMap::new()));
{
let mut states = health_states.write().await;
states.insert("db".to_string(), HealthState::Healthy);
states.insert("cache".to_string(), HealthState::Healthy);
}
let checker = DependencyConditionChecker::new(runtime, health_states, None);
let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
let deps = vec![
dep_with_timeout(
"db",
DependencyCondition::Healthy,
Duration::from_secs(5),
TimeoutAction::Fail,
),
dep_with_timeout(
"cache",
DependencyCondition::Healthy,
Duration::from_secs(5),
TimeoutAction::Fail,
),
];
let results = waiter.wait_for_all(&deps).await.unwrap();
assert_eq!(results.len(), 2);
assert!(results.iter().all(super::WaitResult::is_satisfied));
}
/// `wait_for_all` stops at the first failing dependency and does not wait for
/// the remaining ones.
#[tokio::test]
async fn test_wait_for_all_early_failure() {
    let runtime = Arc::new(MockRuntime::new());
    let initial = HashMap::from([("cache".to_string(), HealthState::Healthy)]);
    let health_states = Arc::new(RwLock::new(initial));
    let checker = DependencyConditionChecker::new(runtime, health_states, None);
    let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));

    // "db" has no health state, so it times out first with the `Fail` action.
    let failing = dep_with_timeout(
        "db",
        DependencyCondition::Healthy,
        Duration::from_millis(100),
        TimeoutAction::Fail,
    );
    let healthy = dep_with_timeout(
        "cache",
        DependencyCondition::Healthy,
        Duration::from_secs(5),
        TimeoutAction::Fail,
    );
    let deps = [failing, healthy];

    let results = waiter.wait_for_all(&deps).await.unwrap();
    assert_eq!(results.len(), 1);
    assert!(results[0].is_failure());
}
/// A `Warn` timeout does not stop `wait_for_all`: both results are returned.
#[tokio::test]
async fn test_wait_for_all_mixed_results() {
    let runtime = Arc::new(MockRuntime::new());
    let initial = HashMap::from([("db".to_string(), HealthState::Healthy)]);
    let health_states = Arc::new(RwLock::new(initial));
    let checker = DependencyConditionChecker::new(runtime, health_states, None);
    let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));

    let healthy = dep_with_timeout(
        "db",
        DependencyCondition::Healthy,
        Duration::from_secs(5),
        TimeoutAction::Fail,
    );
    // "cache" has no health state, so it times out with the `Warn` action.
    let warns = dep_with_timeout(
        "cache",
        DependencyCondition::Healthy,
        Duration::from_millis(100),
        TimeoutAction::Warn,
    );
    let deps = [healthy, warns];

    let results = waiter.wait_for_all(&deps).await.unwrap();
    assert_eq!(results.len(), 2);
    assert!(results[0].is_satisfied());
    assert!(matches!(results[1], WaitResult::TimedOutWarn { .. }));
}
/// Truth table for the `WaitResult` predicate helpers.
#[test]
fn test_wait_result_helpers() {
    // (value, is_satisfied, should_continue, is_failure)
    let cases = [
        (WaitResult::Satisfied, true, true, false),
        (WaitResult::TimedOutContinue, false, true, false),
        (
            WaitResult::TimedOutWarn {
                service: "db".to_string(),
                condition: DependencyCondition::Healthy,
            },
            false,
            true,
            false,
        ),
        (
            WaitResult::TimedOutFail {
                service: "db".to_string(),
                condition: DependencyCondition::Healthy,
                timeout: Duration::from_secs(60),
            },
            false,
            false,
            true,
        ),
    ];

    for (result, satisfied, proceeds, failed) in &cases {
        assert_eq!(result.is_satisfied(), *satisfied);
        assert_eq!(result.should_continue(), *proceeds);
        assert_eq!(result.is_failure(), *failed);
    }
}
}