1use crate::error::{AgentError, Result};
8use crate::health::HealthState;
9use crate::runtime::{ContainerId, ContainerState, Runtime};
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::RwLock;
14use zlayer_proxy::ServiceRegistry;
15use zlayer_spec::{DependencyCondition, DependsSpec, ServiceSpec, TimeoutAction};
16
/// Validation failures detected while building a dependency graph.
#[derive(Debug, Clone)]
pub enum DependencyError {
    /// The services form a dependency loop; `cycle` holds the service names
    /// along that loop and is rendered joined by `" -> "`.
    CyclicDependency { cycle: Vec<String> },
    /// `service` declares a dependency on `missing`, which is not a known service.
    MissingService { service: String, missing: String },
    /// `service` lists itself as one of its own dependencies.
    SelfDependency { service: String },
}

impl std::fmt::Display for DependencyError {
    /// Renders the human-readable message for each validation failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::SelfDependency { service } => {
                write!(f, "Service '{service}' has a self-dependency")
            }
            Self::MissingService { service, missing } => write!(
                f,
                "Service '{service}' depends on non-existent service '{missing}'"
            ),
            Self::CyclicDependency { cycle } => {
                let path = cycle.join(" -> ");
                write!(f, "Cyclic dependency detected: {path}")
            }
        }
    }
}

impl std::error::Error for DependencyError {}
48
49impl From<DependencyError> for AgentError {
50 fn from(err: DependencyError) -> Self {
51 AgentError::InvalidSpec(err.to_string())
52 }
53}
54
/// A single service in the dependency graph together with its declared dependencies.
#[derive(Debug, Clone)]
pub struct DependencyNode {
    /// Name of the service this node represents.
    pub service_name: String,
    /// Dependency declarations copied from the service's spec.
    pub depends_on: Vec<DependsSpec>,
}
63
/// Validated service dependency graph with a precomputed startup order.
#[derive(Debug)]
pub struct DependencyGraph {
    /// Every known service, keyed by service name.
    nodes: HashMap<String, DependencyNode>,
    /// Service names ordered so that each service comes after its dependencies.
    startup_order: Vec<String>,
    /// Maps a service to the services it depends on.
    adjacency: HashMap<String, Vec<String>>,
    /// Maps a service to the services that depend on it.
    reverse_adjacency: HashMap<String, Vec<String>>,
}
79
80impl DependencyGraph {
81 pub fn build(services: &HashMap<String, ServiceSpec>) -> Result<Self> {
97 let mut nodes = HashMap::new();
98 let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
99 let mut reverse_adjacency: HashMap<String, Vec<String>> = HashMap::new();
100
101 for (name, spec) in services {
103 nodes.insert(
104 name.clone(),
105 DependencyNode {
106 service_name: name.clone(),
107 depends_on: spec.depends.clone(),
108 },
109 );
110 adjacency.insert(name.clone(), Vec::new());
111 reverse_adjacency.insert(name.clone(), Vec::new());
112 }
113
114 for (name, spec) in services {
116 for dep in &spec.depends {
117 if dep.service == *name {
119 return Err(DependencyError::SelfDependency {
120 service: name.clone(),
121 }
122 .into());
123 }
124
125 if !services.contains_key(&dep.service) {
127 return Err(DependencyError::MissingService {
128 service: name.clone(),
129 missing: dep.service.clone(),
130 }
131 .into());
132 }
133
134 adjacency.get_mut(name).unwrap().push(dep.service.clone());
136 reverse_adjacency
137 .get_mut(&dep.service)
138 .unwrap()
139 .push(name.clone());
140 }
141 }
142
143 let mut graph = Self {
144 nodes,
145 startup_order: Vec::new(),
146 adjacency,
147 reverse_adjacency,
148 };
149
150 if let Some(cycle) = graph.detect_cycle() {
152 return Err(DependencyError::CyclicDependency { cycle }.into());
153 }
154
155 graph.startup_order = graph.topological_sort()?;
157
158 Ok(graph)
159 }
160
161 #[must_use]
170 pub fn detect_cycle(&self) -> Option<Vec<String>> {
171 let mut color: HashMap<&String, u8> = HashMap::new();
172 let mut parent: HashMap<&String, Option<&String>> = HashMap::new();
173
174 for name in self.nodes.keys() {
176 color.insert(name, 0);
177 parent.insert(name, None);
178 }
179
180 for start in self.nodes.keys() {
182 if color[start] == 0 {
183 if let Some(cycle) = self.dfs_cycle_detect(start, &mut color, &mut parent) {
184 return Some(cycle);
185 }
186 }
187 }
188
189 None
190 }
191
192 fn dfs_cycle_detect<'a>(
194 &'a self,
195 node: &'a String,
196 color: &mut HashMap<&'a String, u8>,
197 parent: &mut HashMap<&'a String, Option<&'a String>>,
198 ) -> Option<Vec<String>> {
199 color.insert(node, 1);
201
202 if let Some(deps) = self.adjacency.get(node) {
204 for dep in deps {
205 match color.get(dep) {
206 Some(0) => {
207 parent.insert(dep, Some(node));
209 if let Some(cycle) = self.dfs_cycle_detect(dep, color, parent) {
210 return Some(cycle);
211 }
212 }
213 Some(1) => {
214 let mut cycle = vec![dep.clone()];
216 let mut current = node;
217 while current != dep {
218 cycle.push(current.clone());
219 if let Some(Some(p)) = parent.get(current) {
220 current = p;
221 } else {
222 break;
223 }
224 }
225 cycle.push(dep.clone());
226 cycle.reverse();
227 return Some(cycle);
228 }
229 _ => {
230 }
232 }
233 }
234 }
235
236 color.insert(node, 2);
238 None
239 }
240
241 #[must_use]
246 pub fn topological_order(&self) -> Vec<String> {
247 self.startup_order.clone()
248 }
249
250 fn topological_sort(&self) -> Result<Vec<String>> {
252 let mut in_degree: HashMap<&String, usize> = HashMap::new();
253 let mut queue: VecDeque<&String> = VecDeque::new();
254 let mut result = Vec::new();
255
256 for name in self.nodes.keys() {
258 let degree = self.adjacency.get(name).map_or(0, std::vec::Vec::len);
259 in_degree.insert(name, degree);
260 if degree == 0 {
261 queue.push_back(name);
262 }
263 }
264
265 while let Some(node) = queue.pop_front() {
267 result.push(node.clone());
268
269 if let Some(dependents) = self.reverse_adjacency.get(node) {
271 for dependent in dependents {
272 if let Some(degree) = in_degree.get_mut(dependent) {
273 *degree -= 1;
274 if *degree == 0 {
275 queue.push_back(dependent);
276 }
277 }
278 }
279 }
280 }
281
282 if result.len() != self.nodes.len() {
284 return Err(AgentError::InvalidSpec(
285 "Dependency graph has unresolved cycles".to_string(),
286 ));
287 }
288
289 Ok(result)
290 }
291
292 #[must_use]
294 pub fn startup_order(&self) -> &[String] {
295 &self.startup_order
296 }
297
298 #[must_use]
300 pub fn dependencies(&self, service: &str) -> Option<&[DependsSpec]> {
301 self.nodes.get(service).map(|n| n.depends_on.as_slice())
302 }
303
    /// Number of services in the graph.
    #[must_use]
    pub fn len(&self) -> usize {
        self.nodes.len()
    }
