1use std::collections::HashMap;
15use std::sync::Arc;
16
17use petgraph::algo::toposort;
18use petgraph::graph::{DiGraph, NodeIndex};
19use petgraph::visit::EdgeRef;
20use serde::{Deserialize, Serialize};
21use tokio::sync::Mutex;
22
23use super::error::{GraphError, StygianError};
24use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
25
/// A single unit of work in a pipeline: one invocation of a named service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
    /// Unique identifier for this node within its pipeline.
    pub id: String,

    /// Key of the service that executes this node; resolved against the
    /// service registry passed to `DagExecutor::execute`.
    pub service: String,

    /// Service-specific configuration (e.g. a `"url"` entry is read by the
    /// executor when building the `ServiceInput`).
    pub config: serde_json::Value,

    /// Free-form metadata; deserializes to JSON `null` when absent.
    #[serde(default)]
    pub metadata: serde_json::Value,
}
57
58impl Node {
59 pub fn new(
72 id: impl Into<String>,
73 service: impl Into<String>,
74 config: serde_json::Value,
75 ) -> Self {
76 Self {
77 id: id.into(),
78 service: service.into(),
79 config,
80 metadata: serde_json::Value::Null,
81 }
82 }
83
84 pub fn with_metadata(
86 id: impl Into<String>,
87 service: impl Into<String>,
88 config: serde_json::Value,
89 metadata: serde_json::Value,
90 ) -> Self {
91 Self {
92 id: id.into(),
93 service: service.into(),
94 config,
95 metadata,
96 }
97 }
98
99 pub fn validate(&self) -> Result<(), StygianError> {
105 if self.id.is_empty() {
106 return Err(GraphError::InvalidEdge("Node ID cannot be empty".into()).into());
107 }
108 if self.service.is_empty() {
109 return Err(GraphError::InvalidEdge("Node service type cannot be empty".into()).into());
110 }
111 Ok(())
112 }
113}
114
/// A directed dependency between two nodes: `from` must complete before `to`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Edge {
    /// Id of the upstream (source) node.
    pub from: String,

    /// Id of the downstream (target) node.
    pub to: String,

    /// Optional edge configuration; deserializes to JSON `null` when absent.
    /// NOTE(review): the executor's graph stores `()` edge weights, so this
    /// value is not carried into `DagExecutor` — confirm whether that is
    /// intentional.
    #[serde(default)]
    pub config: serde_json::Value,
}
139
140impl Edge {
141 pub fn new(from: impl Into<String>, to: impl Into<String>) -> Self {
153 Self {
154 from: from.into(),
155 to: to.into(),
156 config: serde_json::Value::Null,
157 }
158 }
159
160 pub fn with_config(
162 from: impl Into<String>,
163 to: impl Into<String>,
164 config: serde_json::Value,
165 ) -> Self {
166 Self {
167 from: from.into(),
168 to: to.into(),
169 config,
170 }
171 }
172
173 pub fn validate(&self) -> Result<(), StygianError> {
179 if self.from.is_empty() || self.to.is_empty() {
180 return Err(GraphError::InvalidEdge("Edge endpoints cannot be empty".into()).into());
181 }
182 Ok(())
183 }
184}
185
/// A named collection of nodes and edges describing a scraping workflow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Pipeline {
    /// Human-readable pipeline name.
    pub name: String,

    /// All nodes in the pipeline; ids must be unique for edge resolution.
    pub nodes: Vec<Node>,

    /// Directed dependencies between nodes, referencing nodes by id.
    pub edges: Vec<Edge>,

    /// Free-form pipeline metadata; deserializes to JSON `null` when absent.
    #[serde(default)]
    pub metadata: serde_json::Value,
}
214
215impl Pipeline {
216 pub fn new(name: impl Into<String>) -> Self {
228 Self {
229 name: name.into(),
230 nodes: Vec::new(),
231 edges: Vec::new(),
232 metadata: serde_json::Value::Null,
233 }
234 }
235
236 pub fn add_node(&mut self, node: Node) {
238 self.nodes.push(node);
239 }
240
241 pub fn add_edge(&mut self, edge: Edge) {
243 self.edges.push(edge);
244 }
245
246 pub fn validate(&self) -> Result<(), StygianError> {
252 for node in &self.nodes {
253 node.validate()?;
254 }
255 for edge in &self.edges {
256 edge.validate()?;
257 }
258 Ok(())
259 }
260}
261
/// The outcome of executing one node: its id paired with the service output.
#[derive(Debug, Clone)]
pub struct NodeResult {
    /// Id of the node that produced this output.
    pub node_id: String,
    /// Output returned by the node's service.
    pub output: ServiceOutput,
}
270
/// Executes a validated, acyclic pipeline graph with wave-level concurrency.
pub struct DagExecutor {
    // Nodes as weights; edges carry no payload (edge configs are dropped).
    graph: DiGraph<Node, ()>,
    // Node-id -> graph index map retained from construction; currently unused
    // after `from_pipeline` (underscore-prefixed to silence the lint).
    _node_indices: HashMap<String, NodeIndex>,
}
280
impl DagExecutor {
    /// Creates an empty executor with no nodes or edges.
    pub fn new() -> Self {
        Self {
            graph: DiGraph::new(),
            _node_indices: HashMap::new(),
        }
    }

