1use crate::error::{AgentError, Result};
8use crate::health::HealthState;
9use crate::runtime::{ContainerId, ContainerState, Runtime};
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::sync::Arc;
12use std::time::Duration;
13use tokio::sync::RwLock;
14use zlayer_proxy::ServiceRegistry;
15use zlayer_spec::{DependencyCondition, DependsSpec, ServiceSpec, TimeoutAction};
16
/// Problems detected while validating service dependencies.
#[derive(Clone, Debug)]
pub enum DependencyError {
    /// The dependencies form a loop. `cycle` holds the service names along the
    /// loop; they are joined with ` -> ` when the error is displayed.
    CyclicDependency { cycle: Vec<String> },
    /// `service` declares a dependency on `missing`, which is not a known service.
    MissingService { service: String, missing: String },
    /// `service` lists itself as one of its own dependencies.
    SelfDependency { service: String },
}

impl std::fmt::Display for DependencyError {
    /// Renders a one-line, human-readable description of the problem.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::SelfDependency { service } => {
                write!(f, "Service '{service}' has a self-dependency")
            }
            Self::MissingService { service, missing } => write!(
                f,
                "Service '{service}' depends on non-existent service '{missing}'"
            ),
            Self::CyclicDependency { cycle } => {
                write!(f, "Cyclic dependency detected: {}", cycle.join(" -> "))
            }
        }
    }
}