309
    /// Returns `true` when the graph contains no services.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.nodes.is_empty()
    }
315
316 #[must_use]
318 pub fn depends_on(&self, a: &str, b: &str) -> bool {
319 if a == b {
320 return false;
321 }
322
323 let mut visited = HashSet::new();
324 let mut stack = vec![a];
325
326 while let Some(current) = stack.pop() {
327 if visited.contains(current) {
328 continue;
329 }
330 visited.insert(current);
331
332 if let Some(deps) = self.adjacency.get(current) {
333 for dep in deps {
334 if dep == b {
335 return true;
336 }
337 if !visited.contains(dep.as_str()) {
338 stack.push(dep);
339 }
340 }
341 }
342 }
343
344 false
345 }
346
347 #[must_use]
349 pub fn dependents(&self, service: &str) -> Vec<String> {
350 self.reverse_adjacency
351 .get(service)
352 .cloned()
353 .unwrap_or_default()
354 }
355}
356
/// Evaluates whether a dependency condition (`Started`, `Healthy`, `Ready`)
/// currently holds for a service.
pub struct DependencyConditionChecker {
    /// Container runtime queried for container state by the `Started` check.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Shared per-service health states read by the `Healthy` check.
    health_states: Arc<RwLock<HashMap<String, HealthState>>>,
    /// Optional proxy service registry consulted by the `Ready` check.
    service_registry: Option<Arc<ServiceRegistry>>,
}
371
372impl DependencyConditionChecker {
    /// Creates a checker from its three data sources.
    ///
    /// Pass `None` for `service_registry` when no proxy is configured; the
    /// `Ready` check then falls back to the `Healthy` check.
    pub fn new(
        runtime: Arc<dyn Runtime + Send + Sync>,
        health_states: Arc<RwLock<HashMap<String, HealthState>>>,
        service_registry: Option<Arc<ServiceRegistry>>,
    ) -> Self {
        Self {
            runtime,
            health_states,
            service_registry,
        }
    }
390
391 pub async fn check(&self, dep: &DependsSpec) -> Result<bool> {
402 match dep.condition {
403 DependencyCondition::Started => self.check_started(&dep.service).await,
404 DependencyCondition::Healthy => self.check_healthy(&dep.service).await,
405 DependencyCondition::Ready => self.check_ready(&dep.service).await,
406 }
407 }
408
409 pub async fn check_started(&self, service: &str) -> Result<bool> {
416 let id = ContainerId::new(service.to_string(), 1);
419
420 match self.runtime.container_state(&id).await {
421 Ok(ContainerState::Running) => Ok(true),
422 Ok(_) | Err(AgentError::NotFound { .. }) => Ok(false),
423 Err(e) => Err(e), }
425 }
426
427 pub async fn check_healthy(&self, service: &str) -> Result<bool> {
435 let health_states = self.health_states.read().await;
436
437 match health_states.get(service) {
438 Some(HealthState::Healthy) => Ok(true),
439 Some(_) | None => Ok(false),
440 }
441 }
442
443 pub async fn check_ready(&self, service: &str) -> Result<bool> {
454 if let Some(registry) = &self.service_registry {
455 let services = registry.list_services().await;
458 if !services.contains(&service.to_string()) {
459 return Ok(false);
460 }
461
462 let host = format!("{service}.default");
465 match registry.resolve(Some(&host), "/").await {
466 Some(resolved) => {
467 Ok(!resolved.backends.is_empty())
469 }
470 None => {
471 Ok(false)
473 }
474 }
475 } else {
476 tracing::warn!(
478 service = %service,
479 "No proxy configured for 'ready' condition check, falling back to 'healthy'"
480 );
481 self.check_healthy(service).await
482 }
483 }
484}
485
/// Outcome of waiting for a single dependency.
#[derive(Debug, Clone)]
pub enum WaitResult {
    /// The dependency condition was met before the timeout.
    Satisfied,
    /// The wait timed out and the timeout action is `Continue`.
    TimedOutContinue,
    /// The wait timed out and the timeout action is `Warn`.
    TimedOutWarn {
        /// Service that was being waited on.
        service: String,
        /// Condition that was not met in time.
        condition: DependencyCondition,
    },
    /// The wait timed out and the timeout action is `Fail`.
    TimedOutFail {
        /// Service that was being waited on.
        service: String,
        /// Condition that was not met in time.
        condition: DependencyCondition,
        /// Timeout that expired.
        timeout: Duration,
    },
}
507
508impl WaitResult {
509 #[must_use]
511 pub fn is_satisfied(&self) -> bool {
512 matches!(self, WaitResult::Satisfied)
513 }
514
515 #[must_use]
517 pub fn should_continue(&self) -> bool {
518 matches!(
519 self,
520 WaitResult::Satisfied | WaitResult::TimedOutContinue | WaitResult::TimedOutWarn { .. }
521 )
522 }
523
524 #[must_use]
526 pub fn is_failure(&self) -> bool {
527 matches!(self, WaitResult::TimedOutFail { .. })
528 }
529}
530
/// Polls dependency conditions until they are met or their timeout expires.
pub struct DependencyWaiter {
    /// Evaluates a single dependency condition on each poll.
    condition_checker: DependencyConditionChecker,
    /// Delay between consecutive condition checks.
    poll_interval: Duration,
}
541
542impl DependencyWaiter {
    /// Creates a waiter that polls `condition_checker` once per second.
    #[must_use]
    pub fn new(condition_checker: DependencyConditionChecker) -> Self {
        Self {
            condition_checker,
            poll_interval: Duration::from_secs(1),
        }
    }
551
    /// Builder-style setter that replaces the poll interval and returns the waiter.
    #[must_use]
    pub fn with_poll_interval(mut self, interval: Duration) -> Self {
        self.poll_interval = interval;
        self
    }
558
    /// Returns the delay between consecutive condition checks.
    #[must_use]
    pub fn poll_interval(&self) -> Duration {
        self.poll_interval
    }
564
565 pub async fn wait_for_dependency(&self, dep: &DependsSpec) -> Result<WaitResult> {
579 let timeout = dep.timeout.unwrap_or(Duration::from_secs(300)); let start = std::time::Instant::now();
581
582 tracing::info!(
583 service = %dep.service,
584 condition = ?dep.condition,
585 timeout = ?timeout,
586 "Waiting for dependency"
587 );
588
589 loop {
590 match self.condition_checker.check(dep).await {
592 Ok(true) => {
593 tracing::info!(
594 service = %dep.service,
595 condition = ?dep.condition,
596 elapsed = ?start.elapsed(),
597 "Dependency condition satisfied"
598 );
599 return Ok(WaitResult::Satisfied);
600 }
601 Ok(false) => {
602 tracing::debug!(
603 service = %dep.service,
604 condition = ?dep.condition,
605 elapsed = ?start.elapsed(),
606 "Dependency condition not yet satisfied"
607 );
608 }
609 Err(e) => {
610 tracing::warn!(
611 service = %dep.service,
612 condition = ?dep.condition,
613 error = %e,
614 "Error checking dependency condition"
615 );
616 }
618 }
619
620 if start.elapsed() >= timeout {
622 return Ok(self.handle_timeout(dep, timeout));
623 }
624
625 tokio::time::sleep(self.poll_interval).await;
627 }
628 }
629
630 #[allow(clippy::unused_self)]
632 fn handle_timeout(&self, dep: &DependsSpec, timeout: Duration) -> WaitResult {
633 match dep.on_timeout {
634 TimeoutAction::Fail => {
635 tracing::error!(
636 service = %dep.service,
637 condition = ?dep.condition,
638 timeout = ?timeout,
639 "Dependency timeout - failing startup"
640 );
641 WaitResult::TimedOutFail {
642 service: dep.service.clone(),
643 condition: dep.condition,
644 timeout,
645 }
646 }
647 TimeoutAction::Warn => {
648 tracing::warn!(
649 service = %dep.service,
650 condition = ?dep.condition,
651 timeout = ?timeout,
652 "Dependency timeout - continuing with warning"
653 );
654 WaitResult::TimedOutWarn {
655 service: dep.service.clone(),
656 condition: dep.condition,
657 }
658 }
659 TimeoutAction::Continue => {
660 tracing::info!(
661 service = %dep.service,
662 condition = ?dep.condition,
663 timeout = ?timeout,
664 "Dependency timeout - continuing anyway"
665 );
666 WaitResult::TimedOutContinue
667 }
668 }
669 }
670
671 pub async fn wait_for_all(&self, deps: &[DependsSpec]) -> Result<Vec<WaitResult>> {
685 let mut results = Vec::with_capacity(deps.len());
686
687 for dep in deps {
688 let result = self.wait_for_dependency(dep).await?;
689
690 if result.is_failure() {
692 results.push(result);
693 return Ok(results);
695 }
696
697 results.push(result);
698 }
699
700 Ok(results)
701 }
702}
703
704#[cfg(test)]
705mod tests {
706 use super::*;
707 use crate::runtime::MockRuntime;
708 use zlayer_spec::{DependencyCondition, DependsSpec, TimeoutAction};
709
    /// Builds a minimal `ServiceSpec` by parsing a one-service deployment
    /// YAML and replacing its `depends` list with `depends`.
    fn minimal_spec(depends: Vec<DependsSpec>) -> ServiceSpec {
        use zlayer_spec::*;
        let yaml = r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
";
        // Pull the single `test` service out of the parsed deployment.
        let mut spec = serde_yaml::from_str::<DeploymentSpec>(yaml)
            .unwrap()
            .services
            .remove("test")
            .unwrap();
        spec.depends = depends;
        spec
    }
734
    /// Builds a dependency on `service` with the given condition, a 60 second
    /// timeout and the `Fail` timeout action.
    fn dep(service: &str, condition: DependencyCondition) -> DependsSpec {
        DependsSpec {
            service: service.to_string(),
            condition,
            timeout: Some(std::time::Duration::from_secs(60)),
            on_timeout: TimeoutAction::Fail,
        }
    }
744
    /// An empty service map builds an empty graph with an empty startup order.
    #[test]
    fn test_build_empty_graph() {
        let services: HashMap<String, ServiceSpec> = HashMap::new();
        let graph = DependencyGraph::build(&services).unwrap();
        assert!(graph.is_empty());
        assert!(graph.startup_order().is_empty());
    }
754
    /// Independent services all appear in the startup order; their relative
    /// order is not asserted.
    #[test]
    fn test_build_no_dependencies() {
        let mut services = HashMap::new();
        services.insert("a".to_string(), minimal_spec(vec![]));
        services.insert("b".to_string(), minimal_spec(vec![]));
        services.insert("c".to_string(), minimal_spec(vec![]));

        let graph = DependencyGraph::build(&services).unwrap();
        assert_eq!(graph.len(), 3);
        let order = graph.startup_order();
        assert_eq!(order.len(), 3);
        assert!(order.contains(&"a".to_string()));
        assert!(order.contains(&"b".to_string()));
        assert!(order.contains(&"c".to_string()));
    }
771
    /// For the chain a -> b -> c the startup order must be c, then b, then a.
    #[test]
    fn test_build_linear_dependencies() {
        let mut services = HashMap::new();
        services.insert("c".to_string(), minimal_spec(vec![]));
        services.insert(
            "b".to_string(),
            minimal_spec(vec![dep("c", DependencyCondition::Started)]),
        );
        services.insert(
            "a".to_string(),
            minimal_spec(vec![dep("b", DependencyCondition::Started)]),
        );

        let graph = DependencyGraph::build(&services).unwrap();
        let order = graph.startup_order();

        let pos_a = order.iter().position(|x| x == "a").unwrap();
        let pos_b = order.iter().position(|x| x == "b").unwrap();
        let pos_c = order.iter().position(|x| x == "c").unwrap();

        // Dependencies must start before their dependents.
        assert!(pos_c < pos_b);
        assert!(pos_b < pos_a);
    }
797
    /// Diamond shape: a depends on b and c, which both depend on d.
    #[test]
    fn test_build_diamond_dependencies() {
        let mut services = HashMap::new();
        services.insert("d".to_string(), minimal_spec(vec![]));
        services.insert(
            "b".to_string(),
            minimal_spec(vec![dep("d", DependencyCondition::Started)]),
        );
        services.insert(
            "c".to_string(),
            minimal_spec(vec![dep("d", DependencyCondition::Started)]),
        );
        services.insert(
            "a".to_string(),
            minimal_spec(vec![
                dep("b", DependencyCondition::Started),
                dep("c", DependencyCondition::Started),
            ]),
        );

        let graph = DependencyGraph::build(&services).unwrap();
        let order = graph.startup_order();

        let pos_a = order.iter().position(|x| x == "a").unwrap();
        let pos_b = order.iter().position(|x| x == "b").unwrap();
        let pos_c = order.iter().position(|x| x == "c").unwrap();
        let pos_d = order.iter().position(|x| x == "d").unwrap();

        // d precedes both middle services; both precede a.
        assert!(pos_d < pos_b);
        assert!(pos_d < pos_c);
        assert!(pos_b < pos_a);
        assert!(pos_c < pos_a);
    }
839
    /// A service that depends on itself is rejected with a self-dependency error.
    #[test]
    fn test_detect_self_dependency() {
        let mut services = HashMap::new();
        services.insert(
            "a".to_string(),
            minimal_spec(vec![dep("a", DependencyCondition::Started)]),
        );

        let result = DependencyGraph::build(&services);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("self-dependency"));
    }
