1use crate::error::{AgentError, Result};
8use crate::health::HealthState;
9use crate::runtime::{ContainerId, ContainerState, Runtime};
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::RwLock;
14use zlayer_proxy::ServiceRegistry;
15use zlayer_spec::{DependencyCondition, DependsSpec, ServiceSpec, TimeoutAction};
16
/// Validation failures found while building a [`DependencyGraph`].
///
/// Converted into `AgentError::InvalidSpec` (carrying the `Display` text)
/// through the `From` impl in this file.
#[derive(Debug, Clone)]
pub enum DependencyError {
    /// The services form a dependency cycle.
    ///
    /// `cycle` holds the service names along the cycle; it is rendered joined
    /// by ` -> ` in the `Display` output.
    CyclicDependency { cycle: Vec<String> },
    /// `service` declares a dependency on `missing`, which is not one of the
    /// services handed to the graph builder.
    MissingService { service: String, missing: String },
    /// `service` lists itself as one of its own dependencies.
    SelfDependency { service: String },
}
27
28impl std::fmt::Display for DependencyError {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 match self {
31 DependencyError::CyclicDependency { cycle } => {
32 write!(f, "Cyclic dependency detected: {}", cycle.join(" -> "))
33 }
34 DependencyError::MissingService { service, missing } => {
35 write!(
36 f,
37 "Service '{service}' depends on non-existent service '{missing}'"
38 )
39 }
40 DependencyError::SelfDependency { service } => {
41 write!(f, "Service '{service}' has a self-dependency")
42 }
43 }
44 }
45}
46
// Marker impl: `DependencyError` relies on the default `Error` methods
// (no source error is exposed); the message comes from its `Display` impl.
impl std::error::Error for DependencyError {}
48
49impl From<DependencyError> for AgentError {
50 fn from(err: DependencyError) -> Self {
51 AgentError::InvalidSpec(err.to_string())
52 }
53}
54
/// One service in the dependency graph together with its declared dependencies.
#[derive(Debug, Clone)]
pub struct DependencyNode {
    /// Name of the service (the same string used as its key in the graph).
    pub service_name: String,
    /// Dependency declarations copied from the service's spec (`spec.depends`).
    pub depends_on: Vec<DependsSpec>,
}
63
/// Validated dependency graph over a set of services.
///
/// Constructed with `DependencyGraph::build`, which rejects self-dependencies,
/// references to unknown services and cycles, and precomputes a startup order.
#[derive(Debug)]
pub struct DependencyGraph {
    /// Every service, keyed by name.
    nodes: HashMap<String, DependencyNode>,
    /// Topological order computed at build time: a service appears after
    /// everything it depends on.
    startup_order: Vec<String>,
    /// Service name -> names of the services it depends on.
    adjacency: HashMap<String, Vec<String>>,
    /// Service name -> names of the services that depend on it.
    reverse_adjacency: HashMap<String, Vec<String>>,
}
79
80impl DependencyGraph {
81 pub fn build(services: &HashMap<String, ServiceSpec>) -> Result<Self> {
97 let mut nodes = HashMap::new();
98 let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
99 let mut reverse_adjacency: HashMap<String, Vec<String>> = HashMap::new();
100
101 for (name, spec) in services {
103 nodes.insert(
104 name.clone(),
105 DependencyNode {
106 service_name: name.clone(),
107 depends_on: spec.depends.clone(),
108 },
109 );
110 adjacency.insert(name.clone(), Vec::new());
111 reverse_adjacency.insert(name.clone(), Vec::new());
112 }
113
114 for (name, spec) in services {
116 for dep in &spec.depends {
117 if dep.service == *name {
119 return Err(DependencyError::SelfDependency {
120 service: name.clone(),
121 }
122 .into());
123 }
124
125 if !services.contains_key(&dep.service) {
127 return Err(DependencyError::MissingService {
128 service: name.clone(),
129 missing: dep.service.clone(),
130 }
131 .into());
132 }
133
134 adjacency.get_mut(name).unwrap().push(dep.service.clone());
136 reverse_adjacency
137 .get_mut(&dep.service)
138 .unwrap()
139 .push(name.clone());
140 }
141 }
142
143 let mut graph = Self {
144 nodes,
145 startup_order: Vec::new(),
146 adjacency,
147 reverse_adjacency,
148 };
149
150 if let Some(cycle) = graph.detect_cycle() {
152 return Err(DependencyError::CyclicDependency { cycle }.into());
153 }
154
155 graph.startup_order = graph.topological_sort()?;
157
158 Ok(graph)
159 }
160
161 #[must_use]
170 pub fn detect_cycle(&self) -> Option<Vec<String>> {
171 let mut color: HashMap<&String, u8> = HashMap::new();
172 let mut parent: HashMap<&String, Option<&String>> = HashMap::new();
173
174 for name in self.nodes.keys() {
176 color.insert(name, 0);
177 parent.insert(name, None);
178 }
179
180 for start in self.nodes.keys() {
182 if color[start] == 0 {
183 if let Some(cycle) = self.dfs_cycle_detect(start, &mut color, &mut parent) {
184 return Some(cycle);
185 }
186 }
187 }
188
189 None
190 }
191
192 fn dfs_cycle_detect<'a>(
194 &'a self,
195 node: &'a String,
196 color: &mut HashMap<&'a String, u8>,
197 parent: &mut HashMap<&'a String, Option<&'a String>>,
198 ) -> Option<Vec<String>> {
199 color.insert(node, 1);
201
202 if let Some(deps) = self.adjacency.get(node) {
204 for dep in deps {
205 match color.get(dep) {
206 Some(0) => {
207 parent.insert(dep, Some(node));
209 if let Some(cycle) = self.dfs_cycle_detect(dep, color, parent) {
210 return Some(cycle);
211 }
212 }
213 Some(1) => {
214 let mut cycle = vec![dep.clone()];
216 let mut current = node;
217 while current != dep {
218 cycle.push(current.clone());
219 if let Some(Some(p)) = parent.get(current) {
220 current = p;
221 } else {
222 break;
223 }
224 }
225 cycle.push(dep.clone());
226 cycle.reverse();
227 return Some(cycle);
228 }
229 _ => {
230 }
232 }
233 }
234 }
235
236 color.insert(node, 2);
238 None
239 }
240
241 #[must_use]
246 pub fn topological_order(&self) -> Vec<String> {
247 self.startup_order.clone()
248 }
249
250 fn topological_sort(&self) -> Result<Vec<String>> {
252 let mut in_degree: HashMap<&String, usize> = HashMap::new();
253 let mut queue: VecDeque<&String> = VecDeque::new();
254 let mut result = Vec::new();
255
256 for name in self.nodes.keys() {
258 let degree = self.adjacency.get(name).map_or(0, std::vec::Vec::len);
259 in_degree.insert(name, degree);
260 if degree == 0 {
261 queue.push_back(name);
262 }
263 }
264
265 while let Some(node) = queue.pop_front() {
267 result.push(node.clone());
268
269 if let Some(dependents) = self.reverse_adjacency.get(node) {
271 for dependent in dependents {
272 if let Some(degree) = in_degree.get_mut(dependent) {
273 *degree -= 1;
274 if *degree == 0 {
275 queue.push_back(dependent);
276 }
277 }
278 }
279 }
280 }
281
282 if result.len() != self.nodes.len() {
284 return Err(AgentError::InvalidSpec(
285 "Dependency graph has unresolved cycles".to_string(),
286 ));
287 }
288
289 Ok(result)
290 }
291
292 #[must_use]
294 pub fn startup_order(&self) -> &[String] {
295 &self.startup_order
296 }
297
298 #[must_use]
300 pub fn dependencies(&self, service: &str) -> Option<&[DependsSpec]> {
301 self.nodes.get(service).map(|n| n.depends_on.as_slice())
302 }
303
304 #[must_use]
306 pub fn len(&self) -> usize {
307 self.nodes.len()
308 }
309
310 #[must_use]
312 pub fn is_empty(&self) -> bool {
313 self.nodes.is_empty()
314 }
315
316 #[must_use]
318 pub fn depends_on(&self, a: &str, b: &str) -> bool {
319 if a == b {
320 return false;
321 }
322
323 let mut visited = HashSet::new();
324 let mut stack = vec![a];
325
326 while let Some(current) = stack.pop() {
327 if visited.contains(current) {
328 continue;
329 }
330 visited.insert(current);
331
332 if let Some(deps) = self.adjacency.get(current) {
333 for dep in deps {
334 if dep == b {
335 return true;
336 }
337 if !visited.contains(dep.as_str()) {
338 stack.push(dep);
339 }
340 }
341 }
342 }
343
344 false
345 }
346
347 #[must_use]
349 pub fn dependents(&self, service: &str) -> Vec<String> {
350 self.reverse_adjacency
351 .get(service)
352 .cloned()
353 .unwrap_or_default()
354 }
355}
356
/// Evaluates whether a dependency's condition (`Started`, `Healthy`, `Ready`)
/// currently holds for a service.
pub struct DependencyConditionChecker {
    /// Container runtime, queried for container state by the `Started` check.
    runtime: Arc<dyn Runtime + Send + Sync>,
    /// Shared map of service name -> latest health state, read by the `Healthy` check.
    health_states: Arc<RwLock<HashMap<String, HealthState>>>,
    /// Proxy service registry used by the `Ready` check; when `None`, the
    /// `Ready` check falls back to the `Healthy` check.
    service_registry: Option<Arc<ServiceRegistry>>,
}
371
372impl DependencyConditionChecker {
373 pub fn new(
380 runtime: Arc<dyn Runtime + Send + Sync>,
381 health_states: Arc<RwLock<HashMap<String, HealthState>>>,
382 service_registry: Option<Arc<ServiceRegistry>>,
383 ) -> Self {
384 Self {
385 runtime,
386 health_states,
387 service_registry,
388 }
389 }
390
391 pub async fn check(&self, dep: &DependsSpec) -> Result<bool> {
402 match dep.condition {
403 DependencyCondition::Started => self.check_started(&dep.service).await,
404 DependencyCondition::Healthy => self.check_healthy(&dep.service).await,
405 DependencyCondition::Ready => self.check_ready(&dep.service).await,
406 }
407 }
408
409 pub async fn check_started(&self, service: &str) -> Result<bool> {
416 let id = ContainerId {
419 service: service.to_string(),
420 replica: 1,
421 };
422
423 match self.runtime.container_state(&id).await {
424 Ok(ContainerState::Running) => Ok(true),
425 Ok(_) | Err(AgentError::NotFound { .. }) => Ok(false),
426 Err(e) => Err(e), }
428 }
429
430 pub async fn check_healthy(&self, service: &str) -> Result<bool> {
438 let health_states = self.health_states.read().await;
439
440 match health_states.get(service) {
441 Some(HealthState::Healthy) => Ok(true),
442 Some(_) | None => Ok(false),
443 }
444 }
445
446 pub async fn check_ready(&self, service: &str) -> Result<bool> {
457 if let Some(registry) = &self.service_registry {
458 let services = registry.list_services().await;
461 if !services.contains(&service.to_string()) {
462 return Ok(false);
463 }
464
465 let host = format!("{service}.default");
468 match registry.resolve(Some(&host), "/").await {
469 Some(resolved) => {
470 Ok(!resolved.backends.is_empty())
472 }
473 None => {
474 Ok(false)
476 }
477 }
478 } else {
479 tracing::warn!(
481 service = %service,
482 "No proxy configured for 'ready' condition check, falling back to 'healthy'"
483 );
484 self.check_healthy(service).await
485 }
486 }
487}
488
/// Outcome of waiting for a single dependency condition.
#[derive(Debug, Clone)]
pub enum WaitResult {
    /// The condition was satisfied before the timeout elapsed.
    Satisfied,
    /// The wait timed out and the dependency's timeout action is `Continue`.
    TimedOutContinue,
    /// The wait timed out and the dependency's timeout action is `Warn`;
    /// startup proceeds, with a warning logged.
    TimedOutWarn {
        /// Service that was being waited for.
        service: String,
        /// Condition that was not met in time.
        condition: DependencyCondition,
    },
    /// The wait timed out and the dependency's timeout action is `Fail`.
    TimedOutFail {
        /// Service that was being waited for.
        service: String,
        /// Condition that was not met in time.
        condition: DependencyCondition,
        /// Timeout that was applied to the wait.
        timeout: Duration,
    },
}
510
511impl WaitResult {
512 #[must_use]
514 pub fn is_satisfied(&self) -> bool {
515 matches!(self, WaitResult::Satisfied)
516 }
517
518 #[must_use]
520 pub fn should_continue(&self) -> bool {
521 matches!(
522 self,
523 WaitResult::Satisfied | WaitResult::TimedOutContinue | WaitResult::TimedOutWarn { .. }
524 )
525 }
526
527 #[must_use]
529 pub fn is_failure(&self) -> bool {
530 matches!(self, WaitResult::TimedOutFail { .. })
531 }
532}
533
/// Polls dependency conditions until they are satisfied or time out.
pub struct DependencyWaiter {
    /// Evaluates the individual dependency conditions.
    condition_checker: DependencyConditionChecker,
    /// Pause between two consecutive condition checks (1 second unless
    /// overridden via `with_poll_interval`).
    poll_interval: Duration,
}
544
545impl DependencyWaiter {
546 #[must_use]
548 pub fn new(condition_checker: DependencyConditionChecker) -> Self {
549 Self {
550 condition_checker,
551 poll_interval: Duration::from_secs(1),
552 }
553 }
554
555 #[must_use]
557 pub fn with_poll_interval(mut self, interval: Duration) -> Self {
558 self.poll_interval = interval;
559 self
560 }
561
562 #[must_use]
564 pub fn poll_interval(&self) -> Duration {
565 self.poll_interval
566 }
567
568 pub async fn wait_for_dependency(&self, dep: &DependsSpec) -> Result<WaitResult> {
582 let timeout = dep.timeout.unwrap_or(Duration::from_secs(300)); let start = std::time::Instant::now();
584
585 tracing::info!(
586 service = %dep.service,
587 condition = ?dep.condition,
588 timeout = ?timeout,
589 "Waiting for dependency"
590 );
591
592 loop {
593 match self.condition_checker.check(dep).await {
595 Ok(true) => {
596 tracing::info!(
597 service = %dep.service,
598 condition = ?dep.condition,
599 elapsed = ?start.elapsed(),
600 "Dependency condition satisfied"
601 );
602 return Ok(WaitResult::Satisfied);
603 }
604 Ok(false) => {
605 tracing::debug!(
606 service = %dep.service,
607 condition = ?dep.condition,
608 elapsed = ?start.elapsed(),
609 "Dependency condition not yet satisfied"
610 );
611 }
612 Err(e) => {
613 tracing::warn!(
614 service = %dep.service,
615 condition = ?dep.condition,
616 error = %e,
617 "Error checking dependency condition"
618 );
619 }
621 }
622
623 if start.elapsed() >= timeout {
625 return Ok(self.handle_timeout(dep, timeout));
626 }
627
628 tokio::time::sleep(self.poll_interval).await;
630 }
631 }
632
633 #[allow(clippy::unused_self)]
635 fn handle_timeout(&self, dep: &DependsSpec, timeout: Duration) -> WaitResult {
636 match dep.on_timeout {
637 TimeoutAction::Fail => {
638 tracing::error!(
639 service = %dep.service,
640 condition = ?dep.condition,
641 timeout = ?timeout,
642 "Dependency timeout - failing startup"
643 );
644 WaitResult::TimedOutFail {
645 service: dep.service.clone(),
646 condition: dep.condition,
647 timeout,
648 }
649 }
650 TimeoutAction::Warn => {
651 tracing::warn!(
652 service = %dep.service,
653 condition = ?dep.condition,
654 timeout = ?timeout,
655 "Dependency timeout - continuing with warning"
656 );
657 WaitResult::TimedOutWarn {
658 service: dep.service.clone(),
659 condition: dep.condition,
660 }
661 }
662 TimeoutAction::Continue => {
663 tracing::info!(
664 service = %dep.service,
665 condition = ?dep.condition,
666 timeout = ?timeout,
667 "Dependency timeout - continuing anyway"
668 );
669 WaitResult::TimedOutContinue
670 }
671 }
672 }
673
674 pub async fn wait_for_all(&self, deps: &[DependsSpec]) -> Result<Vec<WaitResult>> {
688 let mut results = Vec::with_capacity(deps.len());
689
690 for dep in deps {
691 let result = self.wait_for_dependency(dep).await?;
692
693 if result.is_failure() {
695 results.push(result);
696 return Ok(results);
698 }
699
700 results.push(result);
701 }
702
703 Ok(results)
704 }
705}
706
707#[cfg(test)]
708mod tests {
709 use super::*;
710 use crate::runtime::MockRuntime;
711 use zlayer_spec::{DependencyCondition, DependsSpec, TimeoutAction};
712
713 fn minimal_spec(depends: Vec<DependsSpec>) -> ServiceSpec {
715 use zlayer_spec::*;
716 let yaml = r"
717version: v1
718deployment: test
719services:
720 test:
721 rtype: service
722 image:
723 name: test:latest
724 endpoints:
725 - name: http
726 protocol: http
727 port: 8080
728";
729 let mut spec = serde_yaml::from_str::<DeploymentSpec>(yaml)
730 .unwrap()
731 .services
732 .remove("test")
733 .unwrap();
734 spec.depends = depends;
735 spec
736 }
737
738 fn dep(service: &str, condition: DependencyCondition) -> DependsSpec {
740 DependsSpec {
741 service: service.to_string(),
742 condition,
743 timeout: Some(std::time::Duration::from_secs(60)),
744 on_timeout: TimeoutAction::Fail,
745 }
746 }
747
748 #[test]
751 fn test_build_empty_graph() {
752 let services: HashMap<String, ServiceSpec> = HashMap::new();
753 let graph = DependencyGraph::build(&services).unwrap();
754 assert!(graph.is_empty());
755 assert!(graph.startup_order().is_empty());
756 }
757
758 #[test]
759 fn test_build_no_dependencies() {
760 let mut services = HashMap::new();
761 services.insert("a".to_string(), minimal_spec(vec![]));
762 services.insert("b".to_string(), minimal_spec(vec![]));
763 services.insert("c".to_string(), minimal_spec(vec![]));
764
765 let graph = DependencyGraph::build(&services).unwrap();
766 assert_eq!(graph.len(), 3);
767 let order = graph.startup_order();
769 assert_eq!(order.len(), 3);
770 assert!(order.contains(&"a".to_string()));
771 assert!(order.contains(&"b".to_string()));
772 assert!(order.contains(&"c".to_string()));
773 }
774
775 #[test]
776 fn test_build_linear_dependencies() {
777 let mut services = HashMap::new();
779 services.insert("c".to_string(), minimal_spec(vec![]));
780 services.insert(
781 "b".to_string(),
782 minimal_spec(vec![dep("c", DependencyCondition::Started)]),
783 );
784 services.insert(
785 "a".to_string(),
786 minimal_spec(vec![dep("b", DependencyCondition::Started)]),
787 );
788
789 let graph = DependencyGraph::build(&services).unwrap();
790 let order = graph.startup_order();
791
792 let pos_a = order.iter().position(|x| x == "a").unwrap();
794 let pos_b = order.iter().position(|x| x == "b").unwrap();
795 let pos_c = order.iter().position(|x| x == "c").unwrap();
796
797 assert!(pos_c < pos_b);
798 assert!(pos_b < pos_a);
799 }
800
801 #[test]
802 fn test_build_diamond_dependencies() {
803 let mut services = HashMap::new();
810 services.insert("d".to_string(), minimal_spec(vec![]));
811 services.insert(
812 "b".to_string(),
813 minimal_spec(vec![dep("d", DependencyCondition::Started)]),
814 );
815 services.insert(
816 "c".to_string(),
817 minimal_spec(vec![dep("d", DependencyCondition::Started)]),
818 );
819 services.insert(
820 "a".to_string(),
821 minimal_spec(vec![
822 dep("b", DependencyCondition::Started),
823 dep("c", DependencyCondition::Started),
824 ]),
825 );
826
827 let graph = DependencyGraph::build(&services).unwrap();
828 let order = graph.startup_order();
829
830 let pos_a = order.iter().position(|x| x == "a").unwrap();
831 let pos_b = order.iter().position(|x| x == "b").unwrap();
832 let pos_c = order.iter().position(|x| x == "c").unwrap();
833 let pos_d = order.iter().position(|x| x == "d").unwrap();
834
835 assert!(pos_d < pos_b);
837 assert!(pos_d < pos_c);
838 assert!(pos_b < pos_a);
840 assert!(pos_c < pos_a);
841 }
842
843 #[test]
844 fn test_detect_self_dependency() {
845 let mut services = HashMap::new();
846 services.insert(
847 "a".to_string(),
848 minimal_spec(vec![dep("a", DependencyCondition::Started)]),
849 );
850
851 let result = DependencyGraph::build(&services);
852 assert!(result.is_err());
853 let err = result.unwrap_err().to_string();
854 assert!(err.contains("self-dependency"));
855 }
856
857 #[test]
858 fn test_detect_simple_cycle() {
859 let mut services = HashMap::new();
861 services.insert(
862 "a".to_string(),
863 minimal_spec(vec![dep("b", DependencyCondition::Started)]),
864 );
865 services.insert(
866 "b".to_string(),
867 minimal_spec(vec![dep("a", DependencyCondition::Started)]),
868 );
869
870 let result = DependencyGraph::build(&services);
871 assert!(result.is_err());
872 let err = result.unwrap_err().to_string();
873 assert!(err.contains("Cyclic dependency"));
874 }
875
876 #[test]
877 fn test_detect_complex_cycle() {
878 let mut services = HashMap::new();
880 services.insert(
881 "a".to_string(),
882 minimal_spec(vec![dep("b", DependencyCondition::Started)]),
883 );
884 services.insert(
885 "b".to_string(),
886 minimal_spec(vec![dep("c", DependencyCondition::Started)]),
887 );
888 services.insert(
889 "c".to_string(),
890 minimal_spec(vec![dep("d", DependencyCondition::Started)]),
891 );
892 services.insert(
893 "d".to_string(),
894 minimal_spec(vec![dep("b", DependencyCondition::Started)]),
895 );
896
897 let result = DependencyGraph::build(&services);
898 assert!(result.is_err());
899 let err = result.unwrap_err().to_string();
900 assert!(err.contains("Cyclic dependency"));
901 }
902
903 #[test]
904 fn test_detect_missing_dependency() {
905 let mut services = HashMap::new();
906 services.insert(
907 "a".to_string(),
908 minimal_spec(vec![dep("nonexistent", DependencyCondition::Started)]),
909 );
910
911 let result = DependencyGraph::build(&services);
912 assert!(result.is_err());
913 let err = result.unwrap_err().to_string();
914 assert!(err.contains("non-existent"));
915 assert!(err.contains("nonexistent"));
916 }
917
918 #[test]
919 fn test_depends_on_transitive() {
920 let mut services = HashMap::new();
922 services.insert("c".to_string(), minimal_spec(vec![]));
923 services.insert(
924 "b".to_string(),
925 minimal_spec(vec![dep("c", DependencyCondition::Started)]),
926 );
927 services.insert(
928 "a".to_string(),
929 minimal_spec(vec![dep("b", DependencyCondition::Started)]),
930 );
931
932 let graph = DependencyGraph::build(&services).unwrap();
933
934 assert!(graph.depends_on("a", "b"));
936 assert!(graph.depends_on("b", "c"));
937
938 assert!(graph.depends_on("a", "c"));
940
941 assert!(!graph.depends_on("c", "a"));
943 assert!(!graph.depends_on("b", "a"));
944 assert!(!graph.depends_on("c", "b"));
945
946 assert!(!graph.depends_on("a", "a"));
948 }
949
950 #[test]
951 fn test_get_dependencies() {
952 let mut services = HashMap::new();
953 services.insert("c".to_string(), minimal_spec(vec![]));
954 services.insert(
955 "b".to_string(),
956 minimal_spec(vec![dep("c", DependencyCondition::Healthy)]),
957 );
958 services.insert(
959 "a".to_string(),
960 minimal_spec(vec![
961 dep("b", DependencyCondition::Started),
962 dep("c", DependencyCondition::Ready),
963 ]),
964 );
965
966 let graph = DependencyGraph::build(&services).unwrap();
967
968 let a_deps = graph.dependencies("a").unwrap();
969 assert_eq!(a_deps.len(), 2);
970
971 let b_deps = graph.dependencies("b").unwrap();
972 assert_eq!(b_deps.len(), 1);
973 assert_eq!(b_deps[0].service, "c");
974 assert_eq!(b_deps[0].condition, DependencyCondition::Healthy);
975
976 let c_deps = graph.dependencies("c").unwrap();
977 assert!(c_deps.is_empty());
978
979 assert!(graph.dependencies("nonexistent").is_none());
980 }
981
982 #[test]
983 fn test_dependents() {
984 let mut services = HashMap::new();
985 services.insert("c".to_string(), minimal_spec(vec![]));
986 services.insert(
987 "b".to_string(),
988 minimal_spec(vec![dep("c", DependencyCondition::Started)]),
989 );
990 services.insert(
991 "a".to_string(),
992 minimal_spec(vec![dep("c", DependencyCondition::Started)]),
993 );
994
995 let graph = DependencyGraph::build(&services).unwrap();
996
997 let c_dependents = graph.dependents("c");
999 assert_eq!(c_dependents.len(), 2);
1000 assert!(c_dependents.contains(&"a".to_string()));
1001 assert!(c_dependents.contains(&"b".to_string()));
1002
1003 assert!(graph.dependents("a").is_empty());
1005 assert!(graph.dependents("b").is_empty());
1006 }
1007
1008 #[tokio::test]
1011 async fn test_check_started_running() {
1012 let runtime = Arc::new(MockRuntime::new());
1013 let health_states = Arc::new(RwLock::new(HashMap::new()));
1014 let checker = DependencyConditionChecker::new(runtime.clone(), health_states, None);
1015
1016 let id = ContainerId {
1018 service: "test".to_string(),
1019 replica: 1,
1020 };
1021 let spec = minimal_spec(vec![]);
1022 runtime.create_container(&id, &spec).await.unwrap();
1023 runtime.start_container(&id).await.unwrap();
1024
1025 assert!(checker.check_started("test").await.unwrap());
1027 }
1028
1029 #[tokio::test]
1030 async fn test_check_started_not_running() {
1031 let runtime = Arc::new(MockRuntime::new());
1032 let health_states = Arc::new(RwLock::new(HashMap::new()));
1033 let checker = DependencyConditionChecker::new(runtime.clone(), health_states, None);
1034
1035 let id = ContainerId {
1037 service: "test".to_string(),
1038 replica: 1,
1039 };
1040 let spec = minimal_spec(vec![]);
1041 runtime.create_container(&id, &spec).await.unwrap();
1042
1043 assert!(!checker.check_started("test").await.unwrap());
1045 }
1046
1047 #[tokio::test]
1048 async fn test_check_started_no_container() {
1049 let runtime = Arc::new(MockRuntime::new());
1050 let health_states = Arc::new(RwLock::new(HashMap::new()));
1051 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1052
1053 assert!(!checker.check_started("nonexistent").await.unwrap());
1055 }
1056
1057 #[tokio::test]
1058 async fn test_check_healthy() {
1059 let runtime = Arc::new(MockRuntime::new());
1060 let health_states = Arc::new(RwLock::new(HashMap::new()));
1061
1062 {
1064 let mut states = health_states.write().await;
1065 states.insert("test".to_string(), HealthState::Healthy);
1066 }
1067
1068 let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);
1069
1070 assert!(checker.check_healthy("test").await.unwrap());
1071 }
1072
1073 #[tokio::test]
1074 async fn test_check_healthy_unhealthy() {
1075 let runtime = Arc::new(MockRuntime::new());
1076 let health_states = Arc::new(RwLock::new(HashMap::new()));
1077
1078 {
1080 let mut states = health_states.write().await;
1081 states.insert(
1082 "test".to_string(),
1083 HealthState::Unhealthy {
1084 failures: 3,
1085 reason: "connection refused".to_string(),
1086 },
1087 );
1088 }
1089
1090 let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);
1091
1092 assert!(!checker.check_healthy("test").await.unwrap());
1093 }
1094
1095 #[tokio::test]
1096 async fn test_check_healthy_unknown() {
1097 let runtime = Arc::new(MockRuntime::new());
1098 let health_states = Arc::new(RwLock::new(HashMap::new()));
1099
1100 {
1102 let mut states = health_states.write().await;
1103 states.insert("test".to_string(), HealthState::Unknown);
1104 }
1105
1106 let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);
1107
1108 assert!(!checker.check_healthy("test").await.unwrap());
1109 }
1110
1111 #[tokio::test]
1112 async fn test_check_healthy_no_state() {
1113 let runtime = Arc::new(MockRuntime::new());
1114 let health_states = Arc::new(RwLock::new(HashMap::new()));
1115 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1116
1117 assert!(!checker.check_healthy("test").await.unwrap());
1119 }
1120
1121 #[tokio::test]
1122 async fn test_check_ready_no_registry() {
1123 let runtime = Arc::new(MockRuntime::new());
1124 let health_states = Arc::new(RwLock::new(HashMap::new()));
1125
1126 {
1128 let mut states = health_states.write().await;
1129 states.insert("test".to_string(), HealthState::Healthy);
1130 }
1131
1132 let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);
1133
1134 assert!(checker.check_ready("test").await.unwrap());
1136 }
1137
1138 #[tokio::test]
1139 async fn test_check_ready_with_registry() {
1140 use std::net::SocketAddr;
1141 use zlayer_proxy::RouteEntry;
1142
1143 let runtime = Arc::new(MockRuntime::new());
1144 let health_states = Arc::new(RwLock::new(HashMap::new()));
1145 let registry = Arc::new(ServiceRegistry::new());
1146
1147 let entry = RouteEntry {
1152 service_name: "test".to_string(),
1153 endpoint_name: "http".to_string(),
1154 host: Some("test.default".to_string()),
1155 path_prefix: "/".to_string(),
1156 resolved: zlayer_proxy::ResolvedService {
1157 name: "test".to_string(),
1158 backends: vec!["127.0.0.1:8080".parse::<SocketAddr>().unwrap()],
1159 use_tls: false,
1160 sni_hostname: "test.local".to_string(),
1161 expose: zlayer_spec::ExposeType::Public,
1162 protocol: zlayer_spec::Protocol::Http,
1163 strip_prefix: false,
1164 path_prefix: "/".to_string(),
1165 target_port: 8080,
1166 },
1167 };
1168 registry.register(entry).await;
1169
1170 let checker = DependencyConditionChecker::new(runtime, health_states, Some(registry));
1171
1172 assert!(checker.check_ready("test").await.unwrap());
1173 }
1174
1175 #[tokio::test]
1176 async fn test_check_ready_no_backends() {
1177 use zlayer_proxy::RouteEntry;
1178
1179 let runtime = Arc::new(MockRuntime::new());
1180 let health_states = Arc::new(RwLock::new(HashMap::new()));
1181 let registry = Arc::new(ServiceRegistry::new());
1182
1183 let entry = RouteEntry {
1185 service_name: "test".to_string(),
1186 endpoint_name: "http".to_string(),
1187 host: Some("test.default".to_string()),
1188 path_prefix: "/".to_string(),
1189 resolved: zlayer_proxy::ResolvedService {
1190 name: "test".to_string(),
1191 backends: vec![], use_tls: false,
1193 sni_hostname: "test.local".to_string(),
1194 expose: zlayer_spec::ExposeType::Public,
1195 protocol: zlayer_spec::Protocol::Http,
1196 strip_prefix: false,
1197 path_prefix: "/".to_string(),
1198 target_port: 8080,
1199 },
1200 };
1201 registry.register(entry).await;
1202
1203 let checker = DependencyConditionChecker::new(runtime, health_states, Some(registry));
1204
1205 assert!(!checker.check_ready("test").await.unwrap());
1207 }
1208
1209 #[tokio::test]
1210 async fn test_check_condition_dispatches_correctly() {
1211 let runtime = Arc::new(MockRuntime::new());
1212 let health_states = Arc::new(RwLock::new(HashMap::new()));
1213
1214 {
1216 let mut states = health_states.write().await;
1217 states.insert("test".to_string(), HealthState::Healthy);
1218 }
1219
1220 let id = ContainerId {
1222 service: "test".to_string(),
1223 replica: 1,
1224 };
1225 let spec = minimal_spec(vec![]);
1226 runtime.create_container(&id, &spec).await.unwrap();
1227 runtime.start_container(&id).await.unwrap();
1228
1229 let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);
1230
1231 let dep_started = dep("test", DependencyCondition::Started);
1233 assert!(checker.check(&dep_started).await.unwrap());
1234
1235 let dep_healthy = dep("test", DependencyCondition::Healthy);
1237 assert!(checker.check(&dep_healthy).await.unwrap());
1238
1239 let dep_ready = dep("test", DependencyCondition::Ready);
1241 assert!(checker.check(&dep_ready).await.unwrap());
1242 }
1243
1244 fn dep_with_timeout(
1248 service: &str,
1249 condition: DependencyCondition,
1250 timeout: Duration,
1251 on_timeout: TimeoutAction,
1252 ) -> DependsSpec {
1253 DependsSpec {
1254 service: service.to_string(),
1255 condition,
1256 timeout: Some(timeout),
1257 on_timeout,
1258 }
1259 }
1260
1261 #[tokio::test]
1262 async fn test_wait_satisfied_immediately() {
1263 let runtime = Arc::new(MockRuntime::new());
1264 let health_states = Arc::new(RwLock::new(HashMap::new()));
1265
1266 {
1268 let mut states = health_states.write().await;
1269 states.insert("db".to_string(), HealthState::Healthy);
1270 }
1271
1272 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1273 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1274
1275 let dep = dep_with_timeout(
1276 "db",
1277 DependencyCondition::Healthy,
1278 Duration::from_secs(5),
1279 TimeoutAction::Fail,
1280 );
1281
1282 let result = waiter.wait_for_dependency(&dep).await.unwrap();
1283 assert!(result.is_satisfied());
1284 }
1285
1286 #[tokio::test]
1287 async fn test_wait_satisfied_after_delay() {
1288 let runtime = Arc::new(MockRuntime::new());
1289 let health_states = Arc::new(RwLock::new(HashMap::new()));
1290
1291 {
1293 let mut states = health_states.write().await;
1294 states.insert("db".to_string(), HealthState::Unknown);
1295 }
1296
1297 let health_states_clone = Arc::clone(&health_states);
1299
1300 tokio::spawn(async move {
1302 tokio::time::sleep(Duration::from_millis(150)).await;
1303 let mut states = health_states_clone.write().await;
1304 states.insert("db".to_string(), HealthState::Healthy);
1305 });
1306
1307 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1308 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1309
1310 let dep = dep_with_timeout(
1311 "db",
1312 DependencyCondition::Healthy,
1313 Duration::from_secs(5),
1314 TimeoutAction::Fail,
1315 );
1316
1317 let result = waiter.wait_for_dependency(&dep).await.unwrap();
1318 assert!(result.is_satisfied());
1319 }
1320
1321 #[tokio::test]
1322 async fn test_wait_timeout_fail() {
1323 let runtime = Arc::new(MockRuntime::new());
1324 let health_states = Arc::new(RwLock::new(HashMap::new()));
1325
1326 {
1328 let mut states = health_states.write().await;
1329 states.insert("db".to_string(), HealthState::Unknown);
1330 }
1331
1332 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1333 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1334
1335 let dep = dep_with_timeout(
1336 "db",
1337 DependencyCondition::Healthy,
1338 Duration::from_millis(200), TimeoutAction::Fail,
1340 );
1341
1342 let result = waiter.wait_for_dependency(&dep).await.unwrap();
1343 assert!(result.is_failure());
1344
1345 match result {
1346 WaitResult::TimedOutFail {
1347 service,
1348 condition,
1349 timeout,
1350 } => {
1351 assert_eq!(service, "db");
1352 assert_eq!(condition, DependencyCondition::Healthy);
1353 assert_eq!(timeout, Duration::from_millis(200));
1354 }
1355 _ => panic!("Expected TimedOutFail"),
1356 }
1357 }
1358
1359 #[tokio::test]
1360 async fn test_wait_timeout_warn() {
1361 let runtime = Arc::new(MockRuntime::new());
1362 let health_states = Arc::new(RwLock::new(HashMap::new()));
1363
1364 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1365 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1366
1367 let dep = dep_with_timeout(
1368 "db",
1369 DependencyCondition::Healthy,
1370 Duration::from_millis(100),
1371 TimeoutAction::Warn,
1372 );
1373
1374 let result = waiter.wait_for_dependency(&dep).await.unwrap();
1375 assert!(result.should_continue());
1376 assert!(!result.is_satisfied());
1377
1378 match result {
1379 WaitResult::TimedOutWarn { service, condition } => {
1380 assert_eq!(service, "db");
1381 assert_eq!(condition, DependencyCondition::Healthy);
1382 }
1383 _ => panic!("Expected TimedOutWarn"),
1384 }
1385 }
1386
1387 #[tokio::test]
1388 async fn test_wait_timeout_continue() {
1389 let runtime = Arc::new(MockRuntime::new());
1390 let health_states = Arc::new(RwLock::new(HashMap::new()));
1391
1392 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1393 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1394
1395 let dep = dep_with_timeout(
1396 "db",
1397 DependencyCondition::Healthy,
1398 Duration::from_millis(100),
1399 TimeoutAction::Continue,
1400 );
1401
1402 let result = waiter.wait_for_dependency(&dep).await.unwrap();
1403 assert!(result.should_continue());
1404 assert!(!result.is_satisfied());
1405 assert!(matches!(result, WaitResult::TimedOutContinue));
1406 }
1407
1408 #[tokio::test]
1409 async fn test_wait_for_all_success() {
1410 let runtime = Arc::new(MockRuntime::new());
1411 let health_states = Arc::new(RwLock::new(HashMap::new()));
1412
1413 {
1415 let mut states = health_states.write().await;
1416 states.insert("db".to_string(), HealthState::Healthy);
1417 states.insert("cache".to_string(), HealthState::Healthy);
1418 }
1419
1420 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1421 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1422
1423 let deps = vec![
1424 dep_with_timeout(
1425 "db",
1426 DependencyCondition::Healthy,
1427 Duration::from_secs(5),
1428 TimeoutAction::Fail,
1429 ),
1430 dep_with_timeout(
1431 "cache",
1432 DependencyCondition::Healthy,
1433 Duration::from_secs(5),
1434 TimeoutAction::Fail,
1435 ),
1436 ];
1437
1438 let results = waiter.wait_for_all(&deps).await.unwrap();
1439 assert_eq!(results.len(), 2);
1440 assert!(results.iter().all(super::WaitResult::is_satisfied));
1441 }
1442
1443 #[tokio::test]
1444 async fn test_wait_for_all_early_failure() {
1445 let runtime = Arc::new(MockRuntime::new());
1446 let health_states = Arc::new(RwLock::new(HashMap::new()));
1447
1448 {
1450 let mut states = health_states.write().await;
1451 states.insert("cache".to_string(), HealthState::Healthy);
1452 }
1453
1454 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1455 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1456
1457 let deps = vec![
1458 dep_with_timeout(
1459 "db",
1460 DependencyCondition::Healthy,
1461 Duration::from_millis(100), TimeoutAction::Fail,
1463 ),
1464 dep_with_timeout(
1465 "cache",
1466 DependencyCondition::Healthy,
1467 Duration::from_secs(5),
1468 TimeoutAction::Fail,
1469 ),
1470 ];
1471
1472 let results = waiter.wait_for_all(&deps).await.unwrap();
1473 assert_eq!(results.len(), 1);
1475 assert!(results[0].is_failure());
1476 }
1477
1478 #[tokio::test]
1479 async fn test_wait_for_all_mixed_results() {
1480 let runtime = Arc::new(MockRuntime::new());
1481 let health_states = Arc::new(RwLock::new(HashMap::new()));
1482
1483 {
1485 let mut states = health_states.write().await;
1486 states.insert("db".to_string(), HealthState::Healthy);
1487 }
1489
1490 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1491 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1492
1493 let deps = vec![
1494 dep_with_timeout(
1495 "db",
1496 DependencyCondition::Healthy,
1497 Duration::from_secs(5),
1498 TimeoutAction::Fail,
1499 ),
1500 dep_with_timeout(
1501 "cache",
1502 DependencyCondition::Healthy,
1503 Duration::from_millis(100),
1504 TimeoutAction::Warn, ),
1506 ];
1507
1508 let results = waiter.wait_for_all(&deps).await.unwrap();
1509 assert_eq!(results.len(), 2);
1510 assert!(results[0].is_satisfied()); assert!(matches!(results[1], WaitResult::TimedOutWarn { .. })); }
1513
/// Exercises `is_satisfied`, `should_continue` and `is_failure` on each
/// `WaitResult` value constructed below.
#[test]
fn test_wait_result_helpers() {
    /// Compares the three helper flags of `result` with the expected values;
    /// `label` identifies the case in any failure message.
    fn expect_flags(
        label: &str,
        result: &WaitResult,
        satisfied: bool,
        proceeds: bool,
        failed: bool,
    ) {
        assert_eq!(result.is_satisfied(), satisfied, "{label}: is_satisfied");
        assert_eq!(result.should_continue(), proceeds, "{label}: should_continue");
        assert_eq!(result.is_failure(), failed, "{label}: is_failure");
    }

    expect_flags("satisfied", &WaitResult::Satisfied, true, true, false);

    expect_flags(
        "timed-out continue",
        &WaitResult::TimedOutContinue,
        false,
        true,
        false,
    );

    let warn = WaitResult::TimedOutWarn {
        service: "db".to_string(),
        condition: DependencyCondition::Healthy,
    };
    expect_flags("timed-out warn", &warn, false, true, false);

    let fail = WaitResult::TimedOutFail {
        service: "db".to_string(),
        condition: DependencyCondition::Healthy,
        timeout: Duration::from_secs(60),
    };
    expect_flags("timed-out fail", &fail, false, false, true);
}
1543}