    /// Builds an executor from a pipeline description.
    ///
    /// Validates the pipeline, adds every node, resolves edge endpoints by
    /// node id, and rejects graphs that contain a directed cycle.
    ///
    /// # Errors
    ///
    /// - propagates node/edge validation failures from `Pipeline::validate`
    /// - `GraphError::NodeNotFound` when an edge references an unknown id
    /// - `GraphError::CycleDetected` when the resulting graph is cyclic
    pub fn from_pipeline(pipeline: &Pipeline) -> Result<Self, StygianError> {
        pipeline.validate()?;

        let mut graph = DiGraph::new();
        let mut node_indices = HashMap::new();

        for node in &pipeline.nodes {
            let idx = graph.add_node(node.clone());
            node_indices.insert(node.id.clone(), idx);
        }

        for edge in &pipeline.edges {
            let from_idx = node_indices
                .get(&edge.from)
                .ok_or_else(|| GraphError::NodeNotFound(edge.from.clone()))?;
            let to_idx = node_indices
                .get(&edge.to)
                .ok_or_else(|| GraphError::NodeNotFound(edge.to.clone()))?;
            // Edge weight is (): Edge::config is intentionally not stored.
            graph.add_edge(*from_idx, *to_idx, ());
        }

        // Reject cycles up front so execute() can assume a DAG.
        if petgraph::algo::is_cyclic_directed(&graph) {
            return Err(GraphError::CycleDetected.into());
        }

        Ok(Self {
            graph,
            _node_indices: node_indices,
        })
    }