853
    /// The two-node cycle a -> b -> a is rejected as a cyclic dependency.
    #[test]
    fn test_detect_simple_cycle() {
        let mut services = HashMap::new();
        services.insert(
            "a".to_string(),
            minimal_spec(vec![dep("b", DependencyCondition::Started)]),
        );
        services.insert(
            "b".to_string(),
            minimal_spec(vec![dep("a", DependencyCondition::Started)]),
        );

        let result = DependencyGraph::build(&services);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Cyclic dependency"));
    }
872
    /// A cycle that does not include the entry service (a -> b -> c -> d -> b)
    /// is still detected.
    #[test]
    fn test_detect_complex_cycle() {
        let mut services = HashMap::new();
        services.insert(
            "a".to_string(),
            minimal_spec(vec![dep("b", DependencyCondition::Started)]),
        );
        services.insert(
            "b".to_string(),
            minimal_spec(vec![dep("c", DependencyCondition::Started)]),
        );
        services.insert(
            "c".to_string(),
            minimal_spec(vec![dep("d", DependencyCondition::Started)]),
        );
        services.insert(
            "d".to_string(),
            minimal_spec(vec![dep("b", DependencyCondition::Started)]),
        );

        let result = DependencyGraph::build(&services);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("Cyclic dependency"));
    }
