Skip to main content

stygian_graph/domain/
graph.rs

1//! DAG execution engine
2//!
3//! Implements graph-based pipeline execution using petgraph.
4//! Defines core domain entities: Node, Edge, and Pipeline.
5//!
6//! # Example
7//!
8//! ```
9//! use stygian_graph::domain::graph::{Node, Edge, DagExecutor};
10//!
11//! let node = Node::new("fetch", "http", serde_json::json!({"url": "https://example.com"}));
12//! ```
13
14use std::collections::HashMap;
15use std::sync::Arc;
16
17use petgraph::algo::toposort;
18use petgraph::graph::{DiGraph, NodeIndex};
19use petgraph::visit::EdgeRef;
20use serde::{Deserialize, Serialize};
21use tokio::sync::Mutex;
22
23use super::error::{GraphError, StygianError};
24use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
25
/// A node in the scraping pipeline graph
///
/// Represents a single operation (HTTP fetch, AI extraction, transformation, etc.).
/// Nodes are referenced by [`Edge`]s through their `id`, and the executor uses
/// `service` as the key when looking up the service that runs the node.
///
/// # Example
///
/// ```
/// use stygian_graph::domain::graph::Node;
/// use serde_json::json;
///
/// let node = Node::new(
///     "fetch_homepage",
///     "http",
///     json!({"url": "https://example.com", "method": "GET"})
/// );
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
    /// Unique identifier for this node (the value edges use in `from` / `to`)
    pub id: String,

    /// Service type (e.g., `"http"`, `"ai_extract"`, `"browser"`)
    pub service: String,

    /// Node-specific configuration; the executor reads the optional `"url"` key from it
    pub config: serde_json::Value,

    /// Optional node metadata; defaults to JSON `null` when the field is
    /// missing from deserialized input
    #[serde(default)]
    pub metadata: serde_json::Value,
}
57
58impl Node {
59    /// Create a new node
60    ///
61    /// # Example
62    ///
63    /// ```
64    /// use stygian_graph::domain::graph::Node;
65    /// use serde_json::json;
66    ///
67    /// let node = Node::new("fetch", "http", json!({"url": "https://example.com"}));
68    /// assert_eq!(node.id, "fetch");
69    /// assert_eq!(node.service, "http");
70    /// ```
71    pub fn new(
72        id: impl Into<String>,
73        service: impl Into<String>,
74        config: serde_json::Value,
75    ) -> Self {
76        Self {
77            id: id.into(),
78            service: service.into(),
79            config,
80            metadata: serde_json::Value::Null,
81        }
82    }
83
84    /// Create a new node with metadata
85    pub fn with_metadata(
86        id: impl Into<String>,
87        service: impl Into<String>,
88        config: serde_json::Value,
89        metadata: serde_json::Value,
90    ) -> Self {
91        Self {
92            id: id.into(),
93            service: service.into(),
94            config,
95            metadata,
96        }
97    }
98
99    /// Validate the node configuration
100    ///
101    /// # Errors
102    ///
103    /// Returns `GraphError::InvalidEdge` if the node has an empty ID or service type.
104    pub fn validate(&self) -> Result<(), StygianError> {
105        if self.id.is_empty() {
106            return Err(GraphError::InvalidEdge("Node ID cannot be empty".into()).into());
107        }
108        if self.service.is_empty() {
109            return Err(GraphError::InvalidEdge("Node service type cannot be empty".into()).into());
110        }
111        Ok(())
112    }
113}
114
/// An edge connecting two nodes in the pipeline graph
///
/// Represents data flow or dependencies between operations. The edge is
/// directed: it points from the node whose ID is `from` to the node whose ID
/// is `to`.
///
/// # Example
///
/// ```
/// use stygian_graph::domain::graph::Edge;
///
/// let edge = Edge::new("fetch_homepage", "extract_data");
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Edge {
    /// Source node ID
    pub from: String,

    /// Target node ID
    pub to: String,

    /// Optional edge configuration (data transformations, filters, etc.);
    /// defaults to JSON `null` when the field is missing from deserialized input
    #[serde(default)]
    pub config: serde_json::Value,
}
139
140impl Edge {
141    /// Create a new edge between two nodes
142    ///
143    /// # Example
144    ///
145    /// ```
146    /// use stygian_graph::domain::graph::Edge;
147    ///
148    /// let edge = Edge::new("fetch", "extract");
149    /// assert_eq!(edge.from, "fetch");
150    /// assert_eq!(edge.to, "extract");
151    /// ```
152    pub fn new(from: impl Into<String>, to: impl Into<String>) -> Self {
153        Self {
154            from: from.into(),
155            to: to.into(),
156            config: serde_json::Value::Null,
157        }
158    }
159
160    /// Create an edge with additional configuration
161    pub fn with_config(
162        from: impl Into<String>,
163        to: impl Into<String>,
164        config: serde_json::Value,
165    ) -> Self {
166        Self {
167            from: from.into(),
168            to: to.into(),
169            config,
170        }
171    }
172
173    /// Validate the edge
174    ///
175    /// # Errors
176    ///
177    /// Returns `GraphError::InvalidEdge` if the edge has empty endpoints.
178    pub fn validate(&self) -> Result<(), StygianError> {
179        if self.from.is_empty() || self.to.is_empty() {
180            return Err(GraphError::InvalidEdge("Edge endpoints cannot be empty".into()).into());
181        }
182        Ok(())
183    }
184}
185
/// A complete pipeline definition
///
/// Contains the full graph structure (nodes and edges) plus metadata.
/// This is the plain, serializable description of a pipeline; it is turned
/// into an executable graph by `DagExecutor::from_pipeline`.
///
/// # Example
///
/// ```
/// use stygian_graph::domain::graph::{Pipeline, Node, Edge};
/// use serde_json::json;
///
/// let mut pipeline = Pipeline::new("example_pipeline");
/// pipeline.add_node(Node::new("fetch", "http", json!({"url": "https://example.com"})));
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Pipeline {
    /// Pipeline name/identifier
    pub name: String,

    /// Nodes in the pipeline, in insertion order
    pub nodes: Vec<Node>,

    /// Edges connecting nodes; each edge refers to nodes by ID
    pub edges: Vec<Edge>,

    /// Pipeline-level configuration and metadata; defaults to JSON `null`
    /// when the field is missing from deserialized input
    #[serde(default)]
    pub metadata: serde_json::Value,
}
214
215impl Pipeline {
216    /// Create a new empty pipeline
217    ///
218    /// # Example
219    ///
220    /// ```
221    /// use stygian_graph::domain::graph::Pipeline;
222    ///
223    /// let pipeline = Pipeline::new("my_scraper");
224    /// assert_eq!(pipeline.name, "my_scraper");
225    /// assert!(pipeline.nodes.is_empty());
226    /// ```
227    pub fn new(name: impl Into<String>) -> Self {
228        Self {
229            name: name.into(),
230            nodes: Vec::new(),
231            edges: Vec::new(),
232            metadata: serde_json::Value::Null,
233        }
234    }
235
236    /// Add a node to the pipeline
237    pub fn add_node(&mut self, node: Node) {
238        self.nodes.push(node);
239    }
240
241    /// Add an edge to the pipeline
242    pub fn add_edge(&mut self, edge: Edge) {
243        self.edges.push(edge);
244    }
245
246    /// Validate the entire pipeline
247    ///
248    /// # Errors
249    ///
250    /// Returns an error if any node or edge is invalid.
251    pub fn validate(&self) -> Result<(), StygianError> {
252        for node in &self.nodes {
253            node.validate()?;
254        }
255        for edge in &self.edges {
256            edge.validate()?;
257        }
258        Ok(())
259    }
260}
261
/// Result of executing a single node
///
/// Pairs the ID of a pipeline node with the output its service returned.
#[derive(Debug, Clone)]
pub struct NodeResult {
    /// The node ID that produced this result
    pub node_id: String,
    /// The output from the service execution
    pub output: ServiceOutput,
}
270
/// DAG executor that processes pipeline graphs
///
/// Executes scraping pipelines as directed acyclic graphs using petgraph.
/// Independent branches are executed concurrently using `tokio::spawn`.
/// Data from upstream nodes is passed as input to downstream nodes.
pub struct DagExecutor {
    // Directed graph whose node weights are the pipeline nodes; edges carry no weight.
    graph: DiGraph<Node, ()>,
    // Maps each node ID to its index in `graph`; filled in by `from_pipeline`.
    _node_indices: HashMap<String, NodeIndex>,
}
280
281impl DagExecutor {
282    /// Create a new DAG executor
283    ///
284    /// # Example
285    ///
286    /// ```
287    /// use stygian_graph::domain::graph::DagExecutor;
288    ///
289    /// let executor = DagExecutor::new();
290    /// ```
291    pub fn new() -> Self {
292        Self {
293            graph: DiGraph::new(),
294            _node_indices: HashMap::new(),
295        }
296    }
297
298    /// Build a graph from a pipeline definition
299    ///
300    /// # Errors
301    ///
302    /// Returns `GraphError::CycleDetected` if the pipeline contains a cycle.
303    /// Returns `GraphError::NodeNotFound` if an edge references an unknown node.
304    pub fn from_pipeline(pipeline: &Pipeline) -> Result<Self, StygianError> {
305        pipeline.validate()?;
306
307        let mut graph = DiGraph::new();
308        let mut node_indices = HashMap::new();
309
310        // Add nodes
311        for node in &pipeline.nodes {
312            let idx = graph.add_node(node.clone());
313            node_indices.insert(node.id.clone(), idx);
314        }
315
316        // Add edges
317        for edge in &pipeline.edges {
318            let from_idx = node_indices
319                .get(&edge.from)
320                .ok_or_else(|| GraphError::NodeNotFound(edge.from.clone()))?;
321            let to_idx = node_indices
322                .get(&edge.to)
323                .ok_or_else(|| GraphError::NodeNotFound(edge.to.clone()))?;
324            graph.add_edge(*from_idx, *to_idx, ());
325        }
326
327        // Check for cycles
328        if petgraph::algo::is_cyclic_directed(&graph) {
329            return Err(GraphError::CycleDetected.into());
330        }
331
332        Ok(Self {
333            graph,
334            _node_indices: node_indices,
335        })
336    }
337
338    /// Execute the pipeline using the provided service registry.
339    ///
340    /// Nodes are executed in topological order. Independent nodes at the same
341    /// depth are spawned concurrently via `tokio::spawn`. The output of each
342    /// node is available to all downstream nodes as their `ServiceInput.params`.
343    ///
344    /// # Errors
345    ///
346    /// Returns `GraphError::ExecutionFailed` if any node execution fails.
347    pub async fn execute(
348        &self,
349        services: &HashMap<String, Arc<dyn ScrapingService>>,
350    ) -> Result<Vec<NodeResult>, StygianError> {
351        // Topological sort — safe because we checked for cycles in from_pipeline
352        let topo_order = toposort(&self.graph, None).map_err(|_| GraphError::CycleDetected)?;
353
354        // Group nodes into execution waves by their level (longest path from root)
355        let waves = self.build_execution_waves(&topo_order);
356
357        // Shared result store
358        let results: Arc<Mutex<HashMap<String, ServiceOutput>>> =
359            Arc::new(Mutex::new(HashMap::new()));
360
361        for wave in waves {
362            // Spawn all nodes in this wave concurrently
363            let mut handles = Vec::new();
364
365            for node_idx in wave {
366                let node = self.graph[node_idx].clone();
367                let service = services.get(&node.service).cloned().ok_or_else(|| {
368                    GraphError::InvalidPipeline(format!(
369                        "No service registered for type '{}'",
370                        node.service
371                    ))
372                })?;
373
374                // Collect upstream outputs as input params
375                let upstream_data = {
376                    let store = results.lock().await;
377                    let mut data = serde_json::Map::new();
378                    for pred_idx in self
379                        .graph
380                        .neighbors_directed(node_idx, petgraph::Direction::Incoming)
381                    {
382                        let pred_id = &self.graph[pred_idx].id;
383                        if let Some(out) = store.get(pred_id) {
384                            data.insert(
385                                pred_id.clone(),
386                                serde_json::Value::String(out.data.clone()),
387                            );
388                        }
389                    }
390                    serde_json::Value::Object(data)
391                };
392
393                let input = ServiceInput {
394                    url: node
395                        .config
396                        .get("url")
397                        .and_then(|v| v.as_str())
398                        .unwrap_or("")
399                        .to_string(),
400                    params: upstream_data,
401                };
402
403                let results_clone = Arc::clone(&results);
404                let node_id = node.id.clone();
405
406                handles.push(tokio::spawn(async move {
407                    let output = service.execute(input).await?;
408                    results_clone
409                        .lock()
410                        .await
411                        .insert(node_id.clone(), output.clone());
412                    Ok::<NodeResult, StygianError>(NodeResult { node_id, output })
413                }));
414            }
415
416            // Await all handles in this wave, propagating errors
417            for handle in handles {
418                handle
419                    .await
420                    .map_err(|e| GraphError::ExecutionFailed(format!("Task join error: {e}")))??;
421            }
422        }
423
424        // Collect final results in topological order
425        let store = results.lock().await;
426        let final_results = topo_order
427            .iter()
428            .filter_map(|idx| {
429                let node_id = &self.graph[*idx].id;
430                store.get(node_id).map(|output| NodeResult {
431                    node_id: node_id.clone(),
432                    output: output.clone(),
433                })
434            })
435            .collect();
436
437        Ok(final_results)
438    }
439
440    /// Build execution waves: groups of nodes that can run concurrently.
441    ///
442    /// Each wave contains nodes whose predecessors are all in earlier waves.
443    fn build_execution_waves(&self, topo_order: &[NodeIndex]) -> Vec<Vec<NodeIndex>> {
444        let mut level: HashMap<NodeIndex, usize> = HashMap::new();
445
446        for &idx in topo_order {
447            let max_pred_level = self
448                .graph
449                .neighbors_directed(idx, petgraph::Direction::Incoming)
450                .map(|pred| level.get(&pred).copied().unwrap_or(0) + 1)
451                .max()
452                .unwrap_or(0);
453            level.insert(idx, max_pred_level);
454        }
455
456        let max_level = level.values().copied().max().unwrap_or(0);
457        let mut waves: Vec<Vec<NodeIndex>> = vec![Vec::new(); max_level + 1];
458        for (idx, lvl) in level {
459            if let Some(wave) = waves.get_mut(lvl) {
460                wave.push(idx);
461            }
462        }
463        waves
464    }
465
466    // ═══════════════════════════════════════════════════════════════════════════
467    // Graph Introspection Methods
468    // ═══════════════════════════════════════════════════════════════════════════
469
470    /// Get the total number of nodes in the graph
471    ///
472    /// # Example
473    ///
474    /// ```
475    /// use stygian_graph::domain::graph::{Pipeline, Node, DagExecutor};
476    /// use serde_json::json;
477    ///
478    /// let mut pipeline = Pipeline::new("test");
479    /// pipeline.add_node(Node::new("a", "http", json!({})));
480    /// pipeline.add_node(Node::new("b", "http", json!({})));
481    ///
482    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
483    /// assert_eq!(executor.node_count(), 2);
484    /// ```
485    #[must_use]
486    pub fn node_count(&self) -> usize {
487        self.graph.node_count()
488    }
489
490    /// Get the total number of edges in the graph
491    ///
492    /// # Example
493    ///
494    /// ```
495    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
496    /// use serde_json::json;
497    ///
498    /// let mut pipeline = Pipeline::new("test");
499    /// pipeline.add_node(Node::new("a", "http", json!({})));
500    /// pipeline.add_node(Node::new("b", "http", json!({})));
501    /// pipeline.add_edge(Edge::new("a", "b"));
502    ///
503    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
504    /// assert_eq!(executor.edge_count(), 1);
505    /// ```
506    #[must_use]
507    pub fn edge_count(&self) -> usize {
508        self.graph.edge_count()
509    }
510
511    /// Get all node IDs in the graph
512    ///
513    /// # Example
514    ///
515    /// ```
516    /// use stygian_graph::domain::graph::{Pipeline, Node, DagExecutor};
517    /// use serde_json::json;
518    ///
519    /// let mut pipeline = Pipeline::new("test");
520    /// pipeline.add_node(Node::new("fetch", "http", json!({})));
521    ///
522    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
523    /// assert!(executor.node_ids().contains(&"fetch".to_string()));
524    /// ```
525    #[must_use]
526    pub fn node_ids(&self) -> Vec<String> {
527        self.graph
528            .node_indices()
529            .map(|idx| self.graph[idx].id.clone())
530            .collect()
531    }
532
533    /// Get a node by ID
534    ///
535    /// # Example
536    ///
537    /// ```
538    /// use stygian_graph::domain::graph::{Pipeline, Node, DagExecutor};
539    /// use serde_json::json;
540    ///
541    /// let mut pipeline = Pipeline::new("test");
542    /// pipeline.add_node(Node::new("fetch", "http", json!({})));
543    ///
544    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
545    /// let node = executor.get_node("fetch");
546    /// assert!(node.is_some());
547    /// assert_eq!(node.unwrap().service, "http");
548    /// ```
549    #[must_use]
550    pub fn get_node(&self, id: &str) -> Option<&Node> {
551        self.graph
552            .node_indices()
553            .find(|&idx| self.graph[idx].id == id)
554            .map(|idx| &self.graph[idx])
555    }
556
557    /// Get the predecessors (upstream nodes) of a node
558    ///
559    /// # Example
560    ///
561    /// ```
562    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
563    /// use serde_json::json;
564    ///
565    /// let mut pipeline = Pipeline::new("test");
566    /// pipeline.add_node(Node::new("a", "http", json!({})));
567    /// pipeline.add_node(Node::new("b", "http", json!({})));
568    /// pipeline.add_edge(Edge::new("a", "b"));
569    ///
570    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
571    /// let preds = executor.predecessors("b");
572    /// assert_eq!(preds, vec!["a".to_string()]);
573    /// ```
574    #[must_use]
575    pub fn predecessors(&self, id: &str) -> Vec<String> {
576        self.graph
577            .node_indices()
578            .find(|&idx| self.graph[idx].id == id)
579            .map(|idx| {
580                self.graph
581                    .neighbors_directed(idx, petgraph::Direction::Incoming)
582                    .map(|pred| self.graph[pred].id.clone())
583                    .collect()
584            })
585            .unwrap_or_default()
586    }
587
588    /// Get the successors (downstream nodes) of a node
589    ///
590    /// # Example
591    ///
592    /// ```
593    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
594    /// use serde_json::json;
595    ///
596    /// let mut pipeline = Pipeline::new("test");
597    /// pipeline.add_node(Node::new("a", "http", json!({})));
598    /// pipeline.add_node(Node::new("b", "http", json!({})));
599    /// pipeline.add_edge(Edge::new("a", "b"));
600    ///
601    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
602    /// let succs = executor.successors("a");
603    /// assert_eq!(succs, vec!["b".to_string()]);
604    /// ```
605    #[must_use]
606    pub fn successors(&self, id: &str) -> Vec<String> {
607        self.graph
608            .node_indices()
609            .find(|&idx| self.graph[idx].id == id)
610            .map(|idx| {
611                self.graph
612                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
613                    .map(|succ| self.graph[succ].id.clone())
614                    .collect()
615            })
616            .unwrap_or_default()
617    }
618
619    /// Get the topological order of nodes
620    ///
621    /// # Example
622    ///
623    /// ```
624    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
625    /// use serde_json::json;
626    ///
627    /// let mut pipeline = Pipeline::new("test");
628    /// pipeline.add_node(Node::new("a", "http", json!({})));
629    /// pipeline.add_node(Node::new("b", "http", json!({})));
630    /// pipeline.add_edge(Edge::new("a", "b"));
631    ///
632    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
633    /// let order = executor.topological_order();
634    /// // "a" must appear before "b"
635    /// let a_pos = order.iter().position(|x| x == "a").unwrap();
636    /// let b_pos = order.iter().position(|x| x == "b").unwrap();
637    /// assert!(a_pos < b_pos);
638    /// ```
639    #[must_use]
640    pub fn topological_order(&self) -> Vec<String> {
641        toposort(&self.graph, None)
642            .map(|indices| {
643                indices
644                    .iter()
645                    .map(|&idx| self.graph[idx].id.clone())
646                    .collect()
647            })
648            .unwrap_or_default()
649    }
650
651    /// Get execution waves (groups of nodes that can run concurrently)
652    ///
653    /// # Example
654    ///
655    /// ```
656    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
657    /// use serde_json::json;
658    ///
659    /// let mut pipeline = Pipeline::new("test");
660    /// pipeline.add_node(Node::new("a", "http", json!({})));
661    /// pipeline.add_node(Node::new("b", "http", json!({})));
662    /// pipeline.add_node(Node::new("c", "http", json!({})));
663    /// pipeline.add_edge(Edge::new("a", "c"));
664    /// pipeline.add_edge(Edge::new("b", "c"));
665    ///
666    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
667    /// let waves = executor.execution_waves();
668    /// // Wave 0 contains "a" and "b" (can run concurrently)
669    /// // Wave 1 contains "c" (depends on both)
670    /// assert_eq!(waves.len(), 2);
671    /// ```
672    #[must_use]
673    pub fn execution_waves(&self) -> Vec<super::introspection::ExecutionWave> {
674        let Ok(topo) = toposort(&self.graph, None) else {
675            return vec![];
676        };
677
678        let waves = self.build_execution_waves(&topo);
679
680        waves
681            .into_iter()
682            .enumerate()
683            .filter(|(_, nodes)| !nodes.is_empty())
684            .map(|(level, nodes)| super::introspection::ExecutionWave {
685                level,
686                node_ids: nodes
687                    .iter()
688                    .map(|&idx| self.graph[idx].id.clone())
689                    .collect(),
690            })
691            .collect()
692    }
693
694    /// Get information about a specific node
695    ///
696    /// # Example
697    ///
698    /// ```
699    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
700    /// use serde_json::json;
701    ///
702    /// let mut pipeline = Pipeline::new("test");
703    /// pipeline.add_node(Node::new("fetch", "http", json!({"url": "https://example.com"})));
704    /// pipeline.add_node(Node::new("extract", "ai", json!({})));
705    /// pipeline.add_edge(Edge::new("fetch", "extract"));
706    ///
707    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
708    /// let info = executor.node_info("fetch").unwrap();
709    /// assert_eq!(info.service, "http");
710    /// assert_eq!(info.in_degree, 0);
711    /// assert_eq!(info.out_degree, 1);
712    /// ```
713    #[must_use]
714    pub fn node_info(&self, id: &str) -> Option<super::introspection::NodeInfo> {
715        let depths = self.compute_depths();
716
717        self.graph
718            .node_indices()
719            .find(|&idx| self.graph[idx].id == id)
720            .map(|idx| {
721                let node = &self.graph[idx];
722                let predecessors: Vec<String> = self
723                    .graph
724                    .neighbors_directed(idx, petgraph::Direction::Incoming)
725                    .map(|pred| self.graph[pred].id.clone())
726                    .collect();
727                let successors: Vec<String> = self
728                    .graph
729                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
730                    .map(|succ| self.graph[succ].id.clone())
731                    .collect();
732
733                super::introspection::NodeInfo {
734                    id: node.id.clone(),
735                    service: node.service.clone(),
736                    depth: depths.get(&idx).copied().unwrap_or(0),
737                    in_degree: predecessors.len(),
738                    out_degree: successors.len(),
739                    predecessors,
740                    successors,
741                    config: node.config.clone(),
742                    metadata: node.metadata.clone(),
743                }
744            })
745    }
746
747    /// Get all nodes matching a query
748    ///
749    /// # Example
750    ///
751    /// ```
752    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
753    /// use stygian_graph::domain::introspection::NodeQuery;
754    /// use serde_json::json;
755    ///
756    /// let mut pipeline = Pipeline::new("test");
757    /// pipeline.add_node(Node::new("fetch1", "http", json!({})));
758    /// pipeline.add_node(Node::new("fetch2", "http", json!({})));
759    /// pipeline.add_node(Node::new("extract", "ai", json!({})));
760    ///
761    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
762    /// let http_nodes = executor.query_nodes(&NodeQuery::by_service("http"));
763    /// assert_eq!(http_nodes.len(), 2);
764    /// ```
765    #[must_use]
766    pub fn query_nodes(
767        &self,
768        query: &super::introspection::NodeQuery,
769    ) -> Vec<super::introspection::NodeInfo> {
770        self.graph
771            .node_indices()
772            .filter_map(|idx| self.node_info(&self.graph[idx].id))
773            .filter(|info| query.matches(info))
774            .collect()
775    }
776
777    /// Compute the depth of each node from root nodes
778    fn compute_depths(&self) -> HashMap<NodeIndex, usize> {
779        let mut depths = HashMap::new();
780
781        let Ok(topo) = toposort(&self.graph, None) else {
782            return depths;
783        };
784
785        for &idx in &topo {
786            let max_pred_depth = self
787                .graph
788                .neighbors_directed(idx, petgraph::Direction::Incoming)
789                .filter_map(|pred| depths.get(&pred))
790                .max()
791                .copied()
792                .map_or(0, |d| d + 1);
793            depths.insert(idx, max_pred_depth);
794        }
795
796        depths
797    }
798
799    /// Get connectivity metrics for the graph
800    ///
801    /// # Example
802    ///
803    /// ```
804    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
805    /// use serde_json::json;
806    ///
807    /// let mut pipeline = Pipeline::new("test");
808    /// pipeline.add_node(Node::new("a", "http", json!({})));
809    /// pipeline.add_node(Node::new("b", "http", json!({})));
810    /// pipeline.add_edge(Edge::new("a", "b"));
811    ///
812    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
813    /// let metrics = executor.connectivity();
814    /// assert_eq!(metrics.root_nodes, vec!["a".to_string()]);
815    /// assert_eq!(metrics.leaf_nodes, vec!["b".to_string()]);
816    /// ```
817    #[must_use]
818    pub fn connectivity(&self) -> super::introspection::ConnectivityMetrics {
819        let depths = self.compute_depths();
820
821        let root_nodes: Vec<String> = self
822            .graph
823            .node_indices()
824            .filter(|&idx| {
825                self.graph
826                    .neighbors_directed(idx, petgraph::Direction::Incoming)
827                    .next()
828                    .is_none()
829            })
830            .map(|idx| self.graph[idx].id.clone())
831            .collect();
832
833        let leaf_nodes: Vec<String> = self
834            .graph
835            .node_indices()
836            .filter(|&idx| {
837                self.graph
838                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
839                    .next()
840                    .is_none()
841            })
842            .map(|idx| self.graph[idx].id.clone())
843            .collect();
844
845        let max_depth = depths.values().copied().max().unwrap_or(0);
846
847        let total_degree: usize = self
848            .graph
849            .node_indices()
850            .map(|idx| {
851                let in_deg = self
852                    .graph
853                    .neighbors_directed(idx, petgraph::Direction::Incoming)
854                    .count();
855                let out_deg = self
856                    .graph
857                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
858                    .count();
859                in_deg + out_deg
860            })
861            .sum();
862
863        let node_count = self.graph.node_count();
864        let avg_degree = if node_count > 0 {
865            match (u32::try_from(total_degree), u32::try_from(node_count)) {
866                (Ok(total), Ok(count)) if count > 0 => f64::from(total) / f64::from(count),
867                _ => f64::from(u32::MAX),
868            }
869        } else {
870            0.0
871        };
872
873        // Count weakly connected components
874        let component_count = petgraph::algo::connected_components(&self.graph);
875
876        super::introspection::ConnectivityMetrics {
877            is_connected: component_count <= 1,
878            component_count,
879            root_nodes,
880            leaf_nodes,
881            max_depth,
882            avg_degree,
883        }
884    }
885
886    /// Get the critical path (longest path through the graph)
887    ///
888    /// # Example
889    ///
890    /// ```
891    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
892    /// use serde_json::json;
893    ///
894    /// let mut pipeline = Pipeline::new("test");
895    /// pipeline.add_node(Node::new("a", "http", json!({})));
896    /// pipeline.add_node(Node::new("b", "http", json!({})));
897    /// pipeline.add_node(Node::new("c", "http", json!({})));
898    /// pipeline.add_edge(Edge::new("a", "b"));
899    /// pipeline.add_edge(Edge::new("b", "c"));
900    ///
901    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
902    /// let critical = executor.critical_path();
903    /// assert_eq!(critical.length, 3);
904    /// assert_eq!(critical.nodes, vec!["a", "b", "c"]);
905    /// ```
906    #[must_use]
907    pub fn critical_path(&self) -> super::introspection::CriticalPath {
908        let depths = self.compute_depths();
909
910        // Find the deepest node
911        let deepest = depths.iter().max_by_key(|&(_, d)| d);
912
913        if let Some((&end_idx, _)) = deepest {
914            // Trace back from deepest to a root
915            let mut path = vec![self.graph[end_idx].id.clone()];
916            let mut current = end_idx;
917
918            while let Some(pred) = self
919                .graph
920                .neighbors_directed(current, petgraph::Direction::Incoming)
921                .max_by_key(|&p| depths.get(&p).copied().unwrap_or(0))
922            {
923                path.push(self.graph[pred].id.clone());
924                current = pred;
925            }
926
927            path.reverse();
928
929            super::introspection::CriticalPath {
930                length: path.len(),
931                nodes: path,
932            }
933        } else {
934            super::introspection::CriticalPath {
935                length: 0,
936                nodes: vec![],
937            }
938        }
939    }
940
941    /// Analyze the impact of changing a node
942    ///
943    /// Returns all nodes that would be affected (upstream dependencies
944    /// and downstream dependents, transitively).
945    ///
946    /// # Example
947    ///
948    /// ```
949    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
950    /// use serde_json::json;
951    ///
952    /// let mut pipeline = Pipeline::new("test");
953    /// pipeline.add_node(Node::new("a", "http", json!({})));
954    /// pipeline.add_node(Node::new("b", "http", json!({})));
955    /// pipeline.add_node(Node::new("c", "http", json!({})));
956    /// pipeline.add_edge(Edge::new("a", "b"));
957    /// pipeline.add_edge(Edge::new("b", "c"));
958    ///
959    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
960    /// let impact = executor.impact_analysis("b");
961    /// assert_eq!(impact.upstream, vec!["a".to_string()]);
962    /// assert_eq!(impact.downstream, vec!["c".to_string()]);
963    /// ```
964    #[must_use]
965    pub fn impact_analysis(&self, id: &str) -> super::introspection::ImpactAnalysis {
966        let node_idx = self
967            .graph
968            .node_indices()
969            .find(|&idx| self.graph[idx].id == id);
970
971        let Some(start_idx) = node_idx else {
972            return super::introspection::ImpactAnalysis {
973                node_id: id.to_string(),
974                upstream: vec![],
975                downstream: vec![],
976                total_affected: 0,
977            };
978        };
979
980        // BFS upstream
981        let mut upstream = Vec::new();
982        let mut visited = std::collections::HashSet::new();
983        let mut queue = std::collections::VecDeque::new();
984
985        for pred in self
986            .graph
987            .neighbors_directed(start_idx, petgraph::Direction::Incoming)
988        {
989            queue.push_back(pred);
990        }
991
992        while let Some(idx) = queue.pop_front() {
993            if visited.insert(idx) {
994                upstream.push(self.graph[idx].id.clone());
995                for pred in self
996                    .graph
997                    .neighbors_directed(idx, petgraph::Direction::Incoming)
998                {
999                    queue.push_back(pred);
1000                }
1001            }
1002        }
1003
1004        // BFS downstream
1005        let mut downstream = Vec::new();
1006        visited.clear();
1007        queue.clear();
1008
1009        for succ in self
1010            .graph
1011            .neighbors_directed(start_idx, petgraph::Direction::Outgoing)
1012        {
1013            queue.push_back(succ);
1014        }
1015
1016        while let Some(idx) = queue.pop_front() {
1017            if visited.insert(idx) {
1018                downstream.push(self.graph[idx].id.clone());
1019                for succ in self
1020                    .graph
1021                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
1022                {
1023                    queue.push_back(succ);
1024                }
1025            }
1026        }
1027
1028        let total_affected = upstream.len() + downstream.len();
1029
1030        super::introspection::ImpactAnalysis {
1031            node_id: id.to_string(),
1032            upstream,
1033            downstream,
1034            total_affected,
1035        }
1036    }
1037
1038    /// Get a complete snapshot of the graph for introspection
1039    ///
1040    /// # Example
1041    ///
1042    /// ```
1043    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
1044    /// use serde_json::json;
1045    ///
1046    /// let mut pipeline = Pipeline::new("test");
1047    /// pipeline.add_node(Node::new("fetch", "http", json!({})));
1048    /// pipeline.add_node(Node::new("extract", "ai", json!({})));
1049    /// pipeline.add_edge(Edge::new("fetch", "extract"));
1050    ///
1051    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
1052    /// let snapshot = executor.snapshot();
1053    ///
1054    /// assert_eq!(snapshot.node_count, 2);
1055    /// assert_eq!(snapshot.edge_count, 1);
1056    /// ```
1057    #[must_use]
1058    pub fn snapshot(&self) -> super::introspection::GraphSnapshot {
1059        let nodes: Vec<super::introspection::NodeInfo> = self
1060            .graph
1061            .node_indices()
1062            .filter_map(|idx| self.node_info(&self.graph[idx].id))
1063            .collect();
1064
1065        let edges: Vec<super::introspection::EdgeInfo> = self
1066            .graph
1067            .edge_references()
1068            .map(|edge| {
1069                let from = &self.graph[edge.source()].id;
1070                let to = &self.graph[edge.target()].id;
1071                super::introspection::EdgeInfo {
1072                    from: from.clone(),
1073                    to: to.clone(),
1074                    config: serde_json::Value::Null,
1075                }
1076            })
1077            .collect();
1078
1079        let mut service_distribution = HashMap::new();
1080        for node in &nodes {
1081            *service_distribution
1082                .entry(node.service.clone())
1083                .or_insert(0) += 1;
1084        }
1085
1086        super::introspection::GraphSnapshot {
1087            node_count: self.node_count(),
1088            edge_count: self.edge_count(),
1089            nodes,
1090            edges,
1091            waves: self.execution_waves(),
1092            topological_order: self.topological_order(),
1093            critical_path: self.critical_path(),
1094            connectivity: self.connectivity(),
1095            service_distribution,
1096        }
1097    }
1098}
1099
1100impl Default for DagExecutor {
1101    fn default() -> Self {
1102        Self::new()
1103    }
1104}
1105
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::error::Result;

    /// Return type of the synchronous tests; lets `?` surface any error kind.
    type TestResult = std::result::Result<(), Box<dyn std::error::Error>>;

    /// Service registry handed to `DagExecutor::execute` in the async tests.
    type ServiceMap = HashMap<String, std::sync::Arc<dyn crate::ports::ScrapingService>>;

    /// Assemble a pipeline from `(id, service)` nodes and `(from, to)` edges.
    ///
    /// Nodes are added first, in the given order, each with an empty JSON
    /// object as its config; the edges are added afterwards.
    fn build_pipeline(name: &str, nodes: &[(&str, &str)], edges: &[(&str, &str)]) -> Pipeline {
        let mut pipeline = Pipeline::new(name);
        for &(id, service) in nodes {
            pipeline.add_node(Node::new(id, service, serde_json::json!({})));
        }
        for &(from, to) in edges {
            pipeline.add_edge(Edge::new(from, to));
        }
        pipeline
    }

    /// Three-node chain `a → b → c`; `a` and `b` use `"http"`, `c` uses `"ai"`.
    fn chain_pipeline() -> Pipeline {
        build_pipeline(
            "test",
            &[("a", "http"), ("b", "http"), ("c", "ai")],
            &[("a", "b"), ("b", "c")],
        )
    }

    /// Two-node pipeline `fetch → extract` (`"http"` feeding `"ai"`).
    fn fetch_extract_pipeline() -> Pipeline {
        build_pipeline(
            "test",
            &[("fetch", "http"), ("extract", "ai")],
            &[("fetch", "extract")],
        )
    }

    /// Turn string literals into the owned ID list used in comparisons.
    fn ids(values: &[&str]) -> Vec<String> {
        values.iter().map(|value| (*value).to_string()).collect()
    }

    #[test]
    fn test_node_creation() {
        let config = serde_json::json!({"url": "https://example.com"});
        let node = Node::new("test", "http", config);

        assert_eq!(node.id, "test");
        assert_eq!(node.service, "http");
    }

    #[test]
    fn test_edge_creation() {
        let edge = Edge::new("a", "b");

        assert_eq!(edge.from, "a");
        assert_eq!(edge.to, "b");
    }

    #[test]
    fn test_pipeline_validation() {
        let pipeline = fetch_extract_pipeline();

        assert!(pipeline.validate().is_ok());
    }

    #[test]
    fn test_cycle_detection() {
        // The second edge closes the loop a → b → a.
        let pipeline = build_pipeline(
            "cyclic",
            &[("a", "http"), ("b", "http")],
            &[("a", "b"), ("b", "a")],
        );

        let result = DagExecutor::from_pipeline(&pipeline);
        assert!(matches!(
            result,
            Err(StygianError::Graph(GraphError::CycleDetected))
        ));
    }

    /// Diamond graph: A → B, A → C, B+C → D
    /// B and C run concurrently in the same wave; D waits for both.
    #[tokio::test]
    async fn test_diamond_concurrent_execution() -> Result<()> {
        use crate::adapters::noop::NoopService;

        const NODE_IDS: [&str; 4] = ["A", "B", "C", "D"];

        // Build the diamond; every node runs the noop service.
        let mut pipeline = Pipeline::new("diamond");
        for id in NODE_IDS {
            pipeline.add_node(Node::new(id, "noop", serde_json::json!({"url": ""})));
        }
        for (from, to) in [("A", "B"), ("A", "C"), ("B", "D"), ("C", "D")] {
            pipeline.add_edge(Edge::new(from, to));
        }

        let executor = DagExecutor::from_pipeline(&pipeline)?;

        let mut services: ServiceMap = HashMap::new();
        services.insert("noop".to_string(), std::sync::Arc::new(NoopService));

        let results = executor.execute(&services).await?;

        // Each of the four nodes must have produced exactly one result slot.
        assert_eq!(results.len(), 4);
        let produced: Vec<&str> = results.iter().map(|r| r.node_id.as_str()).collect();
        for expected in NODE_IDS {
            assert!(produced.contains(&expected));
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_missing_service_returns_error() -> Result<()> {
        let pipeline = build_pipeline("test", &[("fetch", "http")], &[]);
        let executor = DagExecutor::from_pipeline(&pipeline)?;

        // No service is registered for "http".
        let services = ServiceMap::new();

        let result = executor.execute(&services).await;
        assert!(result.is_err());
        Ok(())
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // Introspection Tests
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_introspection_node_count() -> TestResult {
        let pipeline = build_pipeline(
            "test",
            &[("a", "http"), ("b", "http"), ("c", "ai")],
            &[],
        );

        let executor = DagExecutor::from_pipeline(&pipeline)?;
        assert_eq!(executor.node_count(), 3);
        assert_eq!(executor.edge_count(), 0);
        Ok(())
    }

    #[test]
    fn test_introspection_node_ids() -> TestResult {
        let pipeline = build_pipeline("test", &[("fetch", "http"), ("extract", "ai")], &[]);

        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let node_ids = executor.node_ids();
        for expected in ["fetch", "extract"] {
            assert!(node_ids.contains(&expected.to_string()));
        }
        Ok(())
    }

    #[test]
    fn test_introspection_get_node() -> TestResult {
        let mut pipeline = Pipeline::new("test");
        let config = serde_json::json!({"url": "https://example.com"});
        pipeline.add_node(Node::new("fetch", "http", config));

        let executor = DagExecutor::from_pipeline(&pipeline)?;

        let lookup = executor.get_node("fetch");
        assert!(lookup.is_some());
        let fetched =
            lookup.ok_or_else(|| std::io::Error::other("expected fetch node to exist"))?;
        assert_eq!(fetched.service, "http");

        assert!(executor.get_node("nonexistent").is_none());
        Ok(())
    }

    #[test]
    fn test_introspection_predecessors_successors() -> TestResult {
        let executor = DagExecutor::from_pipeline(&chain_pipeline())?;

        assert_eq!(executor.predecessors("a"), ids(&[]));
        assert_eq!(executor.predecessors("b"), ids(&["a"]));
        assert_eq!(executor.predecessors("c"), ids(&["b"]));

        assert_eq!(executor.successors("a"), ids(&["b"]));
        assert_eq!(executor.successors("b"), ids(&["c"]));
        assert_eq!(executor.successors("c"), ids(&[]));
        Ok(())
    }

    #[test]
    fn test_introspection_topological_order() -> TestResult {
        let executor = DagExecutor::from_pipeline(&chain_pipeline())?;
        let order = executor.topological_order();

        // Index of a node in the order, or an error naming the missing node.
        let position_of = |id: &str| {
            order
                .iter()
                .position(|entry| entry == id)
                .ok_or_else(|| std::io::Error::other(format!("expected node {id} in order")))
        };

        let a_pos = position_of("a")?;
        let b_pos = position_of("b")?;
        let c_pos = position_of("c")?;

        assert!(a_pos < b_pos);
        assert!(b_pos < c_pos);
        Ok(())
    }

    #[test]
    fn test_introspection_execution_waves_diamond() -> TestResult {
        // Diamond: A → (B, C) → D
        let pipeline = build_pipeline(
            "diamond",
            &[("A", "http"), ("B", "http"), ("C", "http"), ("D", "http")],
            &[("A", "B"), ("A", "C"), ("B", "D"), ("C", "D")],
        );

        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let waves = executor.execution_waves();

        // Should have 3 waves: [A], [B, C], [D]
        assert_eq!(waves.len(), 3);

        let wave_at = |index: usize| {
            waves
                .get(index)
                .ok_or_else(|| std::io::Error::other(format!("missing wave {index}")))
        };
        let first_wave = wave_at(0)?;
        let second_wave = wave_at(1)?;
        let third_wave = wave_at(2)?;

        assert_eq!(first_wave.level, 0);
        assert!(first_wave.node_ids.contains(&"A".to_string()));

        assert_eq!(second_wave.level, 1);
        assert!(second_wave.node_ids.contains(&"B".to_string()));
        assert!(second_wave.node_ids.contains(&"C".to_string()));

        assert_eq!(third_wave.level, 2);
        assert!(third_wave.node_ids.contains(&"D".to_string()));
        Ok(())
    }

    #[test]
    fn test_introspection_node_info() -> TestResult {
        let executor = DagExecutor::from_pipeline(&fetch_extract_pipeline())?;

        let fetch_info = executor
            .node_info("fetch")
            .ok_or_else(|| std::io::Error::other("expected fetch node info"))?;
        assert_eq!(fetch_info.id, "fetch");
        assert_eq!(fetch_info.service, "http");
        assert_eq!(fetch_info.depth, 0);
        assert_eq!(fetch_info.in_degree, 0);
        assert_eq!(fetch_info.out_degree, 1);
        assert!(fetch_info.successors.contains(&"extract".to_string()));

        let extract_info = executor
            .node_info("extract")
            .ok_or_else(|| std::io::Error::other("expected extract node info"))?;
        assert_eq!(extract_info.depth, 1);
        assert_eq!(extract_info.in_degree, 1);
        assert_eq!(extract_info.out_degree, 0);
        Ok(())
    }

    #[test]
    fn test_introspection_connectivity() -> TestResult {
        let executor = DagExecutor::from_pipeline(&chain_pipeline())?;
        let metrics = executor.connectivity();

        assert!(metrics.is_connected);
        assert_eq!(metrics.component_count, 1);
        assert_eq!(metrics.root_nodes, ids(&["a"]));
        assert_eq!(metrics.leaf_nodes, ids(&["c"]));
        assert_eq!(metrics.max_depth, 2);
        Ok(())
    }

    #[test]
    fn test_introspection_critical_path() -> TestResult {
        let executor = DagExecutor::from_pipeline(&chain_pipeline())?;
        let critical = executor.critical_path();

        assert_eq!(critical.length, 3);
        assert_eq!(critical.nodes, vec!["a", "b", "c"]);
        Ok(())
    }

    #[test]
    fn test_introspection_impact_analysis() -> TestResult {
        let executor = DagExecutor::from_pipeline(&chain_pipeline())?;

        // Middle node: one node on each side.
        let middle = executor.impact_analysis("b");
        assert_eq!(middle.node_id, "b");
        assert_eq!(middle.upstream, ids(&["a"]));
        assert_eq!(middle.downstream, ids(&["c"]));
        assert_eq!(middle.total_affected, 2);

        // Root node has no upstream
        let root = executor.impact_analysis("a");
        assert!(root.upstream.is_empty());
        assert_eq!(root.downstream.len(), 2);

        // Leaf node has no downstream
        let leaf = executor.impact_analysis("c");
        assert_eq!(leaf.upstream.len(), 2);
        assert!(leaf.downstream.is_empty());
        Ok(())
    }

    #[test]
    fn test_introspection_snapshot() -> TestResult {
        let executor = DagExecutor::from_pipeline(&fetch_extract_pipeline())?;
        let snapshot = executor.snapshot();

        assert_eq!(snapshot.node_count, 2);
        assert_eq!(snapshot.edge_count, 1);
        assert_eq!(snapshot.nodes.len(), 2);
        assert_eq!(snapshot.edges.len(), 1);
        assert_eq!(snapshot.waves.len(), 2);
        assert_eq!(snapshot.topological_order.len(), 2);
        assert_eq!(snapshot.critical_path.length, 2);
        assert_eq!(snapshot.service_distribution.get("http"), Some(&1));
        assert_eq!(snapshot.service_distribution.get("ai"), Some(&1));
        Ok(())
    }

    #[test]
    fn test_introspection_query_nodes() -> TestResult {
        use super::super::introspection::NodeQuery;

        // Two independent fetchers feeding a single extractor.
        let pipeline = build_pipeline(
            "test",
            &[("fetch1", "http"), ("fetch2", "http"), ("extract", "ai")],
            &[("fetch1", "extract"), ("fetch2", "extract")],
        );

        let executor = DagExecutor::from_pipeline(&pipeline)?;

        // Query by service
        let http_nodes = executor.query_nodes(&NodeQuery::by_service("http"));
        assert_eq!(http_nodes.len(), 2);

        let ai_nodes = executor.query_nodes(&NodeQuery::by_service("ai"));
        assert_eq!(ai_nodes.len(), 1);

        // Query roots
        let roots = executor.query_nodes(&NodeQuery::roots());
        assert_eq!(roots.len(), 2);

        // Query leaves
        let leaves = executor.query_nodes(&NodeQuery::leaves());
        assert_eq!(leaves.len(), 1);
        let only_leaf = leaves
            .first()
            .ok_or_else(|| std::io::Error::other("expected one leaf node"))?;
        assert_eq!(only_leaf.id, "extract");
        Ok(())
    }
}