    /// Executes the DAG against a registry of services, keyed by service type.
    ///
    /// Nodes are grouped into "waves" (all nodes whose dependencies are
    /// satisfied by earlier waves); each wave's nodes run concurrently via
    /// `tokio::spawn`, and the executor waits for a whole wave before
    /// starting the next. Each node receives the `data` strings of its
    /// direct predecessors as a JSON object in `ServiceInput::params`, and
    /// the node config's `"url"` string (empty string when absent) as the
    /// input URL.
    ///
    /// Returns one `NodeResult` per executed node, in topological order.
    ///
    /// # Errors
    ///
    /// - `GraphError::InvalidPipeline` when a node's service type has no
    ///   registered service
    /// - `GraphError::ExecutionFailed` when a spawned task fails to join
    /// - any error returned by a service's `execute`
    pub async fn execute(
        &self,
        services: &HashMap<String, Arc<dyn ScrapingService>>,
    ) -> Result<Vec<NodeResult>, StygianError> {
        // from_pipeline already rejects cycles; this guards direct misuse.
        let topo_order = toposort(&self.graph, None).map_err(|_| GraphError::CycleDetected)?;

        let waves = self.build_execution_waves(&topo_order);

        // Shared store of completed outputs, keyed by node id; tokio's Mutex
        // because it is locked across awaits inside spawned tasks.
        let results: Arc<Mutex<HashMap<String, ServiceOutput>>> =
            Arc::new(Mutex::new(HashMap::new()));

        for wave in waves {
            let mut handles = Vec::new();

            for node_idx in wave {
                let node = self.graph[node_idx].clone();
                let service = services.get(&node.service).cloned().ok_or_else(|| {
                    GraphError::InvalidPipeline(format!(
                        "No service registered for type '{}'",
                        node.service
                    ))
                })?;

                // Snapshot direct predecessors' outputs before spawning; all
                // of them completed in earlier waves.
                let upstream_data = {
                    let store = results.lock().await;
                    let mut data = serde_json::Map::new();
                    for pred_idx in self
                        .graph
                        .neighbors_directed(node_idx, petgraph::Direction::Incoming)
                    {
                        let pred_id = &self.graph[pred_idx].id;
                        if let Some(out) = store.get(pred_id) {
                            data.insert(
                                pred_id.clone(),
                                serde_json::Value::String(out.data.clone()),
                            );
                        }
                    }
                    serde_json::Value::Object(data)
                };

                let input = ServiceInput {
                    url: node
                        .config
                        .get("url")
                        .and_then(|v| v.as_str())
                        .unwrap_or("")
                        .to_string(),
                    params: upstream_data,
                };

                let results_clone = Arc::clone(&results);
                let node_id = node.id.clone();

                handles.push(tokio::spawn(async move {
                    let output = service.execute(input).await?;
                    // Publish the output so the next wave can read it.
                    results_clone
                        .lock()
                        .await
                        .insert(node_id.clone(), output.clone());
                    Ok::<NodeResult, StygianError>(NodeResult { node_id, output })
                }));
            }

            // Barrier: the whole wave must finish (first `?` = join error,
            // second `?` = service error) before the next wave starts.
            for handle in handles {
                handle
                    .await
                    .map_err(|e| GraphError::ExecutionFailed(format!("Task join error: {e}")))??;
            }
        }

        // Re-read the shared store and order results topologically.
        let store = results.lock().await;
        let final_results = topo_order
            .iter()
            .filter_map(|idx| {
                let node_id = &self.graph[*idx].id;
                store.get(node_id).map(|output| NodeResult {
                    node_id: node_id.clone(),
                    output: output.clone(),
                })
            })
            .collect();

        Ok(final_results)
    }

    // Groups a topological ordering into execution waves: each node's level
    // is 1 + the max level of its predecessors (0 for roots), and a wave is
    // the set of nodes sharing a level. Topo order guarantees predecessors
    // are leveled before their successors.
    fn build_execution_waves(&self, topo_order: &[NodeIndex]) -> Vec<Vec<NodeIndex>> {
        let mut level: HashMap<NodeIndex, usize> = HashMap::new();

        for &idx in topo_order {
            let max_pred_level = self
                .graph
                .neighbors_directed(idx, petgraph::Direction::Incoming)
                .map(|pred| level.get(&pred).copied().unwrap_or(0) + 1)
                .max()
                .unwrap_or(0);
            level.insert(idx, max_pred_level);
        }

        let max_level = level.values().copied().max().unwrap_or(0);
        let mut waves: Vec<Vec<NodeIndex>> = vec![Vec::new(); max_level + 1];
        for (idx, lvl) in level {
            if let Some(wave) = waves.get_mut(lvl) {
                wave.push(idx);
            }
        }
        waves
    }

    /// Number of nodes in the graph.
    #[must_use]
    pub fn node_count(&self) -> usize {
        self.graph.node_count()
    }

    /// Number of edges in the graph.
    #[must_use]
    pub fn edge_count(&self) -> usize {
        self.graph.edge_count()
    }

    /// Ids of all nodes, in graph-index order.
    #[must_use]
    pub fn node_ids(&self) -> Vec<String> {
        self.graph
            .node_indices()
            .map(|idx| self.graph[idx].id.clone())
            .collect()
    }

    /// Looks up a node by id; `None` when absent. Linear scan over nodes.
    #[must_use]
    pub fn get_node(&self, id: &str) -> Option<&Node> {
        self.graph
            .node_indices()
            .find(|&idx| self.graph[idx].id == id)
            .map(|idx| &self.graph[idx])
    }