899
    /// Depending on an undefined service is rejected, and the error names the
    /// missing service.
    #[test]
    fn test_detect_missing_dependency() {
        let mut services = HashMap::new();
        services.insert(
            "a".to_string(),
            minimal_spec(vec![dep("nonexistent", DependencyCondition::Started)]),
        );

        let result = DependencyGraph::build(&services);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("non-existent"));
        assert!(err.contains("nonexistent"));
    }
914
    /// `depends_on` follows direct and transitive edges, is not symmetric,
    /// and is never true for a service and itself.
    #[test]
    fn test_depends_on_transitive() {
        let mut services = HashMap::new();
        services.insert("c".to_string(), minimal_spec(vec![]));
        services.insert(
            "b".to_string(),
            minimal_spec(vec![dep("c", DependencyCondition::Started)]),
        );
        services.insert(
            "a".to_string(),
            minimal_spec(vec![dep("b", DependencyCondition::Started)]),
        );

        let graph = DependencyGraph::build(&services).unwrap();

        // Direct dependencies.
        assert!(graph.depends_on("a", "b"));
        assert!(graph.depends_on("b", "c"));

        // Transitive dependency.
        assert!(graph.depends_on("a", "c"));

        // The reverse direction never holds.
        assert!(!graph.depends_on("c", "a"));
        assert!(!graph.depends_on("b", "a"));
        assert!(!graph.depends_on("c", "b"));

        // A service does not depend on itself.
        assert!(!graph.depends_on("a", "a"));
    }
