1use std::collections::HashMap;
15use std::sync::Arc;
16
17use petgraph::algo::toposort;
18use petgraph::graph::{DiGraph, NodeIndex};
19use petgraph::visit::EdgeRef;
20use serde::{Deserialize, Serialize};
21use tokio::sync::Mutex;
22
23use super::error::{GraphError, StygianError};
24use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
25
/// One step of a [`Pipeline`]: a call to the service registered under `service`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
    /// Identifier of this node. Edges refer to nodes by this value and
    /// `DagExecutor::execute` keys node outputs by it, so it is expected to be
    /// unique within a pipeline.
    pub id: String,

    /// Name of the service that executes this node; `DagExecutor::execute`
    /// looks it up in the service registry it is given.
    pub service: String,

    /// Service configuration. `DagExecutor::execute` reads the `"url"` string
    /// from it (an empty URL is used when the key is absent or not a string).
    pub config: serde_json::Value,

    /// Free-form metadata; `#[serde(default)]` makes it `null` when omitted.
    #[serde(default)]
    pub metadata: serde_json::Value,
}
57
58impl Node {
59 pub fn new(
72 id: impl Into<String>,
73 service: impl Into<String>,
74 config: serde_json::Value,
75 ) -> Self {
76 Self {
77 id: id.into(),
78 service: service.into(),
79 config,
80 metadata: serde_json::Value::Null,
81 }
82 }
83
84 pub fn with_metadata(
86 id: impl Into<String>,
87 service: impl Into<String>,
88 config: serde_json::Value,
89 metadata: serde_json::Value,
90 ) -> Self {
91 Self {
92 id: id.into(),
93 service: service.into(),
94 config,
95 metadata,
96 }
97 }
98
99 pub fn validate(&self) -> Result<(), StygianError> {
105 if self.id.is_empty() {
106 return Err(GraphError::InvalidEdge("Node ID cannot be empty".into()).into());
107 }
108 if self.service.is_empty() {
109 return Err(GraphError::InvalidEdge("Node service type cannot be empty".into()).into());
110 }
111 Ok(())
112 }
113}
114
/// A dependency between two nodes. `DagExecutor::execute` runs `to` only after
/// `from` has finished and passes `from`'s output to `to` as upstream data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Edge {
    /// ID of the upstream (source) node.
    pub from: String,

    /// ID of the downstream (target) node.
    pub to: String,

    /// Edge configuration; `#[serde(default)]` makes it `null` when omitted.
    #[serde(default)]
    pub config: serde_json::Value,
}
139
140impl Edge {
141 pub fn new(from: impl Into<String>, to: impl Into<String>) -> Self {
153 Self {
154 from: from.into(),
155 to: to.into(),
156 config: serde_json::Value::Null,
157 }
158 }
159
160 pub fn with_config(
162 from: impl Into<String>,
163 to: impl Into<String>,
164 config: serde_json::Value,
165 ) -> Self {
166 Self {
167 from: from.into(),
168 to: to.into(),
169 config,
170 }
171 }
172
173 pub fn validate(&self) -> Result<(), StygianError> {
179 if self.from.is_empty() || self.to.is_empty() {
180 return Err(GraphError::InvalidEdge("Edge endpoints cannot be empty".into()).into());
181 }
182 Ok(())
183 }
184}
185
/// A named collection of [`Node`]s and the [`Edge`]s connecting them.
///
/// Build one with [`Pipeline::new`], [`Pipeline::add_node`] and
/// [`Pipeline::add_edge`], then turn it into a runnable graph with
/// `DagExecutor::from_pipeline`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Pipeline {
    /// Name of the pipeline.
    pub name: String,

    /// Nodes, in insertion order.
    pub nodes: Vec<Node>,

    /// Edges, in insertion order.
    pub edges: Vec<Edge>,

    /// Free-form metadata; `#[serde(default)]` makes it `null` when omitted.
    #[serde(default)]
    pub metadata: serde_json::Value,
}
214
215impl Pipeline {
216 pub fn new(name: impl Into<String>) -> Self {
228 Self {
229 name: name.into(),
230 nodes: Vec::new(),
231 edges: Vec::new(),
232 metadata: serde_json::Value::Null,
233 }
234 }
235
236 pub fn add_node(&mut self, node: Node) {
238 self.nodes.push(node);
239 }
240
241 pub fn add_edge(&mut self, edge: Edge) {
243 self.edges.push(edge);
244 }
245
246 pub fn validate(&self) -> Result<(), StygianError> {
252 for node in &self.nodes {
253 node.validate()?;
254 }
255 for edge in &self.edges {
256 edge.validate()?;
257 }
258 Ok(())
259 }
260}
261
/// The output one node produced during `DagExecutor::execute`.
#[derive(Debug, Clone)]
pub struct NodeResult {
    /// ID of the node that produced `output`.
    pub node_id: String,
    /// Value returned by the node's service.
    pub output: ServiceOutput,
}
270
271pub struct DagExecutor {
277 graph: DiGraph<Node, ()>,
278 _node_indices: HashMap<String, NodeIndex>,
279}
280
281impl DagExecutor {
282 #[must_use]
292 pub fn new() -> Self {
293 Self {
294 graph: DiGraph::new(),
295 _node_indices: HashMap::new(),
296 }
297 }
298
299 pub fn from_pipeline(pipeline: &Pipeline) -> Result<Self, StygianError> {
306 pipeline.validate()?;
307
308 let mut graph = DiGraph::new();
309 let mut node_indices = HashMap::new();
310
311 for node in &pipeline.nodes {
313 let idx = graph.add_node(node.clone());
314 node_indices.insert(node.id.clone(), idx);
315 }
316
317 for edge in &pipeline.edges {
319 let from_idx = node_indices
320 .get(&edge.from)
321 .ok_or_else(|| GraphError::NodeNotFound(edge.from.clone()))?;
322 let to_idx = node_indices
323 .get(&edge.to)
324 .ok_or_else(|| GraphError::NodeNotFound(edge.to.clone()))?;
325 graph.add_edge(*from_idx, *to_idx, ());
326 }
327
328 if petgraph::algo::is_cyclic_directed(&graph) {
330 return Err(GraphError::CycleDetected.into());
331 }
332
333 Ok(Self {
334 graph,
335 _node_indices: node_indices,
336 })
337 }
338
339 pub async fn execute(
349 &self,
350 services: &HashMap<String, Arc<dyn ScrapingService>>,
351 ) -> Result<Vec<NodeResult>, StygianError> {
352 let topo_order = toposort(&self.graph, None).map_err(|_| GraphError::CycleDetected)?;
354
355 let waves = self.build_execution_waves(&topo_order);
357
358 let results: Arc<Mutex<HashMap<String, ServiceOutput>>> =
360 Arc::new(Mutex::new(HashMap::new()));
361
362 for wave in waves {
363 let mut handles = Vec::new();
365
366 for node_idx in wave {
367 let node = self.graph[node_idx].clone();
368 let service = services.get(&node.service).cloned().ok_or_else(|| {
369 GraphError::InvalidPipeline(format!(
370 "No service registered for type '{}'",
371 node.service
372 ))
373 })?;
374
375 let upstream_data = {
377 let store = results.lock().await;
378 let mut data = serde_json::Map::new();
379 for pred_idx in self
380 .graph
381 .neighbors_directed(node_idx, petgraph::Direction::Incoming)
382 {
383 let pred_id = &self.graph[pred_idx].id;
384 if let Some(out) = store.get(pred_id) {
385 data.insert(
386 pred_id.clone(),
387 serde_json::Value::String(out.data.clone()),
388 );
389 }
390 }
391 serde_json::Value::Object(data)
392 };
393
394 let input = ServiceInput {
395 url: node
396 .config
397 .get("url")
398 .and_then(|v| v.as_str())
399 .unwrap_or("")
400 .to_string(),
401 params: upstream_data,
402 };
403
404 let results_clone = Arc::clone(&results);
405 let node_id = node.id.clone();
406
407 handles.push(tokio::spawn(async move {
408 let output = service.execute(input).await?;
409 results_clone
410 .lock()
411 .await
412 .insert(node_id.clone(), output.clone());
413 Ok::<NodeResult, StygianError>(NodeResult { node_id, output })
414 }));
415 }
416
417 for handle in handles {
419 handle
420 .await
421 .map_err(|e| GraphError::ExecutionFailed(format!("Task join error: {e}")))??;
422 }
423 }
424
425 let store = results.lock().await;
427 let final_results = topo_order
428 .iter()
429 .filter_map(|idx| {
430 let node_id = &self.graph[*idx].id;
431 store.get(node_id).map(|output| NodeResult {
432 node_id: node_id.clone(),
433 output: output.clone(),
434 })
435 })
436 .collect();
437
438 Ok(final_results)
439 }
440
441 fn build_execution_waves(&self, topo_order: &[NodeIndex]) -> Vec<Vec<NodeIndex>> {
445 let mut level: HashMap<NodeIndex, usize> = HashMap::new();
446
447 for &idx in topo_order {
448 let max_pred_level = self
449 .graph
450 .neighbors_directed(idx, petgraph::Direction::Incoming)
451 .map(|pred| level.get(&pred).copied().unwrap_or(0) + 1)
452 .max()
453 .unwrap_or(0);
454 level.insert(idx, max_pred_level);
455 }
456
457 let max_level = level.values().copied().max().unwrap_or(0);
458 let mut waves: Vec<Vec<NodeIndex>> = vec![Vec::new(); max_level + 1];
459 for (idx, lvl) in level {
460 if let Some(wave) = waves.get_mut(lvl) {
461 wave.push(idx);
462 }
463 }
464 waves
465 }
466
467 #[must_use]
487 pub fn node_count(&self) -> usize {
488 self.graph.node_count()
489 }
490
491 #[must_use]
508 pub fn edge_count(&self) -> usize {
509 self.graph.edge_count()
510 }
511
512 #[must_use]
527 pub fn node_ids(&self) -> Vec<String> {
528 self.graph
529 .node_indices()
530 .map(|idx| self.graph[idx].id.clone())
531 .collect()
532 }
533
534 #[must_use]
551 pub fn get_node(&self, id: &str) -> Option<&Node> {
552 self.graph
553 .node_indices()
554 .find(|&idx| self.graph[idx].id == id)
555 .map(|idx| &self.graph[idx])
556 }
557
558 #[must_use]
576 pub fn predecessors(&self, id: &str) -> Vec<String> {
577 self.graph
578 .node_indices()
579 .find(|&idx| self.graph[idx].id == id)
580 .map(|idx| {
581 self.graph
582 .neighbors_directed(idx, petgraph::Direction::Incoming)
583 .map(|pred| self.graph[pred].id.clone())
584 .collect()
585 })
586 .unwrap_or_default()
587 }
588
589 #[must_use]
607 pub fn successors(&self, id: &str) -> Vec<String> {
608 self.graph
609 .node_indices()
610 .find(|&idx| self.graph[idx].id == id)
611 .map(|idx| {
612 self.graph
613 .neighbors_directed(idx, petgraph::Direction::Outgoing)
614 .map(|succ| self.graph[succ].id.clone())
615 .collect()
616 })
617 .unwrap_or_default()
618 }
619
620 #[must_use]
641 pub fn topological_order(&self) -> Vec<String> {
642 toposort(&self.graph, None)
643 .map(|indices| {
644 indices
645 .iter()
646 .map(|&idx| self.graph[idx].id.clone())
647 .collect()
648 })
649 .unwrap_or_default()
650 }
651
652 #[must_use]
674 pub fn execution_waves(&self) -> Vec<super::introspection::ExecutionWave> {
675 let Ok(topo) = toposort(&self.graph, None) else {
676 return vec![];
677 };
678
679 let waves = self.build_execution_waves(&topo);
680
681 waves
682 .into_iter()
683 .enumerate()
684 .filter(|(_, nodes)| !nodes.is_empty())
685 .map(|(level, nodes)| super::introspection::ExecutionWave {
686 level,
687 node_ids: nodes
688 .iter()
689 .map(|&idx| self.graph[idx].id.clone())
690 .collect(),
691 })
692 .collect()
693 }
694
695 #[must_use]
715 pub fn node_info(&self, id: &str) -> Option<super::introspection::NodeInfo> {
716 let depths = self.compute_depths();
717
718 self.graph
719 .node_indices()
720 .find(|&idx| self.graph[idx].id == id)
721 .map(|idx| {
722 let node = &self.graph[idx];
723 let predecessors: Vec<String> = self
724 .graph
725 .neighbors_directed(idx, petgraph::Direction::Incoming)
726 .map(|pred| self.graph[pred].id.clone())
727 .collect();
728 let successors: Vec<String> = self
729 .graph
730 .neighbors_directed(idx, petgraph::Direction::Outgoing)
731 .map(|succ| self.graph[succ].id.clone())
732 .collect();
733
734 super::introspection::NodeInfo {
735 id: node.id.clone(),
736 service: node.service.clone(),
737 depth: depths.get(&idx).copied().unwrap_or(0),
738 in_degree: predecessors.len(),
739 out_degree: successors.len(),
740 predecessors,
741 successors,
742 config: node.config.clone(),
743 metadata: node.metadata.clone(),
744 }
745 })
746 }
747
748 #[must_use]
767 pub fn query_nodes(
768 &self,
769 query: &super::introspection::NodeQuery,
770 ) -> Vec<super::introspection::NodeInfo> {
771 self.graph
772 .node_indices()
773 .filter_map(|idx| self.node_info(&self.graph[idx].id))
774 .filter(|info| query.matches(info))
775 .collect()
776 }
777
778 fn compute_depths(&self) -> HashMap<NodeIndex, usize> {
780 let mut depths = HashMap::new();
781
782 let Ok(topo) = toposort(&self.graph, None) else {
783 return depths;
784 };
785
786 for &idx in &topo {
787 let max_pred_depth = self
788 .graph
789 .neighbors_directed(idx, petgraph::Direction::Incoming)
790 .filter_map(|pred| depths.get(&pred))
791 .max()
792 .copied()
793 .map_or(0, |d| d + 1);
794 depths.insert(idx, max_pred_depth);
795 }
796
797 depths
798 }
799
800 #[must_use]
819 pub fn connectivity(&self) -> super::introspection::ConnectivityMetrics {
820 let depths = self.compute_depths();
821
822 let root_nodes: Vec<String> = self
823 .graph
824 .node_indices()
825 .filter(|&idx| {
826 self.graph
827 .neighbors_directed(idx, petgraph::Direction::Incoming)
828 .next()
829 .is_none()
830 })
831 .map(|idx| self.graph[idx].id.clone())
832 .collect();
833
834 let leaf_nodes: Vec<String> = self
835 .graph
836 .node_indices()
837 .filter(|&idx| {
838 self.graph
839 .neighbors_directed(idx, petgraph::Direction::Outgoing)
840 .next()
841 .is_none()
842 })
843 .map(|idx| self.graph[idx].id.clone())
844 .collect();
845
846 let max_depth = depths.values().copied().max().unwrap_or(0);
847
848 let total_degree: usize = self
849 .graph
850 .node_indices()
851 .map(|idx| {
852 let in_deg = self
853 .graph
854 .neighbors_directed(idx, petgraph::Direction::Incoming)
855 .count();
856 let out_deg = self
857 .graph
858 .neighbors_directed(idx, petgraph::Direction::Outgoing)
859 .count();
860 in_deg + out_deg
861 })
862 .sum();
863
864 let node_count = self.graph.node_count();
865 let avg_degree = if node_count > 0 {
866 match (u32::try_from(total_degree), u32::try_from(node_count)) {
867 (Ok(total), Ok(count)) if count > 0 => f64::from(total) / f64::from(count),
868 _ => f64::from(u32::MAX),
869 }
870 } else {
871 0.0
872 };
873
874 let component_count = petgraph::algo::connected_components(&self.graph);
876
877 super::introspection::ConnectivityMetrics {
878 is_connected: component_count <= 1,
879 component_count,
880 root_nodes,
881 leaf_nodes,
882 max_depth,
883 avg_degree,
884 }
885 }
886
887 #[must_use]
908 pub fn critical_path(&self) -> super::introspection::CriticalPath {
909 let depths = self.compute_depths();
910
911 let deepest = depths.iter().max_by_key(|&(_, d)| d);
913
914 if let Some((&end_idx, _)) = deepest {
915 let mut path = vec![self.graph[end_idx].id.clone()];
917 let mut current = end_idx;
918
919 while let Some(pred) = self
920 .graph
921 .neighbors_directed(current, petgraph::Direction::Incoming)
922 .max_by_key(|&p| depths.get(&p).copied().unwrap_or(0))
923 {
924 path.push(self.graph[pred].id.clone());
925 current = pred;
926 }
927
928 path.reverse();
929
930 super::introspection::CriticalPath {
931 length: path.len(),
932 nodes: path,
933 }
934 } else {
935 super::introspection::CriticalPath {
936 length: 0,
937 nodes: vec![],
938 }
939 }
940 }
941
942 #[must_use]
966 pub fn impact_analysis(&self, id: &str) -> super::introspection::ImpactAnalysis {
967 let node_idx = self
968 .graph
969 .node_indices()
970 .find(|&idx| self.graph[idx].id == id);
971
972 let Some(start_idx) = node_idx else {
973 return super::introspection::ImpactAnalysis {
974 node_id: id.to_string(),
975 upstream: vec![],
976 downstream: vec![],
977 total_affected: 0,
978 };
979 };
980
981 let mut upstream = Vec::new();
983 let mut visited = std::collections::HashSet::new();
984 let mut queue = std::collections::VecDeque::new();
985
986 for pred in self
987 .graph
988 .neighbors_directed(start_idx, petgraph::Direction::Incoming)
989 {
990 queue.push_back(pred);
991 }
992
993 while let Some(idx) = queue.pop_front() {
994 if visited.insert(idx) {
995 upstream.push(self.graph[idx].id.clone());
996 for pred in self
997 .graph
998 .neighbors_directed(idx, petgraph::Direction::Incoming)
999 {
1000 queue.push_back(pred);
1001 }
1002 }
1003 }
1004
1005 let mut downstream = Vec::new();
1007 visited.clear();
1008 queue.clear();
1009
1010 for succ in self
1011 .graph
1012 .neighbors_directed(start_idx, petgraph::Direction::Outgoing)
1013 {
1014 queue.push_back(succ);
1015 }
1016
1017 while let Some(idx) = queue.pop_front() {
1018 if visited.insert(idx) {
1019 downstream.push(self.graph[idx].id.clone());
1020 for succ in self
1021 .graph
1022 .neighbors_directed(idx, petgraph::Direction::Outgoing)
1023 {
1024 queue.push_back(succ);
1025 }
1026 }
1027 }
1028
1029 let total_affected = upstream.len() + downstream.len();
1030
1031 super::introspection::ImpactAnalysis {
1032 node_id: id.to_string(),
1033 upstream,
1034 downstream,
1035 total_affected,
1036 }
1037 }
1038
1039 #[must_use]
1059 pub fn snapshot(&self) -> super::introspection::GraphSnapshot {
1060 let nodes: Vec<super::introspection::NodeInfo> = self
1061 .graph
1062 .node_indices()
1063 .filter_map(|idx| self.node_info(&self.graph[idx].id))
1064 .collect();
1065
1066 let edges: Vec<super::introspection::EdgeInfo> = self
1067 .graph
1068 .edge_references()
1069 .map(|edge| {
1070 let from = &self.graph[edge.source()].id;
1071 let to = &self.graph[edge.target()].id;
1072 super::introspection::EdgeInfo {
1073 from: from.clone(),
1074 to: to.clone(),
1075 config: serde_json::Value::Null,
1076 }
1077 })
1078 .collect();
1079
1080 let mut service_distribution = HashMap::new();
1081 for node in &nodes {
1082 *service_distribution
1083 .entry(node.service.clone())
1084 .or_insert(0) += 1;
1085 }
1086
1087 super::introspection::GraphSnapshot {
1088 node_count: self.node_count(),
1089 edge_count: self.edge_count(),
1090 nodes,
1091 edges,
1092 waves: self.execution_waves(),
1093 topological_order: self.topological_order(),
1094 critical_path: self.critical_path(),
1095 connectivity: self.connectivity(),
1096 service_distribution,
1097 }
1098 }
1099}
1100
1101impl Default for DagExecutor {
1102 fn default() -> Self {
1103 Self::new()
1104 }
1105}
1106
// Unit tests for pipeline construction, DAG execution and graph introspection.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::error::Result;

    // `Node::new` stores the given ID and service name.
    #[test]
    fn test_node_creation() {
        let node = Node::new(
            "test",
            "http",
            serde_json::json!({"url": "https://example.com"}),
        );
        assert_eq!(node.id, "test");
        assert_eq!(node.service, "http");
    }

    // `Edge::new` stores both endpoints.
    #[test]
    fn test_edge_creation() {
        let edge = Edge::new("a", "b");
        assert_eq!(edge.from, "a");
        assert_eq!(edge.to, "b");
    }

    // A well-formed two-node pipeline passes validation.
    #[test]
    fn test_pipeline_validation() {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("extract", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("fetch", "extract"));

        assert!(pipeline.validate().is_ok());
    }

    // a -> b -> a must be rejected by `from_pipeline` as a cycle.
    #[test]
    fn test_cycle_detection() {
        let mut pipeline = Pipeline::new("cyclic");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "a"));
        let result = DagExecutor::from_pipeline(&pipeline);
        assert!(matches!(
            result,
            Err(StygianError::Graph(GraphError::CycleDetected))
        ));
    }

    // Diamond A -> {B, C} -> D: every node runs exactly once and is reported.
    #[tokio::test]
    async fn test_diamond_concurrent_execution() -> Result<()> {
        use crate::adapters::noop::NoopService;

        let mut pipeline = Pipeline::new("diamond");
        pipeline.add_node(Node::new("A", "noop", serde_json::json!({"url": ""})));
        pipeline.add_node(Node::new("B", "noop", serde_json::json!({"url": ""})));
        pipeline.add_node(Node::new("C", "noop", serde_json::json!({"url": ""})));
        pipeline.add_node(Node::new("D", "noop", serde_json::json!({"url": ""})));
        pipeline.add_edge(Edge::new("A", "B"));
        pipeline.add_edge(Edge::new("A", "C"));
        pipeline.add_edge(Edge::new("B", "D"));
        pipeline.add_edge(Edge::new("C", "D"));

        let executor = DagExecutor::from_pipeline(&pipeline)?;

        let mut services: HashMap<String, std::sync::Arc<dyn crate::ports::ScrapingService>> =
            HashMap::new();
        services.insert("noop".to_string(), std::sync::Arc::new(NoopService));

        let results = executor.execute(&services).await?;

        // One result per node.
        assert_eq!(results.len(), 4);
        let ids: Vec<&str> = results.iter().map(|r| r.node_id.as_str()).collect();
        assert!(ids.contains(&"A"));
        assert!(ids.contains(&"B"));
        assert!(ids.contains(&"C"));
        assert!(ids.contains(&"D"));
        Ok(())
    }

    // Executing with an empty service registry must fail rather than skip nodes.
    #[tokio::test]
    async fn test_missing_service_returns_error() -> Result<()> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch", "http", serde_json::json!({})));

        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let services: HashMap<String, std::sync::Arc<dyn crate::ports::ScrapingService>> =
            HashMap::new();

        let result = executor.execute(&services).await;
        assert!(result.is_err());
        Ok(())
    }

    // Node and edge counts of an edgeless graph.
    #[test]
    fn test_introspection_node_count() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));

        let executor = DagExecutor::from_pipeline(&pipeline)?;
        assert_eq!(executor.node_count(), 3);
        assert_eq!(executor.edge_count(), 0);
        Ok(())
    }

    // `node_ids` lists every node.
    #[test]
    fn test_introspection_node_ids() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("extract", "ai", serde_json::json!({})));

        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let ids = executor.node_ids();
        assert!(ids.contains(&"fetch".to_string()));
        assert!(ids.contains(&"extract".to_string()));
        Ok(())
    }

    // `get_node` finds existing nodes and returns `None` for unknown IDs.
    #[test]
    fn test_introspection_get_node() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new(
            "fetch",
            "http",
            serde_json::json!({"url": "https://example.com"}),
        ));

        let executor = DagExecutor::from_pipeline(&pipeline)?;

        let node = executor.get_node("fetch");
        assert!(node.is_some());
        let node = node.ok_or_else(|| std::io::Error::other("expected fetch node to exist"))?;
        assert_eq!(node.service, "http");

        assert!(executor.get_node("nonexistent").is_none());
        Ok(())
    }

    // Direct neighbours on the chain a -> b -> c.
    #[test]
    fn test_introspection_predecessors_successors()
    -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "c"));

        let executor = DagExecutor::from_pipeline(&pipeline)?;

        assert_eq!(executor.predecessors("a"), Vec::<String>::new());
        assert_eq!(executor.predecessors("b"), vec!["a".to_string()]);
        assert_eq!(executor.predecessors("c"), vec!["b".to_string()]);

        assert_eq!(executor.successors("a"), vec!["b".to_string()]);
        assert_eq!(executor.successors("b"), vec!["c".to_string()]);
        assert_eq!(executor.successors("c"), Vec::<String>::new());
        Ok(())
    }

    // Topological order respects a -> b -> c.
    #[test]
    fn test_introspection_topological_order() -> std::result::Result<(), Box<dyn std::error::Error>>
    {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "c"));

        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let order = executor.topological_order();

        let a_pos = order
            .iter()
            .position(|x| x == "a")
            .ok_or_else(|| std::io::Error::other("expected node a in order"))?;
        let b_pos = order
            .iter()
            .position(|x| x == "b")
            .ok_or_else(|| std::io::Error::other("expected node b in order"))?;
        let c_pos = order
            .iter()
            .position(|x| x == "c")
            .ok_or_else(|| std::io::Error::other("expected node c in order"))?;

        assert!(a_pos < b_pos);
        assert!(b_pos < c_pos);
        Ok(())
    }

    // Diamond A -> {B, C} -> D yields waves [A], [B, C], [D].
    #[test]
    fn test_introspection_execution_waves_diamond()
    -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("diamond");
        pipeline.add_node(Node::new("A", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("B", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("C", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("D", "http", serde_json::json!({})));
        pipeline.add_edge(Edge::new("A", "B"));
        pipeline.add_edge(Edge::new("A", "C"));
        pipeline.add_edge(Edge::new("B", "D"));
        pipeline.add_edge(Edge::new("C", "D"));

        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let waves = executor.execution_waves();

        assert_eq!(waves.len(), 3);
        let first_wave = waves
            .first()
            .ok_or_else(|| std::io::Error::other("missing wave 0"))?;
        let second_wave = waves
            .get(1)
            .ok_or_else(|| std::io::Error::other("missing wave 1"))?;
        let third_wave = waves
            .get(2)
            .ok_or_else(|| std::io::Error::other("missing wave 2"))?;

        assert_eq!(first_wave.level, 0);
        assert!(first_wave.node_ids.contains(&"A".to_string()));
        assert_eq!(second_wave.level, 1);
        assert!(second_wave.node_ids.contains(&"B".to_string()));
        assert!(second_wave.node_ids.contains(&"C".to_string()));
        assert_eq!(third_wave.level, 2);
        assert!(third_wave.node_ids.contains(&"D".to_string()));
        Ok(())
    }

    // Depth and degree details for both ends of fetch -> extract.
    #[test]
    fn test_introspection_node_info() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("extract", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("fetch", "extract"));

        let executor = DagExecutor::from_pipeline(&pipeline)?;

        let info = executor
            .node_info("fetch")
            .ok_or_else(|| std::io::Error::other("expected fetch node info"))?;
        assert_eq!(info.id, "fetch");
        assert_eq!(info.service, "http");
        assert_eq!(info.depth, 0);
        assert_eq!(info.in_degree, 0);
        assert_eq!(info.out_degree, 1);
        assert!(info.successors.contains(&"extract".to_string()));

        let info = executor
            .node_info("extract")
            .ok_or_else(|| std::io::Error::other("expected extract node info"))?;
        assert_eq!(info.depth, 1);
        assert_eq!(info.in_degree, 1);
        assert_eq!(info.out_degree, 0);
        Ok(())
    }

    // Connectivity metrics of the chain a -> b -> c.
    #[test]
    fn test_introspection_connectivity() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "c"));

        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let metrics = executor.connectivity();

        assert!(metrics.is_connected);
        assert_eq!(metrics.component_count, 1);
        assert_eq!(metrics.root_nodes, vec!["a".to_string()]);
        assert_eq!(metrics.leaf_nodes, vec!["c".to_string()]);
        assert_eq!(metrics.max_depth, 2);
        Ok(())
    }

    // The critical path of a chain is the whole chain, root first.
    #[test]
    fn test_introspection_critical_path() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "c"));

        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let critical = executor.critical_path();

        assert_eq!(critical.length, 3);
        assert_eq!(critical.nodes, vec!["a", "b", "c"]);
        Ok(())
    }

    // Transitive upstream/downstream sets on the chain a -> b -> c.
    #[test]
    fn test_introspection_impact_analysis() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "c"));

        let executor = DagExecutor::from_pipeline(&pipeline)?;

        let impact = executor.impact_analysis("b");
        assert_eq!(impact.node_id, "b");
        assert_eq!(impact.upstream, vec!["a".to_string()]);
        assert_eq!(impact.downstream, vec!["c".to_string()]);
        assert_eq!(impact.total_affected, 2);

        // Root: nothing upstream, both b and c downstream.
        let impact = executor.impact_analysis("a");
        assert!(impact.upstream.is_empty());
        assert_eq!(impact.downstream.len(), 2);

        // Leaf: both a and b upstream, nothing downstream.
        let impact = executor.impact_analysis("c");
        assert_eq!(impact.upstream.len(), 2);
        assert!(impact.downstream.is_empty());
        Ok(())
    }

    // The snapshot aggregates counts, waves, order, critical path and services.
    #[test]
    fn test_introspection_snapshot() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("extract", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("fetch", "extract"));

        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let snapshot = executor.snapshot();

        assert_eq!(snapshot.node_count, 2);
        assert_eq!(snapshot.edge_count, 1);
        assert_eq!(snapshot.nodes.len(), 2);
        assert_eq!(snapshot.edges.len(), 1);
        assert_eq!(snapshot.waves.len(), 2);
        assert_eq!(snapshot.topological_order.len(), 2);
        assert_eq!(snapshot.critical_path.length, 2);
        assert_eq!(snapshot.service_distribution.get("http"), Some(&1));
        assert_eq!(snapshot.service_distribution.get("ai"), Some(&1));
        Ok(())
    }

    // Filtering by service, roots and leaves on {fetch1, fetch2} -> extract.
    #[test]
    fn test_introspection_query_nodes() -> std::result::Result<(), Box<dyn std::error::Error>> {
        use super::super::introspection::NodeQuery;

        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch1", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("fetch2", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("extract", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("fetch1", "extract"));
        pipeline.add_edge(Edge::new("fetch2", "extract"));

        let executor = DagExecutor::from_pipeline(&pipeline)?;

        let http_nodes = executor.query_nodes(&NodeQuery::by_service("http"));
        assert_eq!(http_nodes.len(), 2);

        let ai_nodes = executor.query_nodes(&NodeQuery::by_service("ai"));
        assert_eq!(ai_nodes.len(), 1);

        // Both fetch nodes have no predecessors.
        let roots = executor.query_nodes(&NodeQuery::roots());
        assert_eq!(roots.len(), 2);

        // Only `extract` has no successors.
        let leaves = executor.query_nodes(&NodeQuery::leaves());
        assert_eq!(leaves.len(), 1);
        let leaf = leaves
            .first()
            .ok_or_else(|| std::io::Error::other("expected one leaf node"))?;
        assert_eq!(leaf.id, "extract");
        Ok(())
    }
}