    /// Ids of the direct predecessors of `id`; empty when the node is
    /// unknown or has none.
    #[must_use]
    pub fn predecessors(&self, id: &str) -> Vec<String> {
        self.graph
            .node_indices()
            .find(|&idx| self.graph[idx].id == id)
            .map(|idx| {
                self.graph
                    .neighbors_directed(idx, petgraph::Direction::Incoming)
                    .map(|pred| self.graph[pred].id.clone())
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Ids of the direct successors of `id`; empty when the node is unknown
    /// or has none.
    #[must_use]
    pub fn successors(&self, id: &str) -> Vec<String> {
        self.graph
            .node_indices()
            .find(|&idx| self.graph[idx].id == id)
            .map(|idx| {
                self.graph
                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
                    .map(|succ| self.graph[succ].id.clone())
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Node ids in a valid topological order; empty if a cycle is present
    /// (unreachable for executors built via `from_pipeline`).
    #[must_use]
    pub fn topological_order(&self) -> Vec<String> {
        toposort(&self.graph, None)
            .map(|indices| {
                indices
                    .iter()
                    .map(|&idx| self.graph[idx].id.clone())
                    .collect()
            })
            .unwrap_or_default()
    }

    /// The execution waves this graph would run, as id lists with their
    /// level; empty if the graph is cyclic.
    #[must_use]
    pub fn execution_waves(&self) -> Vec<super::introspection::ExecutionWave> {
        let topo = match toposort(&self.graph, None) {
            Ok(t) => t,
            Err(_) => return vec![],
        };

        let waves = self.build_execution_waves(&topo);

        waves
            .into_iter()
            .enumerate()
            .filter(|(_, nodes)| !nodes.is_empty())
            .map(|(level, nodes)| super::introspection::ExecutionWave {
                level,
                node_ids: nodes
                    .iter()
                    .map(|&idx| self.graph[idx].id.clone())
                    .collect(),
            })
            .collect()
    }

    /// Full introspection record for one node (depth, degrees, neighbors,
    /// config, metadata); `None` when the id is unknown.
    #[must_use]
    pub fn node_info(&self, id: &str) -> Option<super::introspection::NodeInfo> {
        // Depths are computed for the whole graph on every call; fine for
        // introspection use, but O(V+E) per lookup.
        let depths = self.compute_depths();

        self.graph
            .node_indices()
            .find(|&idx| self.graph[idx].id == id)
            .map(|idx| {
                let node = &self.graph[idx];
                let predecessors: Vec<String> = self
                    .graph
                    .neighbors_directed(idx, petgraph::Direction::Incoming)
                    .map(|pred| self.graph[pred].id.clone())
                    .collect();
                let successors: Vec<String> = self
                    .graph
                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
                    .map(|succ| self.graph[succ].id.clone())
                    .collect();

                super::introspection::NodeInfo {
                    id: node.id.clone(),
                    service: node.service.clone(),
                    depth: depths.get(&idx).copied().unwrap_or(0),
                    in_degree: predecessors.len(),
                    out_degree: successors.len(),
                    predecessors,
                    successors,
                    config: node.config.clone(),
                    metadata: node.metadata.clone(),
                }
            })
    }

    /// All node infos matching the given query.
    #[must_use]
    pub fn query_nodes(
        &self,
        query: &super::introspection::NodeQuery,
    ) -> Vec<super::introspection::NodeInfo> {
        self.graph
            .node_indices()
            .filter_map(|idx| self.node_info(&self.graph[idx].id))
            .filter(|info| query.matches(info))
            .collect()
    }

    // Longest-path depth of every node from the roots: roots are depth 0,
    // otherwise 1 + max predecessor depth. Empty map if the graph is cyclic.
    fn compute_depths(&self) -> HashMap<NodeIndex, usize> {
        let mut depths = HashMap::new();

        let topo = match toposort(&self.graph, None) {
            Ok(t) => t,
            Err(_) => return depths,
        };

        for &idx in &topo {
            let max_pred_depth = self
                .graph
                .neighbors_directed(idx, petgraph::Direction::Incoming)
                .filter_map(|pred| depths.get(&pred))
                .max()
                .copied()
                .map(|d| d + 1)
                .unwrap_or(0);
            depths.insert(idx, max_pred_depth);
        }

        depths
    }

    /// Connectivity metrics: roots (no incoming edges), leaves (no outgoing
    /// edges), max depth, average total degree, and weak-connectivity
    /// (`connected_components` ignores edge direction).
    #[must_use]
    pub fn connectivity(&self) -> super::introspection::ConnectivityMetrics {
        let depths = self.compute_depths();

        let root_nodes: Vec<String> = self
            .graph
            .node_indices()
            .filter(|&idx| {
                self.graph
                    .neighbors_directed(idx, petgraph::Direction::Incoming)
                    .next()
                    .is_none()
            })
            .map(|idx| self.graph[idx].id.clone())
            .collect();

        let leaf_nodes: Vec<String> = self
            .graph
            .node_indices()
            .filter(|&idx| {
                self.graph
                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
                    .next()
                    .is_none()
            })
            .map(|idx| self.graph[idx].id.clone())
            .collect();

        let max_depth = depths.values().copied().max().unwrap_or(0);

        // Sum of (in-degree + out-degree) over all nodes; each edge is
        // therefore counted twice in the total.
        let total_degree: usize = self
            .graph
            .node_indices()
            .map(|idx| {
                let in_deg = self
                    .graph
                    .neighbors_directed(idx, petgraph::Direction::Incoming)
                    .count();
                let out_deg = self
                    .graph
                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
                    .count();
                in_deg + out_deg
            })
            .sum();

        let node_count = self.graph.node_count();
        let avg_degree = if node_count > 0 {
            total_degree as f64 / node_count as f64
        } else {
            0.0
        };

        let component_count = petgraph::algo::connected_components(&self.graph);

        super::introspection::ConnectivityMetrics {
            // An empty graph (0 components) is also reported as connected.
            is_connected: component_count <= 1,
            component_count,
            root_nodes,
            leaf_nodes,
            max_depth,
            avg_degree,
        }
    }

    /// The longest dependency chain in the graph, as an ordered id list
    /// (root first). Ties between equally deep nodes/predecessors are
    /// broken arbitrarily.
    #[must_use]
    pub fn critical_path(&self) -> super::introspection::CriticalPath {
        let depths = self.compute_depths();

        let deepest = depths.iter().max_by_key(|&(_, d)| d);

        if let Some((&end_idx, _)) = deepest {
            let mut path = vec![self.graph[end_idx].id.clone()];
            let mut current = end_idx;

            // Walk backwards from the deepest node, always stepping to the
            // deepest predecessor, until a root (no predecessors) is hit.
            while let Some(pred) = self
                .graph
                .neighbors_directed(current, petgraph::Direction::Incoming)
                .max_by_key(|&p| depths.get(&p).copied().unwrap_or(0))
            {
                path.push(self.graph[pred].id.clone());
                current = pred;
            }

            path.reverse();

            super::introspection::CriticalPath {
                length: path.len(),
                nodes: path,
            }
        } else {
            // Empty graph: no nodes, zero-length path.
            super::introspection::CriticalPath {
                length: 0,
                nodes: vec![],
            }
        }
    }

    /// Transitive upstream/downstream closure of `id` via BFS: everything
    /// that feeds into it and everything it feeds into. Unknown ids yield
    /// empty results.
    #[must_use]
    pub fn impact_analysis(&self, id: &str) -> super::introspection::ImpactAnalysis {
        let node_idx = self
            .graph
            .node_indices()
            .find(|&idx| self.graph[idx].id == id);

        let Some(start_idx) = node_idx else {
            return super::introspection::ImpactAnalysis {
                node_id: id.to_string(),
                upstream: vec![],
                downstream: vec![],
                total_affected: 0,
            };
        };

        // BFS against incoming edges: all transitive ancestors.
        let mut upstream = Vec::new();
        let mut visited = std::collections::HashSet::new();
        let mut queue = std::collections::VecDeque::new();

        for pred in self
            .graph
            .neighbors_directed(start_idx, petgraph::Direction::Incoming)
        {
            queue.push_back(pred);
        }

        while let Some(idx) = queue.pop_front() {
            if visited.insert(idx) {
                upstream.push(self.graph[idx].id.clone());
                for pred in self
                    .graph
                    .neighbors_directed(idx, petgraph::Direction::Incoming)
                {
                    queue.push_back(pred);
                }
            }
        }

        // Same traversal against outgoing edges: all transitive descendants.
        let mut downstream = Vec::new();
        visited.clear();
        queue.clear();

        for succ in self
            .graph
            .neighbors_directed(start_idx, petgraph::Direction::Outgoing)
        {
            queue.push_back(succ);
        }

        while let Some(idx) = queue.pop_front() {
            if visited.insert(idx) {
                downstream.push(self.graph[idx].id.clone());
                for succ in self
                    .graph
                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
                {
                    queue.push_back(succ);
                }
            }
        }

        let total_affected = upstream.len() + downstream.len();

        super::introspection::ImpactAnalysis {
            node_id: id.to_string(),
            upstream,
            downstream,
            total_affected,
        }
    }

    /// A full point-in-time view of the graph: all node infos, edges, waves,
    /// topological order, critical path, connectivity, and a count of nodes
    /// per service type.
    #[must_use]
    pub fn snapshot(&self) -> super::introspection::GraphSnapshot {
        let nodes: Vec<super::introspection::NodeInfo> = self
            .graph
            .node_indices()
            .filter_map(|idx| self.node_info(&self.graph[idx].id))
            .collect();

        let edges: Vec<super::introspection::EdgeInfo> = self
            .graph
            .edge_references()
            .map(|edge| {
                let from = &self.graph[edge.source()].id;
                let to = &self.graph[edge.target()].id;
                super::introspection::EdgeInfo {
                    from: from.clone(),
                    to: to.clone(),
                    // Edge weights are (), so the original Edge::config is
                    // not available here and is always reported as Null.
                    config: serde_json::Value::Null,
                }
            })
            .collect();

        let mut service_distribution = HashMap::new();
        for node in &nodes {
            *service_distribution
                .entry(node.service.clone())
                .or_insert(0) += 1;
        }

        super::introspection::GraphSnapshot {
            node_count: self.node_count(),
            edge_count: self.edge_count(),
            nodes,
            edges,
            waves: self.execution_waves(),
            topological_order: self.topological_order(),
            critical_path: self.critical_path(),
            connectivity: self.connectivity(),
            service_distribution,
        }
    }
}
1099
// Default executor is the empty graph, matching `DagExecutor::new`.
impl Default for DagExecutor {
    fn default() -> Self {
        Self::new()
    }
}
1105
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::error::Result;

    // Constructor stores id and service as given.
    #[test]
    fn test_node_creation() {
        let node = Node::new(
            "test",
            "http",
            serde_json::json!({"url": "https://example.com"}),
        );
        assert_eq!(node.id, "test");
        assert_eq!(node.service, "http");
    }

    // Constructor stores both endpoints as given.
    #[test]
    fn test_edge_creation() {
        let edge = Edge::new("a", "b");
        assert_eq!(edge.from, "a");
        assert_eq!(edge.to, "b");
    }

    // A well-formed two-node pipeline passes validation.
    #[test]
    fn test_pipeline_validation() {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("extract", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("fetch", "extract"));

        assert!(pipeline.validate().is_ok());
    }

    // a -> b -> a must be rejected at construction with CycleDetected.
    #[test]
    fn test_cycle_detection() {
        let mut pipeline = Pipeline::new("cyclic");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "a"));
        let result = DagExecutor::from_pipeline(&pipeline);
        assert!(matches!(
            result,
            Err(StygianError::Graph(GraphError::CycleDetected))
        ));
    }

    // Diamond A -> {B, C} -> D executes every node exactly once.
    #[tokio::test]
    async fn test_diamond_concurrent_execution() -> Result<()> {
        use crate::adapters::noop::NoopService;

        let mut pipeline = Pipeline::new("diamond");
        pipeline.add_node(Node::new("A", "noop", serde_json::json!({"url": ""})));
        pipeline.add_node(Node::new("B", "noop", serde_json::json!({"url": ""})));
        pipeline.add_node(Node::new("C", "noop", serde_json::json!({"url": ""})));
        pipeline.add_node(Node::new("D", "noop", serde_json::json!({"url": ""})));
        pipeline.add_edge(Edge::new("A", "B"));
        pipeline.add_edge(Edge::new("A", "C"));
        pipeline.add_edge(Edge::new("B", "D"));
        pipeline.add_edge(Edge::new("C", "D"));

        let executor = DagExecutor::from_pipeline(&pipeline)?;

        let mut services: HashMap<String, std::sync::Arc<dyn crate::ports::ScrapingService>> =
            HashMap::new();
        services.insert("noop".to_string(), std::sync::Arc::new(NoopService));

        let results = executor.execute(&services).await?;

        assert_eq!(results.len(), 4);
        let ids: Vec<&str> = results.iter().map(|r| r.node_id.as_str()).collect();
        assert!(ids.contains(&"A"));
        assert!(ids.contains(&"B"));
        assert!(ids.contains(&"C"));
        assert!(ids.contains(&"D"));
        Ok(())
    }

    // Executing with an empty service registry must fail.
    #[tokio::test]
    async fn test_missing_service_returns_error() -> Result<()> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch", "http", serde_json::json!({})));

        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let services: HashMap<String, std::sync::Arc<dyn crate::ports::ScrapingService>> =
            HashMap::new();

        let result = executor.execute(&services).await;
        assert!(result.is_err());
        Ok(())
    }

    // node_count/edge_count reflect the pipeline contents.
    #[test]
    fn test_introspection_node_count() {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));

        let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
        assert_eq!(executor.node_count(), 3);
        assert_eq!(executor.edge_count(), 0);
    }