946
    /// `dependencies` returns the declared specs of each service and `None`
    /// for an unknown service.
    #[test]
    fn test_get_dependencies() {
        let mut services = HashMap::new();
        services.insert("c".to_string(), minimal_spec(vec![]));
        services.insert(
            "b".to_string(),
            minimal_spec(vec![dep("c", DependencyCondition::Healthy)]),
        );
        services.insert(
            "a".to_string(),
            minimal_spec(vec![
                dep("b", DependencyCondition::Started),
                dep("c", DependencyCondition::Ready),
            ]),
        );

        let graph = DependencyGraph::build(&services).unwrap();

        let a_deps = graph.dependencies("a").unwrap();
        assert_eq!(a_deps.len(), 2);

        let b_deps = graph.dependencies("b").unwrap();
        assert_eq!(b_deps.len(), 1);
        assert_eq!(b_deps[0].service, "c");
        assert_eq!(b_deps[0].condition, DependencyCondition::Healthy);

        let c_deps = graph.dependencies("c").unwrap();
        assert!(c_deps.is_empty());

        assert!(graph.dependencies("nonexistent").is_none());
    }
978
    /// `dependents` lists the services that directly depend on a service.
    #[test]
    fn test_dependents() {
        let mut services = HashMap::new();
        services.insert("c".to_string(), minimal_spec(vec![]));
        services.insert(
            "b".to_string(),
            minimal_spec(vec![dep("c", DependencyCondition::Started)]),
        );
        services.insert(
            "a".to_string(),
            minimal_spec(vec![dep("c", DependencyCondition::Started)]),
        );

        let graph = DependencyGraph::build(&services).unwrap();

        // Both a and b depend on c.
        let c_dependents = graph.dependents("c");
        assert_eq!(c_dependents.len(), 2);
        assert!(c_dependents.contains(&"a".to_string()));
        assert!(c_dependents.contains(&"b".to_string()));

        // Nothing depends on a or b.
        assert!(graph.dependents("a").is_empty());
        assert!(graph.dependents("b").is_empty());
    }
1004
    /// A created and started container satisfies the `Started` check.
    #[tokio::test]
    async fn test_check_started_running() {
        let runtime = Arc::new(MockRuntime::new());
        let health_states = Arc::new(RwLock::new(HashMap::new()));
        let checker = DependencyConditionChecker::new(runtime.clone(), health_states, None);

        // Same (service, 1) id that `check_started` looks up.
        let id = ContainerId::new("test".to_string(), 1);
        let spec = minimal_spec(vec![]);
        runtime.create_container(&id, &spec).await.unwrap();
        runtime.start_container(&id).await.unwrap();

        assert!(checker.check_started("test").await.unwrap());
    }
1022
    /// A container that was created but never started fails the `Started` check.
    #[tokio::test]
    async fn test_check_started_not_running() {
        let runtime = Arc::new(MockRuntime::new());
        let health_states = Arc::new(RwLock::new(HashMap::new()));
        let checker = DependencyConditionChecker::new(runtime.clone(), health_states, None);

        let id = ContainerId::new("test".to_string(), 1);
        let spec = minimal_spec(vec![]);
        runtime.create_container(&id, &spec).await.unwrap();

        assert!(!checker.check_started("test").await.unwrap());
    }
1037
    /// With no container at all, the `Started` check yields `Ok(false)`
    /// rather than an error.
    #[tokio::test]
    async fn test_check_started_no_container() {
        let runtime = Arc::new(MockRuntime::new());
        let health_states = Arc::new(RwLock::new(HashMap::new()));
        let checker = DependencyConditionChecker::new(runtime, health_states, None);

        assert!(!checker.check_started("nonexistent").await.unwrap());
    }
1047
    /// A service recorded as `Healthy` passes the `Healthy` check.
    #[tokio::test]
    async fn test_check_healthy() {
        let runtime = Arc::new(MockRuntime::new());
        let health_states = Arc::new(RwLock::new(HashMap::new()));

        {
            // Scope the write guard so it is released before the check runs.
            let mut states = health_states.write().await;
            states.insert("test".to_string(), HealthState::Healthy);
        }

        let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);

        assert!(checker.check_healthy("test").await.unwrap());
    }
1063
    /// A service recorded as `Unhealthy` fails the `Healthy` check.
    #[tokio::test]
    async fn test_check_healthy_unhealthy() {
        let runtime = Arc::new(MockRuntime::new());
        let health_states = Arc::new(RwLock::new(HashMap::new()));

        {
            // Scope the write guard so it is released before the check runs.
            let mut states = health_states.write().await;
            states.insert(
                "test".to_string(),
                HealthState::Unhealthy {
                    failures: 3,
                    reason: "connection refused".to_string(),
                },
            );
        }

        let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);

        assert!(!checker.check_healthy("test").await.unwrap());
    }
1085
    /// A service whose health is `Unknown` fails the `Healthy` check.
    #[tokio::test]
    async fn test_check_healthy_unknown() {
        let runtime = Arc::new(MockRuntime::new());
        let health_states = Arc::new(RwLock::new(HashMap::new()));

        {
            // Scope the write guard so it is released before the check runs.
            let mut states = health_states.write().await;
            states.insert("test".to_string(), HealthState::Unknown);
        }

        let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);

        assert!(!checker.check_healthy("test").await.unwrap());
    }
1101
    /// A service with no recorded health state fails the `Healthy` check.
    #[tokio::test]
    async fn test_check_healthy_no_state() {
        let runtime = Arc::new(MockRuntime::new());
        let health_states = Arc::new(RwLock::new(HashMap::new()));
        let checker = DependencyConditionChecker::new(runtime, health_states, None);

        assert!(!checker.check_healthy("test").await.unwrap());
    }
1111
    /// Without a service registry the `Ready` check falls back to the
    /// `Healthy` check, so a healthy service counts as ready.
    #[tokio::test]
    async fn test_check_ready_no_registry() {
        let runtime = Arc::new(MockRuntime::new());
        let health_states = Arc::new(RwLock::new(HashMap::new()));

        {
            // Scope the write guard so it is released before the check runs.
            let mut states = health_states.write().await;
            states.insert("test".to_string(), HealthState::Healthy);
        }

        let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);

        assert!(checker.check_ready("test").await.unwrap());
    }
1128
    /// A service registered with one backend under host `test.default`
    /// passes the `Ready` check.
    #[tokio::test]
    async fn test_check_ready_with_registry() {
        use std::net::SocketAddr;
        use zlayer_proxy::RouteEntry;

        let runtime = Arc::new(MockRuntime::new());
        let health_states = Arc::new(RwLock::new(HashMap::new()));
        let registry = Arc::new(ServiceRegistry::new());

        // The host must match the `{service}.default` name `check_ready` resolves.
        let entry = RouteEntry {
            service_name: "test".to_string(),
            endpoint_name: "http".to_string(),
            host: Some("test.default".to_string()),
            path_prefix: "/".to_string(),
            resolved: zlayer_proxy::ResolvedService {
                name: "test".to_string(),
                backends: vec!["127.0.0.1:8080".parse::<SocketAddr>().unwrap()],
                use_tls: false,
                sni_hostname: "test.local".to_string(),
                expose: zlayer_spec::ExposeType::Public,
                protocol: zlayer_spec::Protocol::Http,
                strip_prefix: false,
                path_prefix: "/".to_string(),
                target_port: 8080,
            },
        };
        registry.register(entry).await;

        let checker = DependencyConditionChecker::new(runtime, health_states, Some(registry));

        assert!(checker.check_ready("test").await.unwrap());
    }
1165
    /// A service that is registered but has no backends fails the `Ready` check.
    #[tokio::test]
    async fn test_check_ready_no_backends() {
        use zlayer_proxy::RouteEntry;

        let runtime = Arc::new(MockRuntime::new());
        let health_states = Arc::new(RwLock::new(HashMap::new()));
        let registry = Arc::new(ServiceRegistry::new());

        let entry = RouteEntry {
            service_name: "test".to_string(),
            endpoint_name: "http".to_string(),
            host: Some("test.default".to_string()),
            path_prefix: "/".to_string(),
            resolved: zlayer_proxy::ResolvedService {
                name: "test".to_string(),
                // Deliberately empty: the route exists but cannot serve traffic.
                backends: vec![],
                use_tls: false,
                sni_hostname: "test.local".to_string(),
                expose: zlayer_spec::ExposeType::Public,
                protocol: zlayer_spec::Protocol::Http,
                strip_prefix: false,
                path_prefix: "/".to_string(),
                target_port: 8080,
            },
        };
        registry.register(entry).await;

        let checker = DependencyConditionChecker::new(runtime, health_states, Some(registry));

        assert!(!checker.check_ready("test").await.unwrap());
    }
