Skip to main content

stygian_graph/domain/
graph.rs

//! DAG execution engine
//!
//! Implements graph-based pipeline execution using petgraph.
//! Defines the core domain entities (`Node`, `Edge`, and `Pipeline`) and the
//! `DagExecutor` that builds a graph from a pipeline and runs it.
//!
//! # Example
//!
//! ```
//! use stygian_graph::domain::graph::{Node, Edge, DagExecutor};
//!
//! let node = Node::new("fetch", "http", serde_json::json!({"url": "https://example.com"}));
//! ```
13
14use std::collections::HashMap;
15use std::sync::Arc;
16
17use petgraph::algo::toposort;
18use petgraph::graph::{DiGraph, NodeIndex};
19use petgraph::visit::EdgeRef;
20use serde::{Deserialize, Serialize};
21use tokio::sync::Mutex;
22
23use super::error::{GraphError, StygianError};
24use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
25
26/// A node in the scraping pipeline graph
27///
28/// Represents a single operation (HTTP fetch, AI extraction, transformation, etc.)
29///
30/// # Example
31///
32/// ```
33/// use stygian_graph::domain::graph::Node;
34/// use serde_json::json;
35///
36/// let node = Node::new(
37///     "fetch_homepage",
38///     "http",
39///     json!({"url": "https://example.com", "method": "GET"})
40/// );
41/// ```
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct Node {
44    /// Unique identifier for this node
45    pub id: String,
46
47    /// Service type (e.g., `"http"`, `"ai_extract"`, `"browser"`)
48    pub service: String,
49
50    /// Node-specific configuration
51    pub config: serde_json::Value,
52
53    /// Optional node metadata
54    #[serde(default)]
55    pub metadata: serde_json::Value,
56}
57
58impl Node {
59    /// Create a new node
60    ///
61    /// # Example
62    ///
63    /// ```
64    /// use stygian_graph::domain::graph::Node;
65    /// use serde_json::json;
66    ///
67    /// let node = Node::new("fetch", "http", json!({"url": "https://example.com"}));
68    /// assert_eq!(node.id, "fetch");
69    /// assert_eq!(node.service, "http");
70    /// ```
71    pub fn new(
72        id: impl Into<String>,
73        service: impl Into<String>,
74        config: serde_json::Value,
75    ) -> Self {
76        Self {
77            id: id.into(),
78            service: service.into(),
79            config,
80            metadata: serde_json::Value::Null,
81        }
82    }
83
84    /// Create a new node with metadata
85    pub fn with_metadata(
86        id: impl Into<String>,
87        service: impl Into<String>,
88        config: serde_json::Value,
89        metadata: serde_json::Value,
90    ) -> Self {
91        Self {
92            id: id.into(),
93            service: service.into(),
94            config,
95            metadata,
96        }
97    }
98
99    /// Validate the node configuration
100    ///
101    /// # Errors
102    ///
103    /// Returns `GraphError::InvalidEdge` if the node has an empty ID or service type.
104    pub fn validate(&self) -> Result<(), StygianError> {
105        if self.id.is_empty() {
106            return Err(GraphError::InvalidEdge("Node ID cannot be empty".into()).into());
107        }
108        if self.service.is_empty() {
109            return Err(GraphError::InvalidEdge("Node service type cannot be empty".into()).into());
110        }
111        Ok(())
112    }
113}
114
115/// An edge connecting two nodes in the pipeline graph
116///
117/// Represents data flow or dependencies between operations.
118///
119/// # Example
120///
121/// ```
122/// use stygian_graph::domain::graph::Edge;
123/// use serde_json::json;
124///
125/// let edge = Edge::new("fetch_homepage", "extract_data");
126/// ```
127#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct Edge {
129    /// Source node ID
130    pub from: String,
131
132    /// Target node ID
133    pub to: String,
134
135    /// Optional edge configuration (data transformations, filters, etc.)
136    #[serde(default)]
137    pub config: serde_json::Value,
138}
139
140impl Edge {
141    /// Create a new edge between two nodes
142    ///
143    /// # Example
144    ///
145    /// ```
146    /// use stygian_graph::domain::graph::Edge;
147    ///
148    /// let edge = Edge::new("fetch", "extract");
149    /// assert_eq!(edge.from, "fetch");
150    /// assert_eq!(edge.to, "extract");
151    /// ```
152    pub fn new(from: impl Into<String>, to: impl Into<String>) -> Self {
153        Self {
154            from: from.into(),
155            to: to.into(),
156            config: serde_json::Value::Null,
157        }
158    }
159
160    /// Create an edge with additional configuration
161    pub fn with_config(
162        from: impl Into<String>,
163        to: impl Into<String>,
164        config: serde_json::Value,
165    ) -> Self {
166        Self {
167            from: from.into(),
168            to: to.into(),
169            config,
170        }
171    }
172
173    /// Validate the edge
174    ///
175    /// # Errors
176    ///
177    /// Returns `GraphError::InvalidEdge` if the edge has empty endpoints.
178    pub fn validate(&self) -> Result<(), StygianError> {
179        if self.from.is_empty() || self.to.is_empty() {
180            return Err(GraphError::InvalidEdge("Edge endpoints cannot be empty".into()).into());
181        }
182        Ok(())
183    }
184}
185
186/// A complete pipeline definition
187///
188/// Contains the full graph structure (nodes and edges) plus metadata.
189///
190/// # Example
191///
192/// ```
193/// use stygian_graph::domain::graph::{Pipeline, Node, Edge};
194/// use serde_json::json;
195///
196/// let mut pipeline = Pipeline::new("example_pipeline");
197/// pipeline.add_node(Node::new("fetch", "http", json!({"url": "https://example.com"})));
198/// ```
199#[derive(Debug, Clone, Serialize, Deserialize)]
200pub struct Pipeline {
201    /// Pipeline name/identifier
202    pub name: String,
203
204    /// Nodes in the pipeline
205    pub nodes: Vec<Node>,
206
207    /// Edges connecting nodes
208    pub edges: Vec<Edge>,
209
210    /// Pipeline-level configuration and metadata
211    #[serde(default)]
212    pub metadata: serde_json::Value,
213}
214
215impl Pipeline {
216    /// Create a new empty pipeline
217    ///
218    /// # Example
219    ///
220    /// ```
221    /// use stygian_graph::domain::graph::Pipeline;
222    ///
223    /// let pipeline = Pipeline::new("my_scraper");
224    /// assert_eq!(pipeline.name, "my_scraper");
225    /// assert!(pipeline.nodes.is_empty());
226    /// ```
227    pub fn new(name: impl Into<String>) -> Self {
228        Self {
229            name: name.into(),
230            nodes: Vec::new(),
231            edges: Vec::new(),
232            metadata: serde_json::Value::Null,
233        }
234    }
235
236    /// Add a node to the pipeline
237    pub fn add_node(&mut self, node: Node) {
238        self.nodes.push(node);
239    }
240
241    /// Add an edge to the pipeline
242    pub fn add_edge(&mut self, edge: Edge) {
243        self.edges.push(edge);
244    }
245
246    /// Validate the entire pipeline
247    ///
248    /// # Errors
249    ///
250    /// Returns an error if any node or edge is invalid.
251    pub fn validate(&self) -> Result<(), StygianError> {
252        for node in &self.nodes {
253            node.validate()?;
254        }
255        for edge in &self.edges {
256            edge.validate()?;
257        }
258        Ok(())
259    }
260}
261
262/// Result of executing a single node
263#[derive(Debug, Clone)]
264pub struct NodeResult {
265    /// The node ID that produced this result
266    pub node_id: String,
267    /// The output from the service execution
268    pub output: ServiceOutput,
269}
270
271/// DAG executor that processes pipeline graphs
272///
273/// Executes scraping pipelines as directed acyclic graphs using petgraph.
274/// Independent branches are executed concurrently using `tokio::spawn`.
275/// Data from upstream nodes is passed as input to downstream nodes.
276pub struct DagExecutor {
277    graph: DiGraph<Node, ()>,
278    _node_indices: HashMap<String, NodeIndex>,
279}
280
281impl DagExecutor {
282    /// Create a new DAG executor
283    ///
284    /// # Example
285    ///
286    /// ```
287    /// use stygian_graph::domain::graph::DagExecutor;
288    ///
289    /// let executor = DagExecutor::new();
290    /// ```
291    #[must_use]
292    pub fn new() -> Self {
293        Self {
294            graph: DiGraph::new(),
295            _node_indices: HashMap::new(),
296        }
297    }
298
299    /// Build a graph from a pipeline definition
300    ///
301    /// # Errors
302    ///
303    /// Returns `GraphError::CycleDetected` if the pipeline contains a cycle.
304    /// Returns `GraphError::NodeNotFound` if an edge references an unknown node.
305    pub fn from_pipeline(pipeline: &Pipeline) -> Result<Self, StygianError> {
306        pipeline.validate()?;
307
308        let mut graph = DiGraph::new();
309        let mut node_indices = HashMap::new();
310
311        // Add nodes
312        for node in &pipeline.nodes {
313            let idx = graph.add_node(node.clone());
314            node_indices.insert(node.id.clone(), idx);
315        }
316
317        // Add edges
318        for edge in &pipeline.edges {
319            let from_idx = node_indices
320                .get(&edge.from)
321                .ok_or_else(|| GraphError::NodeNotFound(edge.from.clone()))?;
322            let to_idx = node_indices
323                .get(&edge.to)
324                .ok_or_else(|| GraphError::NodeNotFound(edge.to.clone()))?;
325            graph.add_edge(*from_idx, *to_idx, ());
326        }
327
328        // Check for cycles
329        if petgraph::algo::is_cyclic_directed(&graph) {
330            return Err(GraphError::CycleDetected.into());
331        }
332
333        Ok(Self {
334            graph,
335            _node_indices: node_indices,
336        })
337    }
338
339    /// Execute the pipeline using the provided service registry.
340    ///
341    /// Nodes are executed in topological order. Independent nodes at the same
342    /// depth are spawned concurrently via `tokio::spawn`. The output of each
343    /// node is available to all downstream nodes as their `ServiceInput.params`.
344    ///
345    /// # Errors
346    ///
347    /// Returns `GraphError::ExecutionFailed` if any node execution fails.
348    pub async fn execute(
349        &self,
350        services: &HashMap<String, Arc<dyn ScrapingService>>,
351    ) -> Result<Vec<NodeResult>, StygianError> {
352        // Topological sort — safe because we checked for cycles in from_pipeline
353        let topo_order = toposort(&self.graph, None).map_err(|_| GraphError::CycleDetected)?;
354
355        // Group nodes into execution waves by their level (longest path from root)
356        let waves = self.build_execution_waves(&topo_order);
357
358        // Shared result store
359        let results: Arc<Mutex<HashMap<String, ServiceOutput>>> =
360            Arc::new(Mutex::new(HashMap::new()));
361
362        for wave in waves {
363            // Spawn all nodes in this wave concurrently
364            let mut handles = Vec::new();
365
366            for node_idx in wave {
367                let node = self.graph[node_idx].clone();
368                let service = services.get(&node.service).cloned().ok_or_else(|| {
369                    GraphError::InvalidPipeline(format!(
370                        "No service registered for type '{}'",
371                        node.service
372                    ))
373                })?;
374
375                // Collect upstream outputs as input params
376                let upstream_data = {
377                    let store = results.lock().await;
378                    let mut data = serde_json::Map::new();
379                    for pred_idx in self
380                        .graph
381                        .neighbors_directed(node_idx, petgraph::Direction::Incoming)
382                    {
383                        let pred_id = &self.graph[pred_idx].id;
384                        if let Some(out) = store.get(pred_id) {
385                            data.insert(
386                                pred_id.clone(),
387                                serde_json::Value::String(out.data.clone()),
388                            );
389                        }
390                    }
391                    serde_json::Value::Object(data)
392                };
393
394                let input = ServiceInput {
395                    url: node
396                        .config
397                        .get("url")
398                        .and_then(|v| v.as_str())
399                        .unwrap_or("")
400                        .to_string(),
401                    params: upstream_data,
402                };
403
404                let results_clone = Arc::clone(&results);
405                let node_id = node.id.clone();
406
407                handles.push(tokio::spawn(async move {
408                    let output = service.execute(input).await?;
409                    results_clone
410                        .lock()
411                        .await
412                        .insert(node_id.clone(), output.clone());
413                    Ok::<NodeResult, StygianError>(NodeResult { node_id, output })
414                }));
415            }
416
417            // Await all handles in this wave, propagating errors
418            for handle in handles {
419                handle
420                    .await
421                    .map_err(|e| GraphError::ExecutionFailed(format!("Task join error: {e}")))??;
422            }
423        }
424
425        // Collect final results in topological order
426        let store = results.lock().await;
427        let final_results = topo_order
428            .iter()
429            .filter_map(|idx| {
430                let node_id = &self.graph[*idx].id;
431                store.get(node_id).map(|output| NodeResult {
432                    node_id: node_id.clone(),
433                    output: output.clone(),
434                })
435            })
436            .collect();
437
438        Ok(final_results)
439    }
440
441    /// Build execution waves: groups of nodes that can run concurrently.
442    ///
443    /// Each wave contains nodes whose predecessors are all in earlier waves.
444    fn build_execution_waves(&self, topo_order: &[NodeIndex]) -> Vec<Vec<NodeIndex>> {
445        let mut level: HashMap<NodeIndex, usize> = HashMap::new();
446
447        for &idx in topo_order {
448            let max_pred_level = self
449                .graph
450                .neighbors_directed(idx, petgraph::Direction::Incoming)
451                .map(|pred| level.get(&pred).copied().unwrap_or(0) + 1)
452                .max()
453                .unwrap_or(0);
454            level.insert(idx, max_pred_level);
455        }
456
457        let max_level = level.values().copied().max().unwrap_or(0);
458        let mut waves: Vec<Vec<NodeIndex>> = vec![Vec::new(); max_level + 1];
459        for (idx, lvl) in level {
460            if let Some(wave) = waves.get_mut(lvl) {
461                wave.push(idx);
462            }
463        }
464        waves
465    }
466
467    // ═══════════════════════════════════════════════════════════════════════════
468    // Graph Introspection Methods
469    // ═══════════════════════════════════════════════════════════════════════════
470
471    /// Get the total number of nodes in the graph
472    ///
473    /// # Example
474    ///
475    /// ```
476    /// use stygian_graph::domain::graph::{Pipeline, Node, DagExecutor};
477    /// use serde_json::json;
478    ///
479    /// let mut pipeline = Pipeline::new("test");
480    /// pipeline.add_node(Node::new("a", "http", json!({})));
481    /// pipeline.add_node(Node::new("b", "http", json!({})));
482    ///
483    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
484    /// assert_eq!(executor.node_count(), 2);
485    /// ```
486    #[must_use]
487    pub fn node_count(&self) -> usize {
488        self.graph.node_count()
489    }
490
491    /// Get the total number of edges in the graph
492    ///
493    /// # Example
494    ///
495    /// ```
496    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
497    /// use serde_json::json;
498    ///
499    /// let mut pipeline = Pipeline::new("test");
500    /// pipeline.add_node(Node::new("a", "http", json!({})));
501    /// pipeline.add_node(Node::new("b", "http", json!({})));
502    /// pipeline.add_edge(Edge::new("a", "b"));
503    ///
504    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
505    /// assert_eq!(executor.edge_count(), 1);
506    /// ```
507    #[must_use]
508    pub fn edge_count(&self) -> usize {
509        self.graph.edge_count()
510    }
511
512    /// Get all node IDs in the graph
513    ///
514    /// # Example
515    ///
516    /// ```
517    /// use stygian_graph::domain::graph::{Pipeline, Node, DagExecutor};
518    /// use serde_json::json;
519    ///
520    /// let mut pipeline = Pipeline::new("test");
521    /// pipeline.add_node(Node::new("fetch", "http", json!({})));
522    ///
523    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
524    /// assert!(executor.node_ids().contains(&"fetch".to_string()));
525    /// ```
526    #[must_use]
527    pub fn node_ids(&self) -> Vec<String> {
528        self.graph
529            .node_indices()
530            .map(|idx| self.graph[idx].id.clone())
531            .collect()
532    }
533
534    /// Get a node by ID
535    ///
536    /// # Example
537    ///
538    /// ```
539    /// use stygian_graph::domain::graph::{Pipeline, Node, DagExecutor};
540    /// use serde_json::json;
541    ///
542    /// let mut pipeline = Pipeline::new("test");
543    /// pipeline.add_node(Node::new("fetch", "http", json!({})));
544    ///
545    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
546    /// let node = executor.get_node("fetch");
547    /// assert!(node.is_some());
548    /// assert_eq!(node.unwrap().service, "http");
549    /// ```
550    #[must_use]
551    pub fn get_node(&self, id: &str) -> Option<&Node> {
552        self.graph
553            .node_indices()
554            .find(|&idx| self.graph[idx].id == id)
555            .map(|idx| &self.graph[idx])
556    }
557
558    /// Get the predecessors (upstream nodes) of a node
559    ///
560    /// # Example
561    ///
562    /// ```
563    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
564    /// use serde_json::json;
565    ///
566    /// let mut pipeline = Pipeline::new("test");
567    /// pipeline.add_node(Node::new("a", "http", json!({})));
568    /// pipeline.add_node(Node::new("b", "http", json!({})));
569    /// pipeline.add_edge(Edge::new("a", "b"));
570    ///
571    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
572    /// let preds = executor.predecessors("b");
573    /// assert_eq!(preds, vec!["a".to_string()]);
574    /// ```
575    #[must_use]
576    pub fn predecessors(&self, id: &str) -> Vec<String> {
577        self.graph
578            .node_indices()
579            .find(|&idx| self.graph[idx].id == id)
580            .map(|idx| {
581                self.graph
582                    .neighbors_directed(idx, petgraph::Direction::Incoming)
583                    .map(|pred| self.graph[pred].id.clone())
584                    .collect()
585            })
586            .unwrap_or_default()
587    }
588
589    /// Get the successors (downstream nodes) of a node
590    ///
591    /// # Example
592    ///
593    /// ```
594    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
595    /// use serde_json::json;
596    ///
597    /// let mut pipeline = Pipeline::new("test");
598    /// pipeline.add_node(Node::new("a", "http", json!({})));
599    /// pipeline.add_node(Node::new("b", "http", json!({})));
600    /// pipeline.add_edge(Edge::new("a", "b"));
601    ///
602    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
603    /// let succs = executor.successors("a");
604    /// assert_eq!(succs, vec!["b".to_string()]);
605    /// ```
606    #[must_use]
607    pub fn successors(&self, id: &str) -> Vec<String> {
608        self.graph
609            .node_indices()
610            .find(|&idx| self.graph[idx].id == id)
611            .map(|idx| {
612                self.graph
613                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
614                    .map(|succ| self.graph[succ].id.clone())
615                    .collect()
616            })
617            .unwrap_or_default()
618    }
619
620    /// Get the topological order of nodes
621    ///
622    /// # Example
623    ///
624    /// ```
625    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
626    /// use serde_json::json;
627    ///
628    /// let mut pipeline = Pipeline::new("test");
629    /// pipeline.add_node(Node::new("a", "http", json!({})));
630    /// pipeline.add_node(Node::new("b", "http", json!({})));
631    /// pipeline.add_edge(Edge::new("a", "b"));
632    ///
633    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
634    /// let order = executor.topological_order();
635    /// // "a" must appear before "b"
636    /// let a_pos = order.iter().position(|x| x == "a").unwrap();
637    /// let b_pos = order.iter().position(|x| x == "b").unwrap();
638    /// assert!(a_pos < b_pos);
639    /// ```
640    #[must_use]
641    pub fn topological_order(&self) -> Vec<String> {
642        toposort(&self.graph, None)
643            .map(|indices| {
644                indices
645                    .iter()
646                    .map(|&idx| self.graph[idx].id.clone())
647                    .collect()
648            })
649            .unwrap_or_default()
650    }
651
652    /// Get execution waves (groups of nodes that can run concurrently)
653    ///
654    /// # Example
655    ///
656    /// ```
657    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
658    /// use serde_json::json;
659    ///
660    /// let mut pipeline = Pipeline::new("test");
661    /// pipeline.add_node(Node::new("a", "http", json!({})));
662    /// pipeline.add_node(Node::new("b", "http", json!({})));
663    /// pipeline.add_node(Node::new("c", "http", json!({})));
664    /// pipeline.add_edge(Edge::new("a", "c"));
665    /// pipeline.add_edge(Edge::new("b", "c"));
666    ///
667    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
668    /// let waves = executor.execution_waves();
669    /// // Wave 0 contains "a" and "b" (can run concurrently)
670    /// // Wave 1 contains "c" (depends on both)
671    /// assert_eq!(waves.len(), 2);
672    /// ```
673    #[must_use]
674    pub fn execution_waves(&self) -> Vec<super::introspection::ExecutionWave> {
675        let Ok(topo) = toposort(&self.graph, None) else {
676            return vec![];
677        };
678
679        let waves = self.build_execution_waves(&topo);
680
681        waves
682            .into_iter()
683            .enumerate()
684            .filter(|(_, nodes)| !nodes.is_empty())
685            .map(|(level, nodes)| super::introspection::ExecutionWave {
686                level,
687                node_ids: nodes
688                    .iter()
689                    .map(|&idx| self.graph[idx].id.clone())
690                    .collect(),
691            })
692            .collect()
693    }
694
695    /// Get information about a specific node
696    ///
697    /// # Example
698    ///
699    /// ```
700    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
701    /// use serde_json::json;
702    ///
703    /// let mut pipeline = Pipeline::new("test");
704    /// pipeline.add_node(Node::new("fetch", "http", json!({"url": "https://example.com"})));
705    /// pipeline.add_node(Node::new("extract", "ai", json!({})));
706    /// pipeline.add_edge(Edge::new("fetch", "extract"));
707    ///
708    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
709    /// let info = executor.node_info("fetch").unwrap();
710    /// assert_eq!(info.service, "http");
711    /// assert_eq!(info.in_degree, 0);
712    /// assert_eq!(info.out_degree, 1);
713    /// ```
714    #[must_use]
715    pub fn node_info(&self, id: &str) -> Option<super::introspection::NodeInfo> {
716        let depths = self.compute_depths();
717
718        self.graph
719            .node_indices()
720            .find(|&idx| self.graph[idx].id == id)
721            .map(|idx| {
722                let node = &self.graph[idx];
723                let predecessors: Vec<String> = self
724                    .graph
725                    .neighbors_directed(idx, petgraph::Direction::Incoming)
726                    .map(|pred| self.graph[pred].id.clone())
727                    .collect();
728                let successors: Vec<String> = self
729                    .graph
730                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
731                    .map(|succ| self.graph[succ].id.clone())
732                    .collect();
733
734                super::introspection::NodeInfo {
735                    id: node.id.clone(),
736                    service: node.service.clone(),
737                    depth: depths.get(&idx).copied().unwrap_or(0),
738                    in_degree: predecessors.len(),
739                    out_degree: successors.len(),
740                    predecessors,
741                    successors,
742                    config: node.config.clone(),
743                    metadata: node.metadata.clone(),
744                }
745            })
746    }
747
748    /// Get all nodes matching a query
749    ///
750    /// # Example
751    ///
752    /// ```
753    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
754    /// use stygian_graph::domain::introspection::NodeQuery;
755    /// use serde_json::json;
756    ///
757    /// let mut pipeline = Pipeline::new("test");
758    /// pipeline.add_node(Node::new("fetch1", "http", json!({})));
759    /// pipeline.add_node(Node::new("fetch2", "http", json!({})));
760    /// pipeline.add_node(Node::new("extract", "ai", json!({})));
761    ///
762    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
763    /// let http_nodes = executor.query_nodes(&NodeQuery::by_service("http"));
764    /// assert_eq!(http_nodes.len(), 2);
765    /// ```
766    #[must_use]
767    pub fn query_nodes(
768        &self,
769        query: &super::introspection::NodeQuery,
770    ) -> Vec<super::introspection::NodeInfo> {
771        self.graph
772            .node_indices()
773            .filter_map(|idx| self.node_info(&self.graph[idx].id))
774            .filter(|info| query.matches(info))
775            .collect()
776    }
777
778    /// Compute the depth of each node from root nodes
779    fn compute_depths(&self) -> HashMap<NodeIndex, usize> {
780        let mut depths = HashMap::new();
781
782        let Ok(topo) = toposort(&self.graph, None) else {
783            return depths;
784        };
785
786        for &idx in &topo {
787            let max_pred_depth = self
788                .graph
789                .neighbors_directed(idx, petgraph::Direction::Incoming)
790                .filter_map(|pred| depths.get(&pred))
791                .max()
792                .copied()
793                .map_or(0, |d| d + 1);
794            depths.insert(idx, max_pred_depth);
795        }
796
797        depths
798    }
799
800    /// Get connectivity metrics for the graph
801    ///
802    /// # Example
803    ///
804    /// ```
805    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
806    /// use serde_json::json;
807    ///
808    /// let mut pipeline = Pipeline::new("test");
809    /// pipeline.add_node(Node::new("a", "http", json!({})));
810    /// pipeline.add_node(Node::new("b", "http", json!({})));
811    /// pipeline.add_edge(Edge::new("a", "b"));
812    ///
813    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
814    /// let metrics = executor.connectivity();
815    /// assert_eq!(metrics.root_nodes, vec!["a".to_string()]);
816    /// assert_eq!(metrics.leaf_nodes, vec!["b".to_string()]);
817    /// ```
818    #[must_use]
819    pub fn connectivity(&self) -> super::introspection::ConnectivityMetrics {
820        let depths = self.compute_depths();
821
822        let root_nodes: Vec<String> = self
823            .graph
824            .node_indices()
825            .filter(|&idx| {
826                self.graph
827                    .neighbors_directed(idx, petgraph::Direction::Incoming)
828                    .next()
829                    .is_none()
830            })
831            .map(|idx| self.graph[idx].id.clone())
832            .collect();
833
834        let leaf_nodes: Vec<String> = self
835            .graph
836            .node_indices()
837            .filter(|&idx| {
838                self.graph
839                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
840                    .next()
841                    .is_none()
842            })
843            .map(|idx| self.graph[idx].id.clone())
844            .collect();
845
846        let max_depth = depths.values().copied().max().unwrap_or(0);
847
848        let total_degree: usize = self
849            .graph
850            .node_indices()
851            .map(|idx| {
852                let in_deg = self
853                    .graph
854                    .neighbors_directed(idx, petgraph::Direction::Incoming)
855                    .count();
856                let out_deg = self
857                    .graph
858                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
859                    .count();
860                in_deg + out_deg
861            })
862            .sum();
863
864        let node_count = self.graph.node_count();
865        let avg_degree = if node_count > 0 {
866            match (u32::try_from(total_degree), u32::try_from(node_count)) {
867                (Ok(total), Ok(count)) if count > 0 => f64::from(total) / f64::from(count),
868                _ => f64::from(u32::MAX),
869            }
870        } else {
871            0.0
872        };
873
874        // Count weakly connected components
875        let component_count = petgraph::algo::connected_components(&self.graph);
876
877        super::introspection::ConnectivityMetrics {
878            is_connected: component_count <= 1,
879            component_count,
880            root_nodes,
881            leaf_nodes,
882            max_depth,
883            avg_degree,
884        }
885    }
886
887    /// Get the critical path (longest path through the graph)
888    ///
889    /// # Example
890    ///
891    /// ```
892    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
893    /// use serde_json::json;
894    ///
895    /// let mut pipeline = Pipeline::new("test");
896    /// pipeline.add_node(Node::new("a", "http", json!({})));
897    /// pipeline.add_node(Node::new("b", "http", json!({})));
898    /// pipeline.add_node(Node::new("c", "http", json!({})));
899    /// pipeline.add_edge(Edge::new("a", "b"));
900    /// pipeline.add_edge(Edge::new("b", "c"));
901    ///
902    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
903    /// let critical = executor.critical_path();
904    /// assert_eq!(critical.length, 3);
905    /// assert_eq!(critical.nodes, vec!["a", "b", "c"]);
906    /// ```
907    #[must_use]
908    pub fn critical_path(&self) -> super::introspection::CriticalPath {
909        let depths = self.compute_depths();
910
911        // Find the deepest node
912        let deepest = depths.iter().max_by_key(|&(_, d)| d);
913
914        if let Some((&end_idx, _)) = deepest {
915            // Trace back from deepest to a root
916            let mut path = vec![self.graph[end_idx].id.clone()];
917            let mut current = end_idx;
918
919            while let Some(pred) = self
920                .graph
921                .neighbors_directed(current, petgraph::Direction::Incoming)
922                .max_by_key(|&p| depths.get(&p).copied().unwrap_or(0))
923            {
924                path.push(self.graph[pred].id.clone());
925                current = pred;
926            }
927
928            path.reverse();
929
930            super::introspection::CriticalPath {
931                length: path.len(),
932                nodes: path,
933            }
934        } else {
935            super::introspection::CriticalPath {
936                length: 0,
937                nodes: vec![],
938            }
939        }
940    }
941
942    /// Analyze the impact of changing a node
943    ///
944    /// Returns all nodes that would be affected (upstream dependencies
945    /// and downstream dependents, transitively).
946    ///
947    /// # Example
948    ///
949    /// ```
950    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
951    /// use serde_json::json;
952    ///
953    /// let mut pipeline = Pipeline::new("test");
954    /// pipeline.add_node(Node::new("a", "http", json!({})));
955    /// pipeline.add_node(Node::new("b", "http", json!({})));
956    /// pipeline.add_node(Node::new("c", "http", json!({})));
957    /// pipeline.add_edge(Edge::new("a", "b"));
958    /// pipeline.add_edge(Edge::new("b", "c"));
959    ///
960    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
961    /// let impact = executor.impact_analysis("b");
962    /// assert_eq!(impact.upstream, vec!["a".to_string()]);
963    /// assert_eq!(impact.downstream, vec!["c".to_string()]);
964    /// ```
965    #[must_use]
966    pub fn impact_analysis(&self, id: &str) -> super::introspection::ImpactAnalysis {
967        let node_idx = self
968            .graph
969            .node_indices()
970            .find(|&idx| self.graph[idx].id == id);
971
972        let Some(start_idx) = node_idx else {
973            return super::introspection::ImpactAnalysis {
974                node_id: id.to_string(),
975                upstream: vec![],
976                downstream: vec![],
977                total_affected: 0,
978            };
979        };
980
981        // BFS upstream
982        let mut upstream = Vec::new();
983        let mut visited = std::collections::HashSet::new();
984        let mut queue = std::collections::VecDeque::new();
985
986        for pred in self
987            .graph
988            .neighbors_directed(start_idx, petgraph::Direction::Incoming)
989        {
990            queue.push_back(pred);
991        }
992
993        while let Some(idx) = queue.pop_front() {
994            if visited.insert(idx) {
995                upstream.push(self.graph[idx].id.clone());
996                for pred in self
997                    .graph
998                    .neighbors_directed(idx, petgraph::Direction::Incoming)
999                {
1000                    queue.push_back(pred);
1001                }
1002            }
1003        }
1004
1005        // BFS downstream
1006        let mut downstream = Vec::new();
1007        visited.clear();
1008        queue.clear();
1009
1010        for succ in self
1011            .graph
1012            .neighbors_directed(start_idx, petgraph::Direction::Outgoing)
1013        {
1014            queue.push_back(succ);
1015        }
1016
1017        while let Some(idx) = queue.pop_front() {
1018            if visited.insert(idx) {
1019                downstream.push(self.graph[idx].id.clone());
1020                for succ in self
1021                    .graph
1022                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
1023                {
1024                    queue.push_back(succ);
1025                }
1026            }
1027        }
1028
1029        let total_affected = upstream.len() + downstream.len();
1030
1031        super::introspection::ImpactAnalysis {
1032            node_id: id.to_string(),
1033            upstream,
1034            downstream,
1035            total_affected,
1036        }
1037    }
1038
1039    /// Get a complete snapshot of the graph for introspection
1040    ///
1041    /// # Example
1042    ///
1043    /// ```
1044    /// use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
1045    /// use serde_json::json;
1046    ///
1047    /// let mut pipeline = Pipeline::new("test");
1048    /// pipeline.add_node(Node::new("fetch", "http", json!({})));
1049    /// pipeline.add_node(Node::new("extract", "ai", json!({})));
1050    /// pipeline.add_edge(Edge::new("fetch", "extract"));
1051    ///
1052    /// let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
1053    /// let snapshot = executor.snapshot();
1054    ///
1055    /// assert_eq!(snapshot.node_count, 2);
1056    /// assert_eq!(snapshot.edge_count, 1);
1057    /// ```
1058    #[must_use]
1059    pub fn snapshot(&self) -> super::introspection::GraphSnapshot {
1060        let nodes: Vec<super::introspection::NodeInfo> = self
1061            .graph
1062            .node_indices()
1063            .filter_map(|idx| self.node_info(&self.graph[idx].id))
1064            .collect();
1065
1066        let edges: Vec<super::introspection::EdgeInfo> = self
1067            .graph
1068            .edge_references()
1069            .map(|edge| {
1070                let from = &self.graph[edge.source()].id;
1071                let to = &self.graph[edge.target()].id;
1072                super::introspection::EdgeInfo {
1073                    from: from.clone(),
1074                    to: to.clone(),
1075                    config: serde_json::Value::Null,
1076                }
1077            })
1078            .collect();
1079
1080        let mut service_distribution = HashMap::new();
1081        for node in &nodes {
1082            *service_distribution
1083                .entry(node.service.clone())
1084                .or_insert(0) += 1;
1085        }
1086
1087        super::introspection::GraphSnapshot {
1088            node_count: self.node_count(),
1089            edge_count: self.edge_count(),
1090            nodes,
1091            edges,
1092            waves: self.execution_waves(),
1093            topological_order: self.topological_order(),
1094            critical_path: self.critical_path(),
1095            connectivity: self.connectivity(),
1096            service_distribution,
1097        }
1098    }
1099}
1100
1101impl Default for DagExecutor {
1102    fn default() -> Self {
1103        Self::new()
1104    }
1105}
1106
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::error::Result;

    /// Return type of the synchronous tests that mix executor errors with
    /// ad-hoc `std::io::Error` values through `?`.
    type BoxedResult = std::result::Result<(), Box<dyn std::error::Error>>;

    /// Service registry handed to `DagExecutor::execute` in the async tests.
    type Services = HashMap<String, std::sync::Arc<dyn crate::ports::ScrapingService>>;

    /// Build a pipeline whose nodes all carry an empty `{}` config.
    ///
    /// `nodes` are `(id, service)` pairs and `edges` are `(from, to)` pairs.
    /// All nodes are added before any edge, each in the order given.
    fn pipeline_of(
        name: &'static str,
        nodes: &[(&'static str, &'static str)],
        edges: &[(&'static str, &'static str)],
    ) -> Pipeline {
        let mut pipeline = Pipeline::new(name);
        for &(id, service) in nodes {
            pipeline.add_node(Node::new(id, service, serde_json::json!({})));
        }
        for &(from, to) in edges {
            pipeline.add_edge(Edge::new(from, to));
        }
        pipeline
    }

    /// Linear pipeline `a (http) → b (http) → c (ai)`.
    fn chain_abc() -> Pipeline {
        pipeline_of(
            "test",
            &[("a", "http"), ("b", "http"), ("c", "ai")],
            &[("a", "b"), ("b", "c")],
        )
    }

    /// Two-node pipeline `fetch (http) → extract (ai)`.
    fn fetch_extract() -> Pipeline {
        pipeline_of(
            "test",
            &[("fetch", "http"), ("extract", "ai")],
            &[("fetch", "extract")],
        )
    }

    #[test]
    fn test_node_creation() {
        let config = serde_json::json!({"url": "https://example.com"});
        let created = Node::new("test", "http", config);
        assert_eq!(created.id, "test");
        assert_eq!(created.service, "http");
    }

    #[test]
    fn test_edge_creation() {
        let link = Edge::new("a", "b");
        assert_eq!(link.from, "a");
        assert_eq!(link.to, "b");
    }

    #[test]
    fn test_pipeline_validation() {
        let pipeline = fetch_extract();
        assert!(pipeline.validate().is_ok());
    }

    #[test]
    fn test_cycle_detection() {
        // The second edge closes the loop a → b → a.
        let pipeline = pipeline_of(
            "cyclic",
            &[("a", "http"), ("b", "http")],
            &[("a", "b"), ("b", "a")],
        );

        let outcome = DagExecutor::from_pipeline(&pipeline);
        assert!(matches!(
            outcome,
            Err(StygianError::Graph(GraphError::CycleDetected))
        ));
    }

    /// Diamond graph: A → B, A → C, B+C → D
    /// Executes the whole diamond against the no-op service and checks that
    /// every node reports a result.
    #[tokio::test]
    async fn test_diamond_concurrent_execution() -> Result<()> {
        use crate::adapters::noop::NoopService;

        // These nodes need a "url" key in their config, so the diamond is
        // assembled by hand rather than through `pipeline_of`.
        let mut pipeline = Pipeline::new("diamond");
        for id in ["A", "B", "C", "D"] {
            pipeline.add_node(Node::new(id, "noop", serde_json::json!({"url": ""})));
        }
        for (from, to) in [("A", "B"), ("A", "C"), ("B", "D"), ("C", "D")] {
            pipeline.add_edge(Edge::new(from, to));
        }

        let executor = DagExecutor::from_pipeline(&pipeline)?;

        let mut services: Services = HashMap::new();
        services.insert("noop".to_string(), std::sync::Arc::new(NoopService));

        let results = executor.execute(&services).await?;

        // One result per node, in no particular order.
        assert_eq!(results.len(), 4);
        for expected in ["A", "B", "C", "D"] {
            assert!(results.iter().any(|r| r.node_id.as_str() == expected));
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_missing_service_returns_error() -> Result<()> {
        let pipeline = pipeline_of("test", &[("fetch", "http")], &[]);
        let executor = DagExecutor::from_pipeline(&pipeline)?;

        // Nothing is registered, so the "http" node has no service to run on.
        let services: Services = HashMap::new();

        let outcome = executor.execute(&services).await;
        assert!(outcome.is_err());
        Ok(())
    }

    // ═══════════════════════════════════════════════════════════════════════════
    // Introspection Tests
    // ═══════════════════════════════════════════════════════════════════════════

    #[test]
    fn test_introspection_node_count() -> BoxedResult {
        let pipeline = pipeline_of("test", &[("a", "http"), ("b", "http"), ("c", "ai")], &[]);

        let executor = DagExecutor::from_pipeline(&pipeline)?;
        assert_eq!(executor.node_count(), 3);
        assert_eq!(executor.edge_count(), 0);
        Ok(())
    }

    #[test]
    fn test_introspection_node_ids() -> BoxedResult {
        let pipeline = pipeline_of("test", &[("fetch", "http"), ("extract", "ai")], &[]);

        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let ids = executor.node_ids();
        for expected in ["fetch", "extract"] {
            assert!(ids.contains(&expected.to_string()));
        }
        Ok(())
    }

    #[test]
    fn test_introspection_get_node() -> BoxedResult {
        let mut pipeline = Pipeline::new("test");
        let config = serde_json::json!({"url": "https://example.com"});
        pipeline.add_node(Node::new("fetch", "http", config));

        let executor = DagExecutor::from_pipeline(&pipeline)?;

        let found = executor.get_node("fetch");
        assert!(found.is_some());
        let found = found.ok_or_else(|| std::io::Error::other("expected fetch node to exist"))?;
        assert_eq!(found.service, "http");

        assert!(executor.get_node("nonexistent").is_none());
        Ok(())
    }

    #[test]
    fn test_introspection_predecessors_successors() -> BoxedResult {
        let executor = DagExecutor::from_pipeline(&chain_abc())?;

        // Turn a list of literals into the owned id list the API returns.
        let ids = |names: &[&str]| -> Vec<String> {
            names.iter().copied().map(String::from).collect()
        };

        assert_eq!(executor.predecessors("a"), ids(&[]));
        assert_eq!(executor.predecessors("b"), ids(&["a"]));
        assert_eq!(executor.predecessors("c"), ids(&["b"]));

        assert_eq!(executor.successors("a"), ids(&["b"]));
        assert_eq!(executor.successors("b"), ids(&["c"]));
        assert_eq!(executor.successors("c"), ids(&[]));
        Ok(())
    }

    #[test]
    fn test_introspection_topological_order() -> BoxedResult {
        let executor = DagExecutor::from_pipeline(&chain_abc())?;
        let order = executor.topological_order();

        let position_of = |id: &str| order.iter().position(|entry| entry == id);

        let a_pos =
            position_of("a").ok_or_else(|| std::io::Error::other("expected node a in order"))?;
        let b_pos =
            position_of("b").ok_or_else(|| std::io::Error::other("expected node b in order"))?;
        let c_pos =
            position_of("c").ok_or_else(|| std::io::Error::other("expected node c in order"))?;

        assert!(a_pos < b_pos);
        assert!(b_pos < c_pos);
        Ok(())
    }

    #[test]
    fn test_introspection_execution_waves_diamond() -> BoxedResult {
        // Diamond: A → (B, C) → D
        let pipeline = pipeline_of(
            "diamond",
            &[("A", "http"), ("B", "http"), ("C", "http"), ("D", "http")],
            &[("A", "B"), ("A", "C"), ("B", "D"), ("C", "D")],
        );

        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let waves = executor.execution_waves();

        // Expected layout: [A], [B, C], [D]
        assert_eq!(waves.len(), 3);

        let wave_at = |position: usize, missing: &'static str| {
            waves
                .get(position)
                .ok_or_else(|| std::io::Error::other(missing))
        };
        let first_wave = wave_at(0, "missing wave 0")?;
        let second_wave = wave_at(1, "missing wave 1")?;
        let third_wave = wave_at(2, "missing wave 2")?;

        assert_eq!(first_wave.level, 0);
        assert!(first_wave.node_ids.contains(&"A".to_string()));
        assert_eq!(second_wave.level, 1);
        assert!(second_wave.node_ids.contains(&"B".to_string()));
        assert!(second_wave.node_ids.contains(&"C".to_string()));
        assert_eq!(third_wave.level, 2);
        assert!(third_wave.node_ids.contains(&"D".to_string()));
        Ok(())
    }

    #[test]
    fn test_introspection_node_info() -> BoxedResult {
        let executor = DagExecutor::from_pipeline(&fetch_extract())?;

        let fetch = executor
            .node_info("fetch")
            .ok_or_else(|| std::io::Error::other("expected fetch node info"))?;
        assert_eq!(fetch.id, "fetch");
        assert_eq!(fetch.service, "http");
        assert_eq!(fetch.depth, 0);
        assert_eq!(fetch.in_degree, 0);
        assert_eq!(fetch.out_degree, 1);
        assert!(fetch.successors.contains(&"extract".to_string()));

        let extract = executor
            .node_info("extract")
            .ok_or_else(|| std::io::Error::other("expected extract node info"))?;
        assert_eq!(extract.depth, 1);
        assert_eq!(extract.in_degree, 1);
        assert_eq!(extract.out_degree, 0);
        Ok(())
    }

    #[test]
    fn test_introspection_connectivity() -> BoxedResult {
        let executor = DagExecutor::from_pipeline(&chain_abc())?;
        let metrics = executor.connectivity();

        assert!(metrics.is_connected);
        assert_eq!(metrics.component_count, 1);
        assert_eq!(metrics.root_nodes, vec!["a".to_string()]);
        assert_eq!(metrics.leaf_nodes, vec!["c".to_string()]);
        assert_eq!(metrics.max_depth, 2);
        Ok(())
    }

    #[test]
    fn test_introspection_critical_path() -> BoxedResult {
        let executor = DagExecutor::from_pipeline(&chain_abc())?;
        let critical = executor.critical_path();

        assert_eq!(critical.length, 3);
        assert_eq!(critical.nodes, vec!["a", "b", "c"]);
        Ok(())
    }

    #[test]
    fn test_introspection_impact_analysis() -> BoxedResult {
        let executor = DagExecutor::from_pipeline(&chain_abc())?;

        // Middle node: one neighbour on each side.
        let middle = executor.impact_analysis("b");
        assert_eq!(middle.node_id, "b");
        assert_eq!(middle.upstream, vec!["a".to_string()]);
        assert_eq!(middle.downstream, vec!["c".to_string()]);
        assert_eq!(middle.total_affected, 2);

        // Root node has no upstream
        let root = executor.impact_analysis("a");
        assert!(root.upstream.is_empty());
        assert_eq!(root.downstream.len(), 2);

        // Leaf node has no downstream
        let leaf = executor.impact_analysis("c");
        assert_eq!(leaf.upstream.len(), 2);
        assert!(leaf.downstream.is_empty());
        Ok(())
    }

    #[test]
    fn test_introspection_snapshot() -> BoxedResult {
        let executor = DagExecutor::from_pipeline(&fetch_extract())?;
        let snapshot = executor.snapshot();

        assert_eq!(snapshot.node_count, 2);
        assert_eq!(snapshot.edge_count, 1);
        assert_eq!(snapshot.nodes.len(), 2);
        assert_eq!(snapshot.edges.len(), 1);
        assert_eq!(snapshot.waves.len(), 2);
        assert_eq!(snapshot.topological_order.len(), 2);
        assert_eq!(snapshot.critical_path.length, 2);
        assert_eq!(snapshot.service_distribution.get("http"), Some(&1));
        assert_eq!(snapshot.service_distribution.get("ai"), Some(&1));
        Ok(())
    }

    #[test]
    fn test_introspection_query_nodes() -> BoxedResult {
        use super::super::introspection::NodeQuery;

        let pipeline = pipeline_of(
            "test",
            &[("fetch1", "http"), ("fetch2", "http"), ("extract", "ai")],
            &[("fetch1", "extract"), ("fetch2", "extract")],
        );

        let executor = DagExecutor::from_pipeline(&pipeline)?;

        // Query by service
        let http_nodes = executor.query_nodes(&NodeQuery::by_service("http"));
        assert_eq!(http_nodes.len(), 2);

        let ai_nodes = executor.query_nodes(&NodeQuery::by_service("ai"));
        assert_eq!(ai_nodes.len(), 1);

        // Query roots
        let roots = executor.query_nodes(&NodeQuery::roots());
        assert_eq!(roots.len(), 2);

        // Query leaves
        let leaves = executor.query_nodes(&NodeQuery::leaves());
        assert_eq!(leaves.len(), 1);
        let only_leaf = leaves
            .first()
            .ok_or_else(|| std::io::Error::other("expected one leaf node"))?;
        assert_eq!(only_leaf.id, "extract");
        Ok(())
    }
}