    // node_ids returns every registered id.
    #[test]
    fn test_introspection_node_ids() {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("extract", "ai", serde_json::json!({})));

        let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
        let ids = executor.node_ids();
        assert!(ids.contains(&"fetch".to_string()));
        assert!(ids.contains(&"extract".to_string()));
    }

    // get_node finds known ids and returns None for unknown ones.
    #[test]
    fn test_introspection_get_node() {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new(
            "fetch",
            "http",
            serde_json::json!({"url": "https://example.com"}),
        ));

        let executor = DagExecutor::from_pipeline(&pipeline).unwrap();

        let node = executor.get_node("fetch");
        assert!(node.is_some());
        let node = node.unwrap();
        assert_eq!(node.service, "http");

        assert!(executor.get_node("nonexistent").is_none());
    }

    // Direct neighbors along a -> b -> c.
    #[test]
    fn test_introspection_predecessors_successors() {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "c"));

        let executor = DagExecutor::from_pipeline(&pipeline).unwrap();

        assert_eq!(executor.predecessors("a"), Vec::<String>::new());
        assert_eq!(executor.predecessors("b"), vec!["a".to_string()]);
        assert_eq!(executor.predecessors("c"), vec!["b".to_string()]);

        assert_eq!(executor.successors("a"), vec!["b".to_string()]);
        assert_eq!(executor.successors("b"), vec!["c".to_string()]);
        assert_eq!(executor.successors("c"), Vec::<String>::new());
    }

    // Topological order respects a -> b -> c (positions, not exact list,
    // since ties could order differently).
    #[test]
    fn test_introspection_topological_order() {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "c"));

        let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
        let order = executor.topological_order();

        let a_pos = order.iter().position(|x| x == "a").unwrap();
        let b_pos = order.iter().position(|x| x == "b").unwrap();
        let c_pos = order.iter().position(|x| x == "c").unwrap();

        assert!(a_pos < b_pos);
        assert!(b_pos < c_pos);
    }

    // Diamond produces 3 waves: {A}, {B, C}, {D}.
    #[test]
    fn test_introspection_execution_waves_diamond() {
        let mut pipeline = Pipeline::new("diamond");
        pipeline.add_node(Node::new("A", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("B", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("C", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("D", "http", serde_json::json!({})));
        pipeline.add_edge(Edge::new("A", "B"));
        pipeline.add_edge(Edge::new("A", "C"));
        pipeline.add_edge(Edge::new("B", "D"));
        pipeline.add_edge(Edge::new("C", "D"));

        let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
        let waves = executor.execution_waves();

        assert_eq!(waves.len(), 3);
        assert_eq!(waves[0].level, 0);
        assert!(waves[0].node_ids.contains(&"A".to_string()));
        assert_eq!(waves[1].level, 1);
        assert!(waves[1].node_ids.contains(&"B".to_string()));
        assert!(waves[1].node_ids.contains(&"C".to_string()));
        assert_eq!(waves[2].level, 2);
        assert!(waves[2].node_ids.contains(&"D".to_string()));
    }

    // node_info reports depth and degrees for both ends of a single edge.
    #[test]
    fn test_introspection_node_info() {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("extract", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("fetch", "extract"));

        let executor = DagExecutor::from_pipeline(&pipeline).unwrap();

        let info = executor.node_info("fetch").unwrap();
        assert_eq!(info.id, "fetch");
        assert_eq!(info.service, "http");
        assert_eq!(info.depth, 0);
        assert_eq!(info.in_degree, 0);
        assert_eq!(info.out_degree, 1);
        assert!(info.successors.contains(&"extract".to_string()));

        let info = executor.node_info("extract").unwrap();
        assert_eq!(info.depth, 1);
        assert_eq!(info.in_degree, 1);
        assert_eq!(info.out_degree, 0);
    }

    // Linear chain: one component, root a, leaf c, depth 2.
    #[test]
    fn test_introspection_connectivity() {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "c"));

        let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
        let metrics = executor.connectivity();

        assert!(metrics.is_connected);
        assert_eq!(metrics.component_count, 1);
        assert_eq!(metrics.root_nodes, vec!["a".to_string()]);
        assert_eq!(metrics.leaf_nodes, vec!["c".to_string()]);
        assert_eq!(metrics.max_depth, 2);
    }

    // The critical path of a linear chain is the whole chain.
    #[test]
    fn test_introspection_critical_path() {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "c"));

        let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
        let critical = executor.critical_path();

        assert_eq!(critical.length, 3);
        assert_eq!(critical.nodes, vec!["a", "b", "c"]);
    }

    // Transitive upstream/downstream sets along a -> b -> c.
    #[test]
    fn test_introspection_impact_analysis() {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "c"));

        let executor = DagExecutor::from_pipeline(&pipeline).unwrap();

        let impact = executor.impact_analysis("b");
        assert_eq!(impact.node_id, "b");
        assert_eq!(impact.upstream, vec!["a".to_string()]);
        assert_eq!(impact.downstream, vec!["c".to_string()]);
        assert_eq!(impact.total_affected, 2);

        let impact = executor.impact_analysis("a");
        assert!(impact.upstream.is_empty());
        assert_eq!(impact.downstream.len(), 2);

        let impact = executor.impact_analysis("c");
        assert_eq!(impact.upstream.len(), 2);
        assert!(impact.downstream.is_empty());
    }

    // Snapshot aggregates counts, waves, order, critical path and the
    // per-service distribution.
    #[test]
    fn test_introspection_snapshot() {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("extract", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("fetch", "extract"));

        let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
        let snapshot = executor.snapshot();

        assert_eq!(snapshot.node_count, 2);
        assert_eq!(snapshot.edge_count, 1);
        assert_eq!(snapshot.nodes.len(), 2);
        assert_eq!(snapshot.edges.len(), 1);
        assert_eq!(snapshot.waves.len(), 2);
        assert_eq!(snapshot.topological_order.len(), 2);
        assert_eq!(snapshot.critical_path.length, 2);
        assert_eq!(snapshot.service_distribution.get("http"), Some(&1));
        assert_eq!(snapshot.service_distribution.get("ai"), Some(&1));
    }

    // query_nodes filters by service type and by root/leaf position.
    #[test]
    fn test_introspection_query_nodes() {
        use super::super::introspection::NodeQuery;

        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch1", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("fetch2", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("extract", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("fetch1", "extract"));
        pipeline.add_edge(Edge::new("fetch2", "extract"));

        let executor = DagExecutor::from_pipeline(&pipeline).unwrap();

        let http_nodes = executor.query_nodes(&NodeQuery::by_service("http"));
        assert_eq!(http_nodes.len(), 2);

        let ai_nodes = executor.query_nodes(&NodeQuery::by_service("ai"));
        assert_eq!(ai_nodes.len(), 1);

        let roots = executor.query_nodes(&NodeQuery::roots());
        assert_eq!(roots.len(), 2);

        let leaves = executor.query_nodes(&NodeQuery::leaves());
        assert_eq!(leaves.len(), 1);
        assert_eq!(leaves[0].id, "extract");
    }
}