1199
    /// `check` succeeds for each of the three conditions when the service is
    /// running and healthy and no registry is configured.
    #[tokio::test]
    async fn test_check_condition_dispatches_correctly() {
        let runtime = Arc::new(MockRuntime::new());
        let health_states = Arc::new(RwLock::new(HashMap::new()));

        {
            // Scope the write guard so it is released before the checks run.
            let mut states = health_states.write().await;
            states.insert("test".to_string(), HealthState::Healthy);
        }

        let id = ContainerId::new("test".to_string(), 1);
        let spec = minimal_spec(vec![]);
        runtime.create_container(&id, &spec).await.unwrap();
        runtime.start_container(&id).await.unwrap();

        let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);

        let dep_started = dep("test", DependencyCondition::Started);
        assert!(checker.check(&dep_started).await.unwrap());

        let dep_healthy = dep("test", DependencyCondition::Healthy);
        assert!(checker.check(&dep_healthy).await.unwrap());

        // With no registry, `Ready` falls back to the health state.
        let dep_ready = dep("test", DependencyCondition::Ready);
        assert!(checker.check(&dep_ready).await.unwrap());
    }
1231
    /// Builds a dependency on `service` with an explicit timeout and timeout action.
    fn dep_with_timeout(
        service: &str,
        condition: DependencyCondition,
        timeout: Duration,
        on_timeout: TimeoutAction,
    ) -> DependsSpec {
        DependsSpec {
            service: service.to_string(),
            condition,
            timeout: Some(timeout),
            on_timeout,
        }
    }
1248
    /// A dependency that is already healthy is reported as satisfied.
    #[tokio::test]
    async fn test_wait_satisfied_immediately() {
        let runtime = Arc::new(MockRuntime::new());
        let health_states = Arc::new(RwLock::new(HashMap::new()));

        {
            // Scope the write guard so it is released before waiting starts.
            let mut states = health_states.write().await;
            states.insert("db".to_string(), HealthState::Healthy);
        }

        let checker = DependencyConditionChecker::new(runtime, health_states, None);
        let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));

        let dep = dep_with_timeout(
            "db",
            DependencyCondition::Healthy,
            Duration::from_secs(5),
            TimeoutAction::Fail,
        );

        let result = waiter.wait_for_dependency(&dep).await.unwrap();
        assert!(result.is_satisfied());
    }
1273
    /// A dependency that becomes healthy while being polled is reported as satisfied.
    #[tokio::test]
    async fn test_wait_satisfied_after_delay() {
        let runtime = Arc::new(MockRuntime::new());
        let health_states = Arc::new(RwLock::new(HashMap::new()));

        {
            // Scope the write guard so it is released before waiting starts.
            let mut states = health_states.write().await;
            states.insert("db".to_string(), HealthState::Unknown);
        }

        let health_states_clone = Arc::clone(&health_states);

        // Flip the service to healthy after 150 ms, i.e. a few 50 ms polls in.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(150)).await;
            let mut states = health_states_clone.write().await;
            states.insert("db".to_string(), HealthState::Healthy);
        });

        let checker = DependencyConditionChecker::new(runtime, health_states, None);
        let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));

        let dep = dep_with_timeout(
            "db",
            DependencyCondition::Healthy,
            Duration::from_secs(5),
            TimeoutAction::Fail,
        );

        let result = waiter.wait_for_dependency(&dep).await.unwrap();
        assert!(result.is_satisfied());
    }
1308
    /// With the `Fail` action, a timed-out wait yields `TimedOutFail`
    /// carrying the service, condition and configured timeout.
    #[tokio::test]
    async fn test_wait_timeout_fail() {
        let runtime = Arc::new(MockRuntime::new());
        let health_states = Arc::new(RwLock::new(HashMap::new()));

        {
            // Scope the write guard so it is released before waiting starts.
            let mut states = health_states.write().await;
            states.insert("db".to_string(), HealthState::Unknown);
        }

        let checker = DependencyConditionChecker::new(runtime, health_states, None);
        let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));

        let dep = dep_with_timeout(
            "db",
            DependencyCondition::Healthy,
            Duration::from_millis(200),
            TimeoutAction::Fail,
        );

        let result = waiter.wait_for_dependency(&dep).await.unwrap();
        assert!(result.is_failure());

        match result {
            WaitResult::TimedOutFail {
                service,
                condition,
                timeout,
            } => {
                assert_eq!(service, "db");
                assert_eq!(condition, DependencyCondition::Healthy);
                assert_eq!(timeout, Duration::from_millis(200));
            }
            _ => panic!("Expected TimedOutFail"),
        }
    }
1346
    /// With the `Warn` action, a timed-out wait yields `TimedOutWarn`, which
    /// lets startup continue but is not "satisfied".
    #[tokio::test]
    async fn test_wait_timeout_warn() {
        let runtime = Arc::new(MockRuntime::new());
        let health_states = Arc::new(RwLock::new(HashMap::new()));

        let checker = DependencyConditionChecker::new(runtime, health_states, None);
        let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));

        let dep = dep_with_timeout(
            "db",
            DependencyCondition::Healthy,
            Duration::from_millis(100),
            TimeoutAction::Warn,
        );

        let result = waiter.wait_for_dependency(&dep).await.unwrap();
        assert!(result.should_continue());
        assert!(!result.is_satisfied());

        match result {
            WaitResult::TimedOutWarn { service, condition } => {
                assert_eq!(service, "db");
                assert_eq!(condition, DependencyCondition::Healthy);
            }
            _ => panic!("Expected TimedOutWarn"),
        }
    }