impl std::error::Error for DependencyError {}
48
49impl From<DependencyError> for AgentError {
50 fn from(err: DependencyError) -> Self {
51 AgentError::InvalidSpec(err.to_string())
52 }
53}
54
55#[derive(Debug, Clone)]
57pub struct DependencyNode {
58 pub service_name: String,
60 pub depends_on: Vec<DependsSpec>,
62}
63
64#[derive(Debug)]
69pub struct DependencyGraph {
70 nodes: HashMap<String, DependencyNode>,
72 startup_order: Vec<String>,
74 adjacency: HashMap<String, Vec<String>>,
76 reverse_adjacency: HashMap<String, Vec<String>>,
78}
79
80impl DependencyGraph {
81 pub fn build(services: &HashMap<String, ServiceSpec>) -> Result<Self> {
97 let mut nodes = HashMap::new();
98 let mut adjacency: HashMap<String, Vec<String>> = HashMap::new();
99 let mut reverse_adjacency: HashMap<String, Vec<String>> = HashMap::new();
100
101 for (name, spec) in services {
103 nodes.insert(
104 name.clone(),
105 DependencyNode {
106 service_name: name.clone(),
107 depends_on: spec.depends.clone(),
108 },
109 );
110 adjacency.insert(name.clone(), Vec::new());
111 reverse_adjacency.insert(name.clone(), Vec::new());
112 }
113
114 for (name, spec) in services {
116 for dep in &spec.depends {
117 if dep.service == *name {
119 return Err(DependencyError::SelfDependency {
120 service: name.clone(),
121 }
122 .into());
123 }
124
125 if !services.contains_key(&dep.service) {
127 return Err(DependencyError::MissingService {
128 service: name.clone(),
129 missing: dep.service.clone(),
130 }
131 .into());
132 }
133
134 adjacency.get_mut(name).unwrap().push(dep.service.clone());
136 reverse_adjacency
137 .get_mut(&dep.service)
138 .unwrap()
139 .push(name.clone());
140 }
141 }
142
143 let mut graph = Self {
144 nodes,
145 startup_order: Vec::new(),
146 adjacency,
147 reverse_adjacency,
148 };
149
150 if let Some(cycle) = graph.detect_cycle() {
152 return Err(DependencyError::CyclicDependency { cycle }.into());
153 }
154
155 graph.startup_order = graph.topological_sort()?;
157
158 Ok(graph)
159 }
160
161 #[must_use]
170 pub fn detect_cycle(&self) -> Option<Vec<String>> {
171 let mut color: HashMap<&String, u8> = HashMap::new();
172 let mut parent: HashMap<&String, Option<&String>> = HashMap::new();
173
174 for name in self.nodes.keys() {
176 color.insert(name, 0);
177 parent.insert(name, None);
178 }
179
180 for start in self.nodes.keys() {
182 if color[start] == 0 {
183 if let Some(cycle) = self.dfs_cycle_detect(start, &mut color, &mut parent) {
184 return Some(cycle);
185 }
186 }
187 }
188
189 None
190 }
191
192 fn dfs_cycle_detect<'a>(
194 &'a self,
195 node: &'a String,
196 color: &mut HashMap<&'a String, u8>,
197 parent: &mut HashMap<&'a String, Option<&'a String>>,
198 ) -> Option<Vec<String>> {
199 color.insert(node, 1);
201
202 if let Some(deps) = self.adjacency.get(node) {
204 for dep in deps {
205 match color.get(dep) {
206 Some(0) => {
207 parent.insert(dep, Some(node));
209 if let Some(cycle) = self.dfs_cycle_detect(dep, color, parent) {
210 return Some(cycle);
211 }
212 }
213 Some(1) => {
214 let mut cycle = vec![dep.clone()];
216 let mut current = node;
217 while current != dep {
218 cycle.push(current.clone());
219 if let Some(Some(p)) = parent.get(current) {
220 current = p;
221 } else {
222 break;
223 }
224 }
225 cycle.push(dep.clone());
226 cycle.reverse();
227 return Some(cycle);
228 }
229 _ => {
230 }
232 }
233 }
234 }
235
236 color.insert(node, 2);
238 None
239 }
240
241 #[must_use]
246 pub fn topological_order(&self) -> Vec<String> {
247 self.startup_order.clone()
248 }
249
250 fn topological_sort(&self) -> Result<Vec<String>> {
252 let mut in_degree: HashMap<&String, usize> = HashMap::new();
253 let mut queue: VecDeque<&String> = VecDeque::new();
254 let mut result = Vec::new();
255
256 for name in self.nodes.keys() {
258 let degree = self.adjacency.get(name).map_or(0, std::vec::Vec::len);
259 in_degree.insert(name, degree);
260 if degree == 0 {
261 queue.push_back(name);
262 }
263 }
264
265 while let Some(node) = queue.pop_front() {
267 result.push(node.clone());
268
269 if let Some(dependents) = self.reverse_adjacency.get(node) {
271 for dependent in dependents {
272 if let Some(degree) = in_degree.get_mut(dependent) {
273 *degree -= 1;
274 if *degree == 0 {
275 queue.push_back(dependent);
276 }
277 }
278 }
279 }
280 }
281
282 if result.len() != self.nodes.len() {
284 return Err(AgentError::InvalidSpec(
285 "Dependency graph has unresolved cycles".to_string(),
286 ));
287 }
288
289 Ok(result)
290 }
291
292 #[must_use]
294 pub fn startup_order(&self) -> &[String] {
295 &self.startup_order
296 }
297
298 #[must_use]
305 pub fn teardown_order(&self) -> Vec<String> {
306 let mut order = self.startup_order.clone();
307 order.reverse();
308 order
309 }
310
311 #[must_use]
313 pub fn dependencies(&self, service: &str) -> Option<&[DependsSpec]> {
314 self.nodes.get(service).map(|n| n.depends_on.as_slice())
315 }
316
317 #[must_use]
319 pub fn len(&self) -> usize {
320 self.nodes.len()
321 }
322
323 #[must_use]
325 pub fn is_empty(&self) -> bool {
326 self.nodes.is_empty()
327 }
328
329 #[must_use]
331 pub fn depends_on(&self, a: &str, b: &str) -> bool {
332 if a == b {
333 return false;
334 }
335
336 let mut visited = HashSet::new();
337 let mut stack = vec![a];
338
339 while let Some(current) = stack.pop() {
340 if visited.contains(current) {
341 continue;
342 }
343 visited.insert(current);
344
345 if let Some(deps) = self.adjacency.get(current) {
346 for dep in deps {
347 if dep == b {
348 return true;
349 }
350 if !visited.contains(dep.as_str()) {
351 stack.push(dep);
352 }
353 }
354 }
355 }
356
357 false
358 }
359
360 #[must_use]
362 pub fn dependents(&self, service: &str) -> Vec<String> {
363 self.reverse_adjacency
364 .get(service)
365 .cloned()
366 .unwrap_or_default()
367 }
368}
369
370pub struct DependencyConditionChecker {
377 runtime: Arc<dyn Runtime + Send + Sync>,
379 health_states: Arc<RwLock<HashMap<String, HealthState>>>,
381 service_registry: Option<Arc<ServiceRegistry>>,
383}
384
385impl DependencyConditionChecker {
386 pub fn new(
393 runtime: Arc<dyn Runtime + Send + Sync>,
394 health_states: Arc<RwLock<HashMap<String, HealthState>>>,
395 service_registry: Option<Arc<ServiceRegistry>>,
396 ) -> Self {
397 Self {
398 runtime,
399 health_states,
400 service_registry,
401 }
402 }
403
404 pub async fn check(&self, dep: &DependsSpec) -> Result<bool> {
415 match dep.condition {
416 DependencyCondition::Started => self.check_started(&dep.service).await,
417 DependencyCondition::Healthy => self.check_healthy(&dep.service).await,
418 DependencyCondition::Ready => self.check_ready(&dep.service).await,
419 }
420 }
421
422 pub async fn check_started(&self, service: &str) -> Result<bool> {
429 let id = ContainerId::new(service.to_string(), 1);
432
433 match self.runtime.container_state(&id).await {
434 Ok(ContainerState::Running) => Ok(true),
435 Ok(_) | Err(AgentError::NotFound { .. }) => Ok(false),
436 Err(e) => Err(e), }
438 }
439
440 pub async fn check_healthy(&self, service: &str) -> Result<bool> {
448 let health_states = self.health_states.read().await;
449
450 match health_states.get(service) {
451 Some(HealthState::Healthy) => Ok(true),
452 Some(_) | None => Ok(false),
453 }
454 }
455
456 pub async fn check_ready(&self, service: &str) -> Result<bool> {
467 if let Some(registry) = &self.service_registry {
468 let services = registry.list_services().await;
471 if !services.contains(&service.to_string()) {
472 return Ok(false);
473 }
474
475 let host = format!("{service}.default");
478 match registry.resolve(Some(&host), "/").await {
479 Some(resolved) => {
480 Ok(!resolved.backends.is_empty())
482 }
483 None => {
484 Ok(false)
486 }
487 }
488 } else {
489 tracing::warn!(
491 service = %service,
492 "No proxy configured for 'ready' condition check, falling back to 'healthy'"
493 );
494 self.check_healthy(service).await
495 }
496 }
497}
498
499#[derive(Debug, Clone)]
503pub enum WaitResult {
504 Satisfied,
506 TimedOutContinue,
508 TimedOutWarn {
510 service: String,
511 condition: DependencyCondition,
512 },
513 TimedOutFail {
515 service: String,
516 condition: DependencyCondition,
517 timeout: Duration,
518 },
519}
520
521impl WaitResult {
522 #[must_use]
524 pub fn is_satisfied(&self) -> bool {
525 matches!(self, WaitResult::Satisfied)
526 }
527
528 #[must_use]
530 pub fn should_continue(&self) -> bool {
531 matches!(
532 self,
533 WaitResult::Satisfied | WaitResult::TimedOutContinue | WaitResult::TimedOutWarn { .. }
534 )
535 }
536
537 #[must_use]
539 pub fn is_failure(&self) -> bool {
540 matches!(self, WaitResult::TimedOutFail { .. })
541 }
542}
543
544pub struct DependencyWaiter {
549 condition_checker: DependencyConditionChecker,
551 poll_interval: Duration,
553}
554
555impl DependencyWaiter {
556 #[must_use]
558 pub fn new(condition_checker: DependencyConditionChecker) -> Self {
559 Self {
560 condition_checker,
561 poll_interval: Duration::from_secs(1),
562 }
563 }
564
565 #[must_use]
567 pub fn with_poll_interval(mut self, interval: Duration) -> Self {
568 self.poll_interval = interval;
569 self
570 }
571
572 #[must_use]
574 pub fn poll_interval(&self) -> Duration {
575 self.poll_interval
576 }
577
578 pub async fn wait_for_dependency(&self, dep: &DependsSpec) -> Result<WaitResult> {
592 let timeout = dep.timeout.unwrap_or(Duration::from_secs(300)); let start = std::time::Instant::now();
594
595 tracing::info!(
596 service = %dep.service,
597 condition = ?dep.condition,
598 timeout = ?timeout,
599 "Waiting for dependency"
600 );
601
602 loop {
603 match self.condition_checker.check(dep).await {
605 Ok(true) => {
606 tracing::info!(
607 service = %dep.service,
608 condition = ?dep.condition,
609 elapsed = ?start.elapsed(),
610 "Dependency condition satisfied"
611 );
612 return Ok(WaitResult::Satisfied);
613 }
614 Ok(false) => {
615 tracing::debug!(
616 service = %dep.service,
617 condition = ?dep.condition,
618 elapsed = ?start.elapsed(),
619 "Dependency condition not yet satisfied"
620 );
621 }
622 Err(e) => {
623 tracing::warn!(
624 service = %dep.service,
625 condition = ?dep.condition,
626 error = %e,
627 "Error checking dependency condition"
628 );
629 }
631 }
632
633 if start.elapsed() >= timeout {
635 return Ok(self.handle_timeout(dep, timeout));
636 }
637
638 tokio::time::sleep(self.poll_interval).await;
640 }
641 }
642
643 #[allow(clippy::unused_self)]
645 fn handle_timeout(&self, dep: &DependsSpec, timeout: Duration) -> WaitResult {
646 match dep.on_timeout {
647 TimeoutAction::Fail => {
648 tracing::error!(
649 service = %dep.service,
650 condition = ?dep.condition,
651 timeout = ?timeout,
652 "Dependency timeout - failing startup"
653 );
654 WaitResult::TimedOutFail {
655 service: dep.service.clone(),
656 condition: dep.condition,
657 timeout,
658 }
659 }
660 TimeoutAction::Warn => {
661 tracing::warn!(
662 service = %dep.service,
663 condition = ?dep.condition,
664 timeout = ?timeout,
665 "Dependency timeout - continuing with warning"
666 );
667 WaitResult::TimedOutWarn {
668 service: dep.service.clone(),
669 condition: dep.condition,
670 }
671 }
672 TimeoutAction::Continue => {
673 tracing::info!(
674 service = %dep.service,
675 condition = ?dep.condition,
676 timeout = ?timeout,
677 "Dependency timeout - continuing anyway"
678 );
679 WaitResult::TimedOutContinue
680 }
681 }
682 }
683
684 pub async fn wait_for_all(&self, deps: &[DependsSpec]) -> Result<Vec<WaitResult>> {
698 let mut results = Vec::with_capacity(deps.len());
699
700 for dep in deps {
701 let result = self.wait_for_dependency(dep).await?;
702
703 if result.is_failure() {
705 results.push(result);
706 return Ok(results);
708 }
709
710 results.push(result);
711 }
712
713 Ok(results)
714 }
715}
716
717#[cfg(test)]
718mod tests {
719 use super::*;
720 use crate::runtime::MockRuntime;
721 use zlayer_spec::{DependencyCondition, DependsSpec, TimeoutAction};
722
723 fn minimal_spec(depends: Vec<DependsSpec>) -> ServiceSpec {
725 use zlayer_spec::*;
726 let yaml = r"
727version: v1
728deployment: test
729services:
730 test:
731 rtype: service
732 image:
733 name: test:latest
734 endpoints:
735 - name: http
736 protocol: http
737 port: 8080
738";
739 let mut spec = serde_yaml::from_str::<DeploymentSpec>(yaml)
740 .unwrap()
741 .services
742 .remove("test")
743 .unwrap();
744 spec.depends = depends;
745 spec
746 }
747
748 fn dep(service: &str, condition: DependencyCondition) -> DependsSpec {
750 DependsSpec {
751 service: service.to_string(),
752 condition,
753 timeout: Some(std::time::Duration::from_secs(60)),
754 on_timeout: TimeoutAction::Fail,
755 }
756 }
757
758 #[test]
761 fn test_build_empty_graph() {
762 let services: HashMap<String, ServiceSpec> = HashMap::new();
763 let graph = DependencyGraph::build(&services).unwrap();
764 assert!(graph.is_empty());
765 assert!(graph.startup_order().is_empty());
766 }
767
768 #[test]
769 fn test_build_no_dependencies() {
770 let mut services = HashMap::new();
771 services.insert("a".to_string(), minimal_spec(vec![]));
772 services.insert("b".to_string(), minimal_spec(vec![]));
773 services.insert("c".to_string(), minimal_spec(vec![]));
774
775 let graph = DependencyGraph::build(&services).unwrap();
776 assert_eq!(graph.len(), 3);
777 let order = graph.startup_order();
779 assert_eq!(order.len(), 3);
780 assert!(order.contains(&"a".to_string()));
781 assert!(order.contains(&"b".to_string()));
782 assert!(order.contains(&"c".to_string()));
783 }
784
785 #[test]
786 fn test_build_linear_dependencies() {
787 let mut services = HashMap::new();
789 services.insert("c".to_string(), minimal_spec(vec![]));
790 services.insert(
791 "b".to_string(),
792 minimal_spec(vec![dep("c", DependencyCondition::Started)]),
793 );
794 services.insert(
795 "a".to_string(),
796 minimal_spec(vec![dep("b", DependencyCondition::Started)]),
797 );
798
799 let graph = DependencyGraph::build(&services).unwrap();
800 let order = graph.startup_order();
801
802 let pos_a = order.iter().position(|x| x == "a").unwrap();
804 let pos_b = order.iter().position(|x| x == "b").unwrap();
805 let pos_c = order.iter().position(|x| x == "c").unwrap();
806
807 assert!(pos_c < pos_b);
808 assert!(pos_b < pos_a);
809 }
810
811 #[test]
812 fn test_build_diamond_dependencies() {
813 let mut services = HashMap::new();
820 services.insert("d".to_string(), minimal_spec(vec![]));
821 services.insert(
822 "b".to_string(),
823 minimal_spec(vec![dep("d", DependencyCondition::Started)]),
824 );
825 services.insert(
826 "c".to_string(),
827 minimal_spec(vec![dep("d", DependencyCondition::Started)]),
828 );
829 services.insert(
830 "a".to_string(),
831 minimal_spec(vec![
832 dep("b", DependencyCondition::Started),
833 dep("c", DependencyCondition::Started),
834 ]),
835 );
836
837 let graph = DependencyGraph::build(&services).unwrap();
838 let order = graph.startup_order();
839
840 let pos_a = order.iter().position(|x| x == "a").unwrap();
841 let pos_b = order.iter().position(|x| x == "b").unwrap();
842 let pos_c = order.iter().position(|x| x == "c").unwrap();
843 let pos_d = order.iter().position(|x| x == "d").unwrap();
844
845 assert!(pos_d < pos_b);
847 assert!(pos_d < pos_c);
848 assert!(pos_b < pos_a);
850 assert!(pos_c < pos_a);
851 }
852
853 #[test]
854 fn test_teardown_order_is_reverse_of_startup() {
855 let mut services = HashMap::new();
858 services.insert("forgejodb".to_string(), minimal_spec(vec![]));
859 services.insert(
860 "forgejo".to_string(),
861 minimal_spec(vec![dep("forgejodb", DependencyCondition::Healthy)]),
862 );
863
864 let graph = DependencyGraph::build(&services).unwrap();
865
866 assert_eq!(
867 graph.startup_order(),
868 &["forgejodb".to_string(), "forgejo".to_string()],
869 "dependency must start before its dependent"
870 );
871 assert_eq!(
872 graph.teardown_order(),
873 vec!["forgejo".to_string(), "forgejodb".to_string()],
874 "dependent must be torn down before its dependency"
875 );
876 }
877
878 #[test]
879 fn test_teardown_order_three_tier_chain() {
880 let mut services = HashMap::new();
883 services.insert("db".to_string(), minimal_spec(vec![]));
884 services.insert(
885 "api".to_string(),
886 minimal_spec(vec![dep("db", DependencyCondition::Started)]),
887 );
888 services.insert(
889 "web".to_string(),
890 minimal_spec(vec![dep("api", DependencyCondition::Started)]),
891 );
892
893 let graph = DependencyGraph::build(&services).unwrap();
894
895 assert_eq!(
896 graph.startup_order(),
897 &["db".to_string(), "api".to_string(), "web".to_string()]
898 );
899 assert_eq!(
900 graph.teardown_order(),
901 vec!["web".to_string(), "api".to_string(), "db".to_string()]
902 );
903
904 let mut reversed_startup = graph.startup_order().to_vec();
906 reversed_startup.reverse();
907 assert_eq!(graph.teardown_order(), reversed_startup);
908 }
909
910 #[test]
911 fn test_detect_self_dependency() {
912 let mut services = HashMap::new();
913 services.insert(
914 "a".to_string(),
915 minimal_spec(vec![dep("a", DependencyCondition::Started)]),
916 );
917
918 let result = DependencyGraph::build(&services);
919 assert!(result.is_err());
920 let err = result.unwrap_err().to_string();
921 assert!(err.contains("self-dependency"));
922 }
923
924 #[test]
925 fn test_detect_simple_cycle() {
926 let mut services = HashMap::new();
928 services.insert(
929 "a".to_string(),
930 minimal_spec(vec![dep("b", DependencyCondition::Started)]),
931 );
932 services.insert(
933 "b".to_string(),
934 minimal_spec(vec![dep("a", DependencyCondition::Started)]),
935 );
936
937 let result = DependencyGraph::build(&services);
938 assert!(result.is_err());
939 let err = result.unwrap_err().to_string();
940 assert!(err.contains("Cyclic dependency"));
941 }
942
943 #[test]
944 fn test_detect_complex_cycle() {
945 let mut services = HashMap::new();
947 services.insert(
948 "a".to_string(),
949 minimal_spec(vec![dep("b", DependencyCondition::Started)]),
950 );
951 services.insert(
952 "b".to_string(),
953 minimal_spec(vec![dep("c", DependencyCondition::Started)]),
954 );
955 services.insert(
956 "c".to_string(),
957 minimal_spec(vec![dep("d", DependencyCondition::Started)]),
958 );
959 services.insert(
960 "d".to_string(),
961 minimal_spec(vec![dep("b", DependencyCondition::Started)]),
962 );
963
964 let result = DependencyGraph::build(&services);
965 assert!(result.is_err());
966 let err = result.unwrap_err().to_string();
967 assert!(err.contains("Cyclic dependency"));
968 }
969
970 #[test]
971 fn test_detect_missing_dependency() {
972 let mut services = HashMap::new();
973 services.insert(
974 "a".to_string(),
975 minimal_spec(vec![dep("nonexistent", DependencyCondition::Started)]),
976 );
977
978 let result = DependencyGraph::build(&services);
979 assert!(result.is_err());
980 let err = result.unwrap_err().to_string();
981 assert!(err.contains("non-existent"));
982 assert!(err.contains("nonexistent"));
983 }
984
985 #[test]
986 fn test_depends_on_transitive() {
987 let mut services = HashMap::new();
989 services.insert("c".to_string(), minimal_spec(vec![]));
990 services.insert(
991 "b".to_string(),
992 minimal_spec(vec![dep("c", DependencyCondition::Started)]),
993 );
994 services.insert(
995 "a".to_string(),
996 minimal_spec(vec![dep("b", DependencyCondition::Started)]),
997 );
998
999 let graph = DependencyGraph::build(&services).unwrap();
1000
1001 assert!(graph.depends_on("a", "b"));
1003 assert!(graph.depends_on("b", "c"));
1004
1005 assert!(graph.depends_on("a", "c"));
1007
1008 assert!(!graph.depends_on("c", "a"));
1010 assert!(!graph.depends_on("b", "a"));
1011 assert!(!graph.depends_on("c", "b"));
1012
1013 assert!(!graph.depends_on("a", "a"));
1015 }
1016
1017 #[test]
1018 fn test_get_dependencies() {
1019 let mut services = HashMap::new();
1020 services.insert("c".to_string(), minimal_spec(vec![]));
1021 services.insert(
1022 "b".to_string(),
1023 minimal_spec(vec![dep("c", DependencyCondition::Healthy)]),
1024 );
1025 services.insert(
1026 "a".to_string(),
1027 minimal_spec(vec![
1028 dep("b", DependencyCondition::Started),
1029 dep("c", DependencyCondition::Ready),
1030 ]),
1031 );
1032
1033 let graph = DependencyGraph::build(&services).unwrap();
1034
1035 let a_deps = graph.dependencies("a").unwrap();
1036 assert_eq!(a_deps.len(), 2);
1037
1038 let b_deps = graph.dependencies("b").unwrap();
1039 assert_eq!(b_deps.len(), 1);
1040 assert_eq!(b_deps[0].service, "c");
1041 assert_eq!(b_deps[0].condition, DependencyCondition::Healthy);
1042
1043 let c_deps = graph.dependencies("c").unwrap();
1044 assert!(c_deps.is_empty());
1045
1046 assert!(graph.dependencies("nonexistent").is_none());
1047 }
1048
1049 #[test]
1050 fn test_dependents() {
1051 let mut services = HashMap::new();
1052 services.insert("c".to_string(), minimal_spec(vec![]));
1053 services.insert(
1054 "b".to_string(),
1055 minimal_spec(vec![dep("c", DependencyCondition::Started)]),
1056 );
1057 services.insert(
1058 "a".to_string(),
1059 minimal_spec(vec![dep("c", DependencyCondition::Started)]),
1060 );
1061
1062 let graph = DependencyGraph::build(&services).unwrap();
1063
1064 let c_dependents = graph.dependents("c");
1066 assert_eq!(c_dependents.len(), 2);
1067 assert!(c_dependents.contains(&"a".to_string()));
1068 assert!(c_dependents.contains(&"b".to_string()));
1069
1070 assert!(graph.dependents("a").is_empty());
1072 assert!(graph.dependents("b").is_empty());
1073 }
1074
1075 #[tokio::test]
1078 async fn test_check_started_running() {
1079 let runtime = Arc::new(MockRuntime::new());
1080 let health_states = Arc::new(RwLock::new(HashMap::new()));
1081 let checker = DependencyConditionChecker::new(runtime.clone(), health_states, None);
1082
1083 let id = ContainerId::new("test".to_string(), 1);
1085 let spec = minimal_spec(vec![]);
1086 runtime.create_container(&id, &spec).await.unwrap();
1087 runtime.start_container(&id).await.unwrap();
1088
1089 assert!(checker.check_started("test").await.unwrap());
1091 }
1092
1093 #[tokio::test]
1094 async fn test_check_started_not_running() {
1095 let runtime = Arc::new(MockRuntime::new());
1096 let health_states = Arc::new(RwLock::new(HashMap::new()));
1097 let checker = DependencyConditionChecker::new(runtime.clone(), health_states, None);
1098
1099 let id = ContainerId::new("test".to_string(), 1);
1101 let spec = minimal_spec(vec![]);
1102 runtime.create_container(&id, &spec).await.unwrap();
1103
1104 assert!(!checker.check_started("test").await.unwrap());
1106 }
1107
1108 #[tokio::test]
1109 async fn test_check_started_no_container() {
1110 let runtime = Arc::new(MockRuntime::new());
1111 let health_states = Arc::new(RwLock::new(HashMap::new()));
1112 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1113
1114 assert!(!checker.check_started("nonexistent").await.unwrap());
1116 }
1117
1118 #[tokio::test]
1119 async fn test_check_healthy() {
1120 let runtime = Arc::new(MockRuntime::new());
1121 let health_states = Arc::new(RwLock::new(HashMap::new()));
1122
1123 {
1125 let mut states = health_states.write().await;
1126 states.insert("test".to_string(), HealthState::Healthy);
1127 }
1128
1129 let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);
1130
1131 assert!(checker.check_healthy("test").await.unwrap());
1132 }
1133
1134 #[tokio::test]
1135 async fn test_check_healthy_unhealthy() {
1136 let runtime = Arc::new(MockRuntime::new());
1137 let health_states = Arc::new(RwLock::new(HashMap::new()));
1138
1139 {
1141 let mut states = health_states.write().await;
1142 states.insert(
1143 "test".to_string(),
1144 HealthState::Unhealthy {
1145 failures: 3,
1146 reason: "connection refused".to_string(),
1147 },
1148 );
1149 }
1150
1151 let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);
1152
1153 assert!(!checker.check_healthy("test").await.unwrap());
1154 }
1155
1156 #[tokio::test]
1157 async fn test_check_healthy_unknown() {
1158 let runtime = Arc::new(MockRuntime::new());
1159 let health_states = Arc::new(RwLock::new(HashMap::new()));
1160
1161 {
1163 let mut states = health_states.write().await;
1164 states.insert("test".to_string(), HealthState::Unknown);
1165 }
1166
1167 let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);
1168
1169 assert!(!checker.check_healthy("test").await.unwrap());
1170 }
1171
1172 #[tokio::test]
1173 async fn test_check_healthy_no_state() {
1174 let runtime = Arc::new(MockRuntime::new());
1175 let health_states = Arc::new(RwLock::new(HashMap::new()));
1176 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1177
1178 assert!(!checker.check_healthy("test").await.unwrap());
1180 }
1181
1182 #[tokio::test]
1183 async fn test_check_ready_no_registry() {
1184 let runtime = Arc::new(MockRuntime::new());
1185 let health_states = Arc::new(RwLock::new(HashMap::new()));
1186
1187 {
1189 let mut states = health_states.write().await;
1190 states.insert("test".to_string(), HealthState::Healthy);
1191 }
1192
1193 let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);
1194
1195 assert!(checker.check_ready("test").await.unwrap());
1197 }
1198
1199 #[tokio::test]
1200 async fn test_check_ready_with_registry() {
1201 use std::net::SocketAddr;
1202 use zlayer_proxy::RouteEntry;
1203
1204 let runtime = Arc::new(MockRuntime::new());
1205 let health_states = Arc::new(RwLock::new(HashMap::new()));
1206 let registry = Arc::new(ServiceRegistry::new());
1207
1208 let entry = RouteEntry {
1213 service_name: "test".to_string(),
1214 endpoint_name: "http".to_string(),
1215 host: Some("test.default".to_string()),
1216 path_prefix: "/".to_string(),
1217 resolved: zlayer_proxy::ResolvedService {
1218 name: "test".to_string(),
1219 backends: vec!["127.0.0.1:8080".parse::<SocketAddr>().unwrap()],
1220 use_tls: false,
1221 sni_hostname: "test.local".to_string(),
1222 expose: zlayer_spec::ExposeType::Public,
1223 protocol: zlayer_spec::Protocol::Http,
1224 strip_prefix: false,
1225 path_prefix: "/".to_string(),
1226 target_port: 8080,
1227 },
1228 };
1229 registry.register(entry).await;
1230
1231 let checker = DependencyConditionChecker::new(runtime, health_states, Some(registry));
1232
1233 assert!(checker.check_ready("test").await.unwrap());
1234 }
1235
1236 #[tokio::test]
1237 async fn test_check_ready_no_backends() {
1238 use zlayer_proxy::RouteEntry;
1239
1240 let runtime = Arc::new(MockRuntime::new());
1241 let health_states = Arc::new(RwLock::new(HashMap::new()));
1242 let registry = Arc::new(ServiceRegistry::new());
1243
1244 let entry = RouteEntry {
1246 service_name: "test".to_string(),
1247 endpoint_name: "http".to_string(),
1248 host: Some("test.default".to_string()),
1249 path_prefix: "/".to_string(),
1250 resolved: zlayer_proxy::ResolvedService {
1251 name: "test".to_string(),
1252 backends: vec![], use_tls: false,
1254 sni_hostname: "test.local".to_string(),
1255 expose: zlayer_spec::ExposeType::Public,
1256 protocol: zlayer_spec::Protocol::Http,
1257 strip_prefix: false,
1258 path_prefix: "/".to_string(),
1259 target_port: 8080,
1260 },
1261 };
1262 registry.register(entry).await;
1263
1264 let checker = DependencyConditionChecker::new(runtime, health_states, Some(registry));
1265
1266 assert!(!checker.check_ready("test").await.unwrap());
1268 }
1269
1270 #[tokio::test]
1271 async fn test_check_condition_dispatches_correctly() {
1272 let runtime = Arc::new(MockRuntime::new());
1273 let health_states = Arc::new(RwLock::new(HashMap::new()));
1274
1275 {
1277 let mut states = health_states.write().await;
1278 states.insert("test".to_string(), HealthState::Healthy);
1279 }
1280
1281 let id = ContainerId::new("test".to_string(), 1);
1283 let spec = minimal_spec(vec![]);
1284 runtime.create_container(&id, &spec).await.unwrap();
1285 runtime.start_container(&id).await.unwrap();
1286
1287 let checker = DependencyConditionChecker::new(runtime, Arc::clone(&health_states), None);
1288
1289 let dep_started = dep("test", DependencyCondition::Started);
1291 assert!(checker.check(&dep_started).await.unwrap());
1292
1293 let dep_healthy = dep("test", DependencyCondition::Healthy);
1295 assert!(checker.check(&dep_healthy).await.unwrap());
1296
1297 let dep_ready = dep("test", DependencyCondition::Ready);
1299 assert!(checker.check(&dep_ready).await.unwrap());
1300 }
1301
1302 fn dep_with_timeout(
1306 service: &str,
1307 condition: DependencyCondition,
1308 timeout: Duration,
1309 on_timeout: TimeoutAction,
1310 ) -> DependsSpec {
1311 DependsSpec {
1312 service: service.to_string(),
1313 condition,
1314 timeout: Some(timeout),
1315 on_timeout,
1316 }
1317 }
1318
1319 #[tokio::test]
1320 async fn test_wait_satisfied_immediately() {
1321 let runtime = Arc::new(MockRuntime::new());
1322 let health_states = Arc::new(RwLock::new(HashMap::new()));
1323
1324 {
1326 let mut states = health_states.write().await;
1327 states.insert("db".to_string(), HealthState::Healthy);
1328 }
1329
1330 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1331 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1332
1333 let dep = dep_with_timeout(
1334 "db",
1335 DependencyCondition::Healthy,
1336 Duration::from_secs(5),
1337 TimeoutAction::Fail,
1338 );
1339
1340 let result = waiter.wait_for_dependency(&dep).await.unwrap();
1341 assert!(result.is_satisfied());
1342 }
1343
1344 #[tokio::test]
1345 async fn test_wait_satisfied_after_delay() {
1346 let runtime = Arc::new(MockRuntime::new());
1347 let health_states = Arc::new(RwLock::new(HashMap::new()));
1348
1349 {
1351 let mut states = health_states.write().await;
1352 states.insert("db".to_string(), HealthState::Unknown);
1353 }
1354
1355 let health_states_clone = Arc::clone(&health_states);
1357
1358 tokio::spawn(async move {
1360 tokio::time::sleep(Duration::from_millis(150)).await;
1361 let mut states = health_states_clone.write().await;
1362 states.insert("db".to_string(), HealthState::Healthy);
1363 });
1364
1365 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1366 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1367
1368 let dep = dep_with_timeout(
1369 "db",
1370 DependencyCondition::Healthy,
1371 Duration::from_secs(5),
1372 TimeoutAction::Fail,
1373 );
1374
1375 let result = waiter.wait_for_dependency(&dep).await.unwrap();
1376 assert!(result.is_satisfied());
1377 }
1378
1379 #[tokio::test]
1380 async fn test_wait_timeout_fail() {
1381 let runtime = Arc::new(MockRuntime::new());
1382 let health_states = Arc::new(RwLock::new(HashMap::new()));
1383
1384 {
1386 let mut states = health_states.write().await;
1387 states.insert("db".to_string(), HealthState::Unknown);
1388 }
1389
1390 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1391 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1392
1393 let dep = dep_with_timeout(
1394 "db",
1395 DependencyCondition::Healthy,
1396 Duration::from_millis(200), TimeoutAction::Fail,
1398 );
1399
1400 let result = waiter.wait_for_dependency(&dep).await.unwrap();
1401 assert!(result.is_failure());
1402
1403 match result {
1404 WaitResult::TimedOutFail {
1405 service,
1406 condition,
1407 timeout,
1408 } => {
1409 assert_eq!(service, "db");
1410 assert_eq!(condition, DependencyCondition::Healthy);
1411 assert_eq!(timeout, Duration::from_millis(200));
1412 }
1413 _ => panic!("Expected TimedOutFail"),
1414 }
1415 }
1416
1417 #[tokio::test]
1418 async fn test_wait_timeout_warn() {
1419 let runtime = Arc::new(MockRuntime::new());
1420 let health_states = Arc::new(RwLock::new(HashMap::new()));
1421
1422 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1423 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1424
1425 let dep = dep_with_timeout(
1426 "db",
1427 DependencyCondition::Healthy,
1428 Duration::from_millis(100),
1429 TimeoutAction::Warn,
1430 );
1431
1432 let result = waiter.wait_for_dependency(&dep).await.unwrap();
1433 assert!(result.should_continue());
1434 assert!(!result.is_satisfied());
1435
1436 match result {
1437 WaitResult::TimedOutWarn { service, condition } => {
1438 assert_eq!(service, "db");
1439 assert_eq!(condition, DependencyCondition::Healthy);
1440 }
1441 _ => panic!("Expected TimedOutWarn"),
1442 }
1443 }
1444
1445 #[tokio::test]
1446 async fn test_wait_timeout_continue() {
1447 let runtime = Arc::new(MockRuntime::new());
1448 let health_states = Arc::new(RwLock::new(HashMap::new()));
1449
1450 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1451 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1452
1453 let dep = dep_with_timeout(
1454 "db",
1455 DependencyCondition::Healthy,
1456 Duration::from_millis(100),
1457 TimeoutAction::Continue,
1458 );
1459
1460 let result = waiter.wait_for_dependency(&dep).await.unwrap();
1461 assert!(result.should_continue());
1462 assert!(!result.is_satisfied());
1463 assert!(matches!(result, WaitResult::TimedOutContinue));
1464 }
1465
1466 #[tokio::test]
1467 async fn test_wait_for_all_success() {
1468 let runtime = Arc::new(MockRuntime::new());
1469 let health_states = Arc::new(RwLock::new(HashMap::new()));
1470
1471 {
1473 let mut states = health_states.write().await;
1474 states.insert("db".to_string(), HealthState::Healthy);
1475 states.insert("cache".to_string(), HealthState::Healthy);
1476 }
1477
1478 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1479 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1480
1481 let deps = vec![
1482 dep_with_timeout(
1483 "db",
1484 DependencyCondition::Healthy,
1485 Duration::from_secs(5),
1486 TimeoutAction::Fail,
1487 ),
1488 dep_with_timeout(
1489 "cache",
1490 DependencyCondition::Healthy,
1491 Duration::from_secs(5),
1492 TimeoutAction::Fail,
1493 ),
1494 ];
1495
1496 let results = waiter.wait_for_all(&deps).await.unwrap();
1497 assert_eq!(results.len(), 2);
1498 assert!(results.iter().all(super::WaitResult::is_satisfied));
1499 }
1500
1501 #[tokio::test]
1502 async fn test_wait_for_all_early_failure() {
1503 let runtime = Arc::new(MockRuntime::new());
1504 let health_states = Arc::new(RwLock::new(HashMap::new()));
1505
1506 {
1508 let mut states = health_states.write().await;
1509 states.insert("cache".to_string(), HealthState::Healthy);
1510 }
1511
1512 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1513 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1514
1515 let deps = vec![
1516 dep_with_timeout(
1517 "db",
1518 DependencyCondition::Healthy,
1519 Duration::from_millis(100), TimeoutAction::Fail,
1521 ),
1522 dep_with_timeout(
1523 "cache",
1524 DependencyCondition::Healthy,
1525 Duration::from_secs(5),
1526 TimeoutAction::Fail,
1527 ),
1528 ];
1529
1530 let results = waiter.wait_for_all(&deps).await.unwrap();
1531 assert_eq!(results.len(), 1);
1533 assert!(results[0].is_failure());
1534 }
1535
1536 #[tokio::test]
1537 async fn test_wait_for_all_mixed_results() {
1538 let runtime = Arc::new(MockRuntime::new());
1539 let health_states = Arc::new(RwLock::new(HashMap::new()));
1540
1541 {
1543 let mut states = health_states.write().await;
1544 states.insert("db".to_string(), HealthState::Healthy);
1545 }
1547
1548 let checker = DependencyConditionChecker::new(runtime, health_states, None);
1549 let waiter = DependencyWaiter::new(checker).with_poll_interval(Duration::from_millis(50));
1550
1551 let deps = vec![
1552 dep_with_timeout(
1553 "db",
1554 DependencyCondition::Healthy,
1555 Duration::from_secs(5),
1556 TimeoutAction::Fail,
1557 ),
1558 dep_with_timeout(
1559 "cache",
1560 DependencyCondition::Healthy,
1561 Duration::from_millis(100),
1562 TimeoutAction::Warn, ),
1564 ];
1565
1566 let results = waiter.wait_for_all(&deps).await.unwrap();
1567 assert_eq!(results.len(), 2);
1568 assert!(results[0].is_satisfied()); assert!(matches!(results[1], WaitResult::TimedOutWarn { .. })); }
1571
/// Checks the three boolean helpers on every `WaitResult` variant built here.
/// Each expectation is written as
/// `(is_satisfied, should_continue, is_failure)`.
#[test]
fn test_wait_result_helpers() {
    // Collect all three predicates at once so each variant needs one comparison.
    let flags = |r: &WaitResult| (r.is_satisfied(), r.should_continue(), r.is_failure());

    let satisfied = WaitResult::Satisfied;
    assert_eq!(flags(&satisfied), (true, true, false));

    let timed_out_continue = WaitResult::TimedOutContinue;
    assert_eq!(flags(&timed_out_continue), (false, true, false));

    let timed_out_warn = WaitResult::TimedOutWarn {
        service: String::from("db"),
        condition: DependencyCondition::Healthy,
    };
    assert_eq!(flags(&timed_out_warn), (false, true, false));

    let timed_out_fail = WaitResult::TimedOutFail {
        service: String::from("db"),
        condition: DependencyCondition::Healthy,
        timeout: Duration::from_secs(60),
    };
    assert_eq!(flags(&timed_out_fail), (false, false, true));
}
1601}