1374
    /// With the `Continue` action, a timed-out wait yields `TimedOutContinue`.
    #[tokio::test]
    async fn test_wait_timeout_continue() {
        let runtime = Arc::new(MockRuntime::new());
        let health_states = Arc::new(RwLock::new(HashMap::new()));

        let checker = DependencyConditionChecker::new(runtime, health_states, None);
        let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));

        let dep = dep_with_timeout(
            "db",
            DependencyCondition::Healthy,
            Duration::from_millis(100),
            TimeoutAction::Continue,
        );

        let result = waiter.wait_for_dependency(&dep).await.unwrap();
        assert!(result.should_continue());
        assert!(!result.is_satisfied());
        assert!(matches!(result, WaitResult::TimedOutContinue));
    }
1395
1396 #[tokio::test]
1397 async fn test_wait_for_all_success() {
1398 let runtime = Arc::new(MockRuntime::new());
1399 let health_states = Arc::new(RwLock::new(HashMap::new()));
1400
1401 {
1403 let mut states = health_states.write().await;
1404 states.insert("db".to_string(), HealthState::Healthy);
1405 states.insert("cache".to_string(), HealthState::Healthy);
1406 }
1407
1408 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1409 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1410
1411 let deps = vec![
1412 dep_with_timeout(
1413 "db",
1414 DependencyCondition::Healthy,
1415 Duration::from_secs(5),
1416 TimeoutAction::Fail,
1417 ),
1418 dep_with_timeout(
1419 "cache",
1420 DependencyCondition::Healthy,
1421 Duration::from_secs(5),
1422 TimeoutAction::Fail,
1423 ),
1424 ];
1425
1426 let results = waiter.wait_for_all(&deps).await.unwrap();
1427 assert_eq!(results.len(), 2);
1428 assert!(results.iter().all(super::WaitResult::is_satisfied));
1429 }
1430
1431 #[tokio::test]
1432 async fn test_wait_for_all_early_failure() {
1433 let runtime = Arc::new(MockRuntime::new());
1434 let health_states = Arc::new(RwLock::new(HashMap::new()));
1435
1436 {
1438 let mut states = health_states.write().await;
1439 states.insert("cache".to_string(), HealthState::Healthy);
1440 }
1441
1442 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1443 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1444
1445 let deps = vec![
1446 dep_with_timeout(
1447 "db",
1448 DependencyCondition::Healthy,
1449 Duration::from_millis(100), TimeoutAction::Fail,
1451 ),
1452 dep_with_timeout(
1453 "cache",
1454 DependencyCondition::Healthy,
1455 Duration::from_secs(5),
1456 TimeoutAction::Fail,
1457 ),
1458 ];
1459
1460 let results = waiter.wait_for_all(&deps).await.unwrap();
1461 assert_eq!(results.len(), 1);
1463 assert!(results[0].is_failure());
1464 }
1465
1466 #[tokio::test]
1467 async fn test_wait_for_all_mixed_results() {
1468 let runtime = Arc::new(MockRuntime::new());
1469 let health_states = Arc::new(RwLock::new(HashMap::new()));
1470
1471 {
1473 let mut states = health_states.write().await;
1474 states.insert("db".to_string(), HealthState::Healthy);
1475 }
1477
1478 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1479 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1480
1481 let deps = vec![
1482 dep_with_timeout(
1483 "db",
1484 DependencyCondition::Healthy,
1485 Duration::from_secs(5),
1486 TimeoutAction::Fail,
1487 ),
1488 dep_with_timeout(
1489 "cache",
1490 DependencyCondition::Healthy,
1491 Duration::from_millis(100),
1492 TimeoutAction::Warn, ),
1494 ];
1495
1496 let results = waiter.wait_for_all(&deps).await.unwrap();
1497 assert_eq!(results.len(), 2);
1498 assert!(results[0].is_satisfied()); assert!(matches!(results[1], WaitResult::TimedOutWarn { .. })); }
1501
/// Checks the three boolean helpers (`is_satisfied`, `should_continue`,
/// `is_failure`) against every `WaitResult` variant constructed here.
#[test]
fn test_wait_result_helpers() {
    // Verifies the helpers in the fixed order: satisfied, continue, failure.
    let expect_flags =
        |outcome: &WaitResult, satisfied: bool, may_continue: bool, failed: bool| {
            assert_eq!(outcome.is_satisfied(), satisfied);
            assert_eq!(outcome.should_continue(), may_continue);
            assert_eq!(outcome.is_failure(), failed);
        };

    expect_flags(&WaitResult::Satisfied, true, true, false);

    expect_flags(&WaitResult::TimedOutContinue, false, true, false);

    let warn = WaitResult::TimedOutWarn {
        service: "db".to_string(),
        condition: DependencyCondition::Healthy,
    };
    expect_flags(&warn, false, true, false);

    // Only the hard-fail variant reports a failure and blocks continuation.
    let fail = WaitResult::TimedOutFail {
        service: "db".to_string(),
        condition: DependencyCondition::Healthy,
        timeout: Duration::from_secs(60),
    };
    expect_flags(&fail, false, false, true);
}
1531}