use std::collections::HashMap;
use std::sync::Arc;
use petgraph::algo::toposort;
use petgraph::graph::{DiGraph, NodeIndex};
use petgraph::visit::EdgeRef;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use super::error::{GraphError, StygianError};
use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
/// A single unit of work in a pipeline: one invocation of a registered
/// service, identified by a unique `id` within its [`Pipeline`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Node {
    /// Unique identifier for this node within the pipeline.
    pub id: String,
    /// Key of the service that executes this node (looked up in the
    /// service registry passed to `DagExecutor::execute`).
    pub service: String,
    /// Service-specific configuration; the executor reads a `"url"`
    /// string key from it when building the service input.
    pub config: serde_json::Value,
    /// Free-form metadata; defaults to JSON `null` when absent in
    /// serialized form.
    #[serde(default)]
    pub metadata: serde_json::Value,
}
impl Node {
    /// Creates a node with no metadata (JSON `null`).
    pub fn new(
        id: impl Into<String>,
        service: impl Into<String>,
        config: serde_json::Value,
    ) -> Self {
        // Delegate to the full constructor so field handling lives in one place.
        Self::with_metadata(id, service, config, serde_json::Value::Null)
    }

    /// Creates a node carrying arbitrary JSON metadata.
    pub fn with_metadata(
        id: impl Into<String>,
        service: impl Into<String>,
        config: serde_json::Value,
        metadata: serde_json::Value,
    ) -> Self {
        Self {
            id: id.into(),
            service: service.into(),
            config,
            metadata,
        }
    }

    /// Checks that both the ID and the service type are non-empty.
    ///
    /// # Errors
    /// Returns an error when either field is empty. NOTE(review): node
    /// problems are reported via `GraphError::InvalidEdge` — kept as-is
    /// for compatibility with callers matching on that variant, but a
    /// node-specific variant may be worth introducing.
    pub fn validate(&self) -> Result<(), StygianError> {
        let complaint = if self.id.is_empty() {
            Some("Node ID cannot be empty")
        } else if self.service.is_empty() {
            Some("Node service type cannot be empty")
        } else {
            None
        };
        match complaint {
            Some(msg) => Err(GraphError::InvalidEdge(msg.into()).into()),
            None => Ok(()),
        }
    }
}
/// A directed dependency between two nodes: `from` must complete before
/// `to` runs, and `to` receives `from`'s output as upstream data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Edge {
    /// ID of the upstream (source) node.
    pub from: String,
    /// ID of the downstream (target) node.
    pub to: String,
    /// Optional per-edge configuration; defaults to JSON `null`.
    /// NOTE(review): this value is currently dropped when the pipeline is
    /// compiled into a `DagExecutor` (edge weights are `()`), so graph
    /// snapshots report it as `null` — confirm whether that is intended.
    #[serde(default)]
    pub config: serde_json::Value,
}
impl Edge {
    /// Creates an edge with no per-edge configuration (JSON `null`).
    pub fn new(from: impl Into<String>, to: impl Into<String>) -> Self {
        // Funnel through the configured constructor to avoid duplication.
        Self::with_config(from, to, serde_json::Value::Null)
    }

    /// Creates an edge carrying JSON configuration.
    pub fn with_config(
        from: impl Into<String>,
        to: impl Into<String>,
        config: serde_json::Value,
    ) -> Self {
        Self {
            from: from.into(),
            to: to.into(),
            config,
        }
    }

    /// Ensures both endpoint IDs are non-empty.
    ///
    /// # Errors
    /// Returns [`GraphError::InvalidEdge`] when either endpoint is empty.
    pub fn validate(&self) -> Result<(), StygianError> {
        match (self.from.is_empty(), self.to.is_empty()) {
            (false, false) => Ok(()),
            _ => Err(GraphError::InvalidEdge("Edge endpoints cannot be empty".into()).into()),
        }
    }
}
/// A declarative description of a DAG of service invocations: a named
/// set of nodes plus the directed edges between them. Compiled into an
/// executable form by `DagExecutor::from_pipeline`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Pipeline {
    /// Human-readable pipeline name.
    pub name: String,
    /// The work units; node IDs are expected to be unique.
    pub nodes: Vec<Node>,
    /// Directed dependencies between nodes, referenced by node ID.
    pub edges: Vec<Edge>,
    /// Free-form metadata; defaults to JSON `null`.
    #[serde(default)]
    pub metadata: serde_json::Value,
}
impl Pipeline {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
nodes: Vec::new(),
edges: Vec::new(),
metadata: serde_json::Value::Null,
}
}
pub fn add_node(&mut self, node: Node) {
self.nodes.push(node);
}
pub fn add_edge(&mut self, edge: Edge) {
self.edges.push(edge);
}
pub fn validate(&self) -> Result<(), StygianError> {
for node in &self.nodes {
node.validate()?;
}
for edge in &self.edges {
edge.validate()?;
}
Ok(())
}
}
/// The outcome of executing one node: the node's ID paired with the
/// output produced by its service.
#[derive(Debug, Clone)]
pub struct NodeResult {
    /// ID of the node that produced this result.
    pub node_id: String,
    /// Raw output returned by the node's service.
    pub output: ServiceOutput,
}
/// Executable form of a [`Pipeline`]: a validated, acyclic petgraph
/// `DiGraph` whose node weights are the pipeline nodes. Edge weights are
/// `()` — per-edge config from the pipeline is not retained here.
pub struct DagExecutor {
    // Node weights carry the full pipeline Node; edges carry no data.
    graph: DiGraph<Node, ()>,
    // id -> index map built during construction; kept (underscore-named)
    // although current methods locate nodes by linear scan instead.
    _node_indices: HashMap<String, NodeIndex>,
}
impl DagExecutor {
    /// Creates an empty executor (no nodes, no edges).
    pub fn new() -> Self {
        Self {
            graph: DiGraph::new(),
            _node_indices: HashMap::new(),
        }
    }

    /// Compiles a [`Pipeline`] into an executor.
    ///
    /// Validates the pipeline, inserts every node, resolves edge endpoints
    /// by ID, and rejects cyclic graphs.
    ///
    /// # Errors
    /// - validation errors from [`Pipeline::validate`]
    /// - [`GraphError::NodeNotFound`] for an edge naming an unknown node
    /// - [`GraphError::CycleDetected`] if the resulting graph has a cycle
    pub fn from_pipeline(pipeline: &Pipeline) -> Result<Self, StygianError> {
        pipeline.validate()?;
        let mut graph = DiGraph::new();
        let mut node_indices = HashMap::new();
        for node in &pipeline.nodes {
            let idx = graph.add_node(node.clone());
            // NOTE: a duplicate node ID would overwrite the earlier entry
            // here; validate() is relied on to prevent that.
            node_indices.insert(node.id.clone(), idx);
        }
        for edge in &pipeline.edges {
            let from_idx = node_indices
                .get(&edge.from)
                .ok_or_else(|| GraphError::NodeNotFound(edge.from.clone()))?;
            let to_idx = node_indices
                .get(&edge.to)
                .ok_or_else(|| GraphError::NodeNotFound(edge.to.clone()))?;
            // Edge weight is (): per-edge config from the pipeline is dropped.
            graph.add_edge(*from_idx, *to_idx, ());
        }
        if petgraph::algo::is_cyclic_directed(&graph) {
            return Err(GraphError::CycleDetected.into());
        }
        Ok(Self {
            graph,
            _node_indices: node_indices,
        })
    }

    /// Executes the DAG wave by wave.
    ///
    /// Nodes within a wave have no dependencies on each other and are run
    /// concurrently via `tokio::spawn`; the next wave starts only after
    /// every task in the current wave has completed. Each node receives,
    /// as `params`, a JSON object mapping each predecessor's ID to that
    /// predecessor's output `data` string.
    ///
    /// # Errors
    /// - [`GraphError::CycleDetected`] if the graph is cyclic
    /// - [`GraphError::InvalidPipeline`] if a node's service type has no
    ///   entry in `services`
    /// - [`GraphError::ExecutionFailed`] if a spawned task panics/aborts
    /// - any error returned by a service's `execute`
    ///
    /// Returns one [`NodeResult`] per executed node, in topological order.
    pub async fn execute(
        &self,
        services: &HashMap<String, Arc<dyn ScrapingService>>,
    ) -> Result<Vec<NodeResult>, StygianError> {
        let topo_order = toposort(&self.graph, None).map_err(|_| GraphError::CycleDetected)?;
        let waves = self.build_execution_waves(&topo_order);
        // Shared store of completed outputs, keyed by node ID. Uses the
        // tokio Mutex because it is locked inside spawned async tasks.
        let results: Arc<Mutex<HashMap<String, ServiceOutput>>> =
            Arc::new(Mutex::new(HashMap::new()));
        for wave in waves {
            let mut handles = Vec::new();
            for node_idx in wave {
                let node = self.graph[node_idx].clone();
                // Missing service is a configuration error; note that tasks
                // already spawned for this wave keep running detached.
                let service = services.get(&node.service).cloned().ok_or_else(|| {
                    GraphError::InvalidPipeline(format!(
                        "No service registered for type '{}'",
                        node.service
                    ))
                })?;
                // Snapshot predecessor outputs before spawning; all
                // predecessors completed in earlier waves, so the lock is
                // held only briefly here, never across the task's await.
                let upstream_data = {
                    let store = results.lock().await;
                    let mut data = serde_json::Map::new();
                    for pred_idx in self
                        .graph
                        .neighbors_directed(node_idx, petgraph::Direction::Incoming)
                    {
                        let pred_id = &self.graph[pred_idx].id;
                        if let Some(out) = store.get(pred_id) {
                            data.insert(
                                pred_id.clone(),
                                serde_json::Value::String(out.data.clone()),
                            );
                        }
                    }
                    serde_json::Value::Object(data)
                };
                // "url" is pulled from node config; absent/non-string
                // values become an empty string.
                let input = ServiceInput {
                    url: node
                        .config
                        .get("url")
                        .and_then(|v| v.as_str())
                        .unwrap_or("")
                        .to_string(),
                    params: upstream_data,
                };
                let results_clone = Arc::clone(&results);
                let node_id = node.id.clone();
                handles.push(tokio::spawn(async move {
                    let output = service.execute(input).await?;
                    // Publish the output so the next wave can read it.
                    results_clone
                        .lock()
                        .await
                        .insert(node_id.clone(), output.clone());
                    Ok::<NodeResult, StygianError>(NodeResult { node_id, output })
                }));
            }
            // Barrier: wait for the whole wave. First `?` covers join
            // (panic/cancel) errors, second propagates the task's own error.
            for handle in handles {
                handle
                    .await
                    .map_err(|e| GraphError::ExecutionFailed(format!("Task join error: {e}")))??;
            }
        }
        // Assemble results in topological order from the shared store.
        let store = results.lock().await;
        let final_results = topo_order
            .iter()
            .filter_map(|idx| {
                let node_id = &self.graph[*idx].id;
                store.get(node_id).map(|output| NodeResult {
                    node_id: node_id.clone(),
                    output: output.clone(),
                })
            })
            .collect();
        Ok(final_results)
    }

    /// Groups nodes into "waves" by longest-path depth: wave k contains
    /// every node whose deepest predecessor sits in wave k-1. Nodes in the
    /// same wave are mutually independent and safe to run concurrently.
    /// Relies on `topo_order` being topologically sorted so each
    /// predecessor's level is computed before its successors'.
    fn build_execution_waves(&self, topo_order: &[NodeIndex]) -> Vec<Vec<NodeIndex>> {
        let mut level: HashMap<NodeIndex, usize> = HashMap::new();
        for &idx in topo_order {
            let max_pred_level = self
                .graph
                .neighbors_directed(idx, petgraph::Direction::Incoming)
                .map(|pred| level.get(&pred).copied().unwrap_or(0) + 1)
                .max()
                .unwrap_or(0);
            level.insert(idx, max_pred_level);
        }
        let max_level = level.values().copied().max().unwrap_or(0);
        let mut waves: Vec<Vec<NodeIndex>> = vec![Vec::new(); max_level + 1];
        for (idx, lvl) in level {
            if let Some(wave) = waves.get_mut(lvl) {
                wave.push(idx);
            }
        }
        waves
    }

    /// Number of nodes in the compiled graph.
    #[must_use]
    pub fn node_count(&self) -> usize {
        self.graph.node_count()
    }

    /// Number of edges in the compiled graph.
    #[must_use]
    pub fn edge_count(&self) -> usize {
        self.graph.edge_count()
    }

    /// IDs of all nodes, in graph insertion order.
    #[must_use]
    pub fn node_ids(&self) -> Vec<String> {
        self.graph
            .node_indices()
            .map(|idx| self.graph[idx].id.clone())
            .collect()
    }

    /// Looks up a node by ID via linear scan; `None` if absent.
    #[must_use]
    pub fn get_node(&self, id: &str) -> Option<&Node> {
        self.graph
            .node_indices()
            .find(|&idx| self.graph[idx].id == id)
            .map(|idx| &self.graph[idx])
    }

    /// IDs of the direct predecessors of `id`; empty if `id` is unknown
    /// (unknown and "no predecessors" are indistinguishable here).
    #[must_use]
    pub fn predecessors(&self, id: &str) -> Vec<String> {
        self.graph
            .node_indices()
            .find(|&idx| self.graph[idx].id == id)
            .map(|idx| {
                self.graph
                    .neighbors_directed(idx, petgraph::Direction::Incoming)
                    .map(|pred| self.graph[pred].id.clone())
                    .collect()
            })
            .unwrap_or_default()
    }

    /// IDs of the direct successors of `id`; empty if `id` is unknown.
    #[must_use]
    pub fn successors(&self, id: &str) -> Vec<String> {
        self.graph
            .node_indices()
            .find(|&idx| self.graph[idx].id == id)
            .map(|idx| {
                self.graph
                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
                    .map(|succ| self.graph[succ].id.clone())
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Node IDs in topological order; empty vec if the graph is cyclic
    /// (cannot happen for graphs built via `from_pipeline`).
    #[must_use]
    pub fn topological_order(&self) -> Vec<String> {
        toposort(&self.graph, None)
            .map(|indices| {
                indices
                    .iter()
                    .map(|&idx| self.graph[idx].id.clone())
                    .collect()
            })
            .unwrap_or_default()
    }

    /// The execution waves as introspection data: one entry per non-empty
    /// depth level, carrying the IDs of the nodes that run in that wave.
    #[must_use]
    pub fn execution_waves(&self) -> Vec<super::introspection::ExecutionWave> {
        let Ok(topo) = toposort(&self.graph, None) else {
            return vec![];
        };
        let waves = self.build_execution_waves(&topo);
        waves
            .into_iter()
            .enumerate()
            .filter(|(_, nodes)| !nodes.is_empty())
            .map(|(level, nodes)| super::introspection::ExecutionWave {
                level,
                node_ids: nodes
                    .iter()
                    .map(|&idx| self.graph[idx].id.clone())
                    .collect(),
            })
            .collect()
    }

    /// Full introspection record for one node: service, depth (longest
    /// path from a root), degrees, neighbor IDs, config, and metadata.
    /// NOTE(review): recomputes all depths per call — O(V+E) each time,
    /// which makes `query_nodes`/`snapshot` quadratic overall.
    #[must_use]
    pub fn node_info(&self, id: &str) -> Option<super::introspection::NodeInfo> {
        let depths = self.compute_depths();
        self.graph
            .node_indices()
            .find(|&idx| self.graph[idx].id == id)
            .map(|idx| {
                let node = &self.graph[idx];
                let predecessors: Vec<String> = self
                    .graph
                    .neighbors_directed(idx, petgraph::Direction::Incoming)
                    .map(|pred| self.graph[pred].id.clone())
                    .collect();
                let successors: Vec<String> = self
                    .graph
                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
                    .map(|succ| self.graph[succ].id.clone())
                    .collect();
                super::introspection::NodeInfo {
                    id: node.id.clone(),
                    service: node.service.clone(),
                    depth: depths.get(&idx).copied().unwrap_or(0),
                    in_degree: predecessors.len(),
                    out_degree: successors.len(),
                    predecessors,
                    successors,
                    config: node.config.clone(),
                    metadata: node.metadata.clone(),
                }
            })
    }

    /// Returns `NodeInfo` for every node matching `query`.
    #[must_use]
    pub fn query_nodes(
        &self,
        query: &super::introspection::NodeQuery,
    ) -> Vec<super::introspection::NodeInfo> {
        self.graph
            .node_indices()
            .filter_map(|idx| self.node_info(&self.graph[idx].id))
            .filter(|info| query.matches(info))
            .collect()
    }

    /// Longest-path depth for every node (roots are depth 0), computed by
    /// a single pass in topological order. Empty map if cyclic.
    fn compute_depths(&self) -> HashMap<NodeIndex, usize> {
        let mut depths = HashMap::new();
        let Ok(topo) = toposort(&self.graph, None) else {
            return depths;
        };
        for &idx in &topo {
            let max_pred_depth = self
                .graph
                .neighbors_directed(idx, petgraph::Direction::Incoming)
                .filter_map(|pred| depths.get(&pred))
                .max()
                .copied()
                .map_or(0, |d| d + 1);
            depths.insert(idx, max_pred_depth);
        }
        depths
    }

    /// Aggregate connectivity metrics: roots (no incoming edges), leaves
    /// (no outgoing edges), max depth, average degree (in + out, so each
    /// edge counts twice), and weakly-connected component count.
    #[must_use]
    pub fn connectivity(&self) -> super::introspection::ConnectivityMetrics {
        let depths = self.compute_depths();
        let root_nodes: Vec<String> = self
            .graph
            .node_indices()
            .filter(|&idx| {
                self.graph
                    .neighbors_directed(idx, petgraph::Direction::Incoming)
                    .next()
                    .is_none()
            })
            .map(|idx| self.graph[idx].id.clone())
            .collect();
        let leaf_nodes: Vec<String> = self
            .graph
            .node_indices()
            .filter(|&idx| {
                self.graph
                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
                    .next()
                    .is_none()
            })
            .map(|idx| self.graph[idx].id.clone())
            .collect();
        let max_depth = depths.values().copied().max().unwrap_or(0);
        let total_degree: usize = self
            .graph
            .node_indices()
            .map(|idx| {
                let in_deg = self
                    .graph
                    .neighbors_directed(idx, petgraph::Direction::Incoming)
                    .count();
                let out_deg = self
                    .graph
                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
                    .count();
                in_deg + out_deg
            })
            .sum();
        let node_count = self.graph.node_count();
        // Checked usize -> u32 -> f64 conversions avoid precision loss of
        // a direct `as f64` cast; the saturating fallback is effectively
        // unreachable for realistic graph sizes.
        let avg_degree = if node_count > 0 {
            match (u32::try_from(total_degree), u32::try_from(node_count)) {
                (Ok(total), Ok(count)) if count > 0 => f64::from(total) / f64::from(count),
                _ => f64::from(u32::MAX),
            }
        } else {
            0.0
        };
        // connected_components treats edges as undirected, so this is the
        // weakly-connected component count.
        let component_count = petgraph::algo::connected_components(&self.graph);
        super::introspection::ConnectivityMetrics {
            is_connected: component_count <= 1,
            component_count,
            root_nodes,
            leaf_nodes,
            max_depth,
            avg_degree,
        }
    }

    /// The longest root-to-leaf path (by node count). Found by taking a
    /// deepest node and walking back through the deepest predecessor at
    /// each step; ties are broken arbitrarily. Empty for an empty graph.
    #[must_use]
    pub fn critical_path(&self) -> super::introspection::CriticalPath {
        let depths = self.compute_depths();
        let deepest = depths.iter().max_by_key(|&(_, d)| d);
        if let Some((&end_idx, _)) = deepest {
            let mut path = vec![self.graph[end_idx].id.clone()];
            let mut current = end_idx;
            // Loop terminates at a root (no incoming neighbors -> None).
            while let Some(pred) = self
                .graph
                .neighbors_directed(current, petgraph::Direction::Incoming)
                .max_by_key(|&p| depths.get(&p).copied().unwrap_or(0))
            {
                path.push(self.graph[pred].id.clone());
                current = pred;
            }
            // Built leaf-to-root; present it root-to-leaf.
            path.reverse();
            super::introspection::CriticalPath {
                length: path.len(),
                nodes: path,
            }
        } else {
            super::introspection::CriticalPath {
                length: 0,
                nodes: vec![],
            }
        }
    }

    /// All transitive ancestors (`upstream`) and descendants
    /// (`downstream`) of `id`, found by two BFS traversals. The node
    /// itself is excluded. Unknown IDs yield an empty analysis.
    #[must_use]
    pub fn impact_analysis(&self, id: &str) -> super::introspection::ImpactAnalysis {
        let node_idx = self
            .graph
            .node_indices()
            .find(|&idx| self.graph[idx].id == id);
        let Some(start_idx) = node_idx else {
            return super::introspection::ImpactAnalysis {
                node_id: id.to_string(),
                upstream: vec![],
                downstream: vec![],
                total_affected: 0,
            };
        };
        // BFS over incoming edges collects every transitive ancestor.
        let mut upstream = Vec::new();
        let mut visited = std::collections::HashSet::new();
        let mut queue = std::collections::VecDeque::new();
        for pred in self
            .graph
            .neighbors_directed(start_idx, petgraph::Direction::Incoming)
        {
            queue.push_back(pred);
        }
        while let Some(idx) = queue.pop_front() {
            if visited.insert(idx) {
                upstream.push(self.graph[idx].id.clone());
                for pred in self
                    .graph
                    .neighbors_directed(idx, petgraph::Direction::Incoming)
                {
                    queue.push_back(pred);
                }
            }
        }
        // Second BFS, reusing the same scratch structures, over outgoing
        // edges collects every transitive descendant.
        let mut downstream = Vec::new();
        visited.clear();
        queue.clear();
        for succ in self
            .graph
            .neighbors_directed(start_idx, petgraph::Direction::Outgoing)
        {
            queue.push_back(succ);
        }
        while let Some(idx) = queue.pop_front() {
            if visited.insert(idx) {
                downstream.push(self.graph[idx].id.clone());
                for succ in self
                    .graph
                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
                {
                    queue.push_back(succ);
                }
            }
        }
        let total_affected = upstream.len() + downstream.len();
        super::introspection::ImpactAnalysis {
            node_id: id.to_string(),
            upstream,
            downstream,
            total_affected,
        }
    }

    /// A complete introspection snapshot: all node infos, all edges
    /// (edge config reported as `null` — it is not stored in the graph),
    /// waves, topological order, critical path, connectivity metrics, and
    /// a count of nodes per service type.
    #[must_use]
    pub fn snapshot(&self) -> super::introspection::GraphSnapshot {
        let nodes: Vec<super::introspection::NodeInfo> = self
            .graph
            .node_indices()
            .filter_map(|idx| self.node_info(&self.graph[idx].id))
            .collect();
        let edges: Vec<super::introspection::EdgeInfo> = self
            .graph
            .edge_references()
            .map(|edge| {
                let from = &self.graph[edge.source()].id;
                let to = &self.graph[edge.target()].id;
                super::introspection::EdgeInfo {
                    from: from.clone(),
                    to: to.clone(),
                    // Edge weights are (); original Edge config is lost.
                    config: serde_json::Value::Null,
                }
            })
            .collect();
        let mut service_distribution = HashMap::new();
        for node in &nodes {
            *service_distribution
                .entry(node.service.clone())
                .or_insert(0) += 1;
        }
        super::introspection::GraphSnapshot {
            node_count: self.node_count(),
            edge_count: self.edge_count(),
            nodes,
            edges,
            waves: self.execution_waves(),
            topological_order: self.topological_order(),
            critical_path: self.critical_path(),
            connectivity: self.connectivity(),
            service_distribution,
        }
    }
}
impl Default for DagExecutor {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for pipeline construction, DAG compilation, concurrent
    //! execution, and the graph introspection API.
    use super::*;
    use crate::domain::error::Result;

    // Node::new stores id and service verbatim.
    #[test]
    fn test_node_creation() {
        let node = Node::new(
            "test",
            "http",
            serde_json::json!({"url": "https://example.com"}),
        );
        assert_eq!(node.id, "test");
        assert_eq!(node.service, "http");
    }

    // Edge::new stores both endpoints verbatim.
    #[test]
    fn test_edge_creation() {
        let edge = Edge::new("a", "b");
        assert_eq!(edge.from, "a");
        assert_eq!(edge.to, "b");
    }

    // A well-formed two-node pipeline validates cleanly.
    #[test]
    fn test_pipeline_validation() {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("extract", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("fetch", "extract"));
        assert!(pipeline.validate().is_ok());
    }

    // A two-node cycle (a -> b -> a) is rejected at compile time.
    #[test]
    fn test_cycle_detection() {
        let mut pipeline = Pipeline::new("cyclic");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "a"));
        let result = DagExecutor::from_pipeline(&pipeline);
        assert!(matches!(
            result,
            Err(StygianError::Graph(GraphError::CycleDetected))
        ));
    }

    // A diamond (A -> B/C -> D) executes all four nodes; B and C run in
    // the same concurrent wave.
    #[tokio::test]
    async fn test_diamond_concurrent_execution() -> Result<()> {
        use crate::adapters::noop::NoopService;
        let mut pipeline = Pipeline::new("diamond");
        pipeline.add_node(Node::new("A", "noop", serde_json::json!({"url": ""})));
        pipeline.add_node(Node::new("B", "noop", serde_json::json!({"url": ""})));
        pipeline.add_node(Node::new("C", "noop", serde_json::json!({"url": ""})));
        pipeline.add_node(Node::new("D", "noop", serde_json::json!({"url": ""})));
        pipeline.add_edge(Edge::new("A", "B"));
        pipeline.add_edge(Edge::new("A", "C"));
        pipeline.add_edge(Edge::new("B", "D"));
        pipeline.add_edge(Edge::new("C", "D"));
        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let mut services: HashMap<String, std::sync::Arc<dyn crate::ports::ScrapingService>> =
            HashMap::new();
        services.insert("noop".to_string(), std::sync::Arc::new(NoopService));
        let results = executor.execute(&services).await?;
        assert_eq!(results.len(), 4);
        let ids: Vec<&str> = results.iter().map(|r| r.node_id.as_str()).collect();
        assert!(ids.contains(&"A"));
        assert!(ids.contains(&"B"));
        assert!(ids.contains(&"C"));
        assert!(ids.contains(&"D"));
        Ok(())
    }

    // Executing with an empty service registry surfaces an error.
    #[tokio::test]
    async fn test_missing_service_returns_error() -> Result<()> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch", "http", serde_json::json!({})));
        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let services: HashMap<String, std::sync::Arc<dyn crate::ports::ScrapingService>> =
            HashMap::new();
        let result = executor.execute(&services).await;
        assert!(result.is_err());
        Ok(())
    }

    // node_count / edge_count reflect an edgeless three-node graph.
    #[test]
    fn test_introspection_node_count() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));
        let executor = DagExecutor::from_pipeline(&pipeline)?;
        assert_eq!(executor.node_count(), 3);
        assert_eq!(executor.edge_count(), 0);
        Ok(())
    }

    // node_ids lists every node's ID.
    #[test]
    fn test_introspection_node_ids() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("extract", "ai", serde_json::json!({})));
        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let ids = executor.node_ids();
        assert!(ids.contains(&"fetch".to_string()));
        assert!(ids.contains(&"extract".to_string()));
        Ok(())
    }

    // get_node finds present nodes by ID and returns None for unknown IDs.
    #[test]
    fn test_introspection_get_node() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new(
            "fetch",
            "http",
            serde_json::json!({"url": "https://example.com"}),
        ));
        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let node = executor.get_node("fetch");
        assert!(node.is_some());
        let node = node.ok_or_else(|| std::io::Error::other("expected fetch node to exist"))?;
        assert_eq!(node.service, "http");
        assert!(executor.get_node("nonexistent").is_none());
        Ok(())
    }

    // Direct neighbors of a linear chain a -> b -> c are reported correctly.
    #[test]
    fn test_introspection_predecessors_successors()
    -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "c"));
        let executor = DagExecutor::from_pipeline(&pipeline)?;
        assert_eq!(executor.predecessors("a"), Vec::<String>::new());
        assert_eq!(executor.predecessors("b"), vec!["a".to_string()]);
        assert_eq!(executor.predecessors("c"), vec!["b".to_string()]);
        assert_eq!(executor.successors("a"), vec!["b".to_string()]);
        assert_eq!(executor.successors("b"), vec!["c".to_string()]);
        assert_eq!(executor.successors("c"), Vec::<String>::new());
        Ok(())
    }

    // Topological order respects the chain's edge directions.
    #[test]
    fn test_introspection_topological_order() -> std::result::Result<(), Box<dyn std::error::Error>>
    {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "c"));
        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let order = executor.topological_order();
        let a_pos = order
            .iter()
            .position(|x| x == "a")
            .ok_or_else(|| std::io::Error::other("expected node a in order"))?;
        let b_pos = order
            .iter()
            .position(|x| x == "b")
            .ok_or_else(|| std::io::Error::other("expected node b in order"))?;
        let c_pos = order
            .iter()
            .position(|x| x == "c")
            .ok_or_else(|| std::io::Error::other("expected node c in order"))?;
        assert!(a_pos < b_pos);
        assert!(b_pos < c_pos);
        Ok(())
    }

    // A diamond yields three waves: {A}, {B, C}, {D}.
    #[test]
    fn test_introspection_execution_waves_diamond()
    -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("diamond");
        pipeline.add_node(Node::new("A", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("B", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("C", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("D", "http", serde_json::json!({})));
        pipeline.add_edge(Edge::new("A", "B"));
        pipeline.add_edge(Edge::new("A", "C"));
        pipeline.add_edge(Edge::new("B", "D"));
        pipeline.add_edge(Edge::new("C", "D"));
        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let waves = executor.execution_waves();
        assert_eq!(waves.len(), 3);
        let first_wave = waves
            .first()
            .ok_or_else(|| std::io::Error::other("missing wave 0"))?;
        let second_wave = waves
            .get(1)
            .ok_or_else(|| std::io::Error::other("missing wave 1"))?;
        let third_wave = waves
            .get(2)
            .ok_or_else(|| std::io::Error::other("missing wave 2"))?;
        assert_eq!(first_wave.level, 0);
        assert!(first_wave.node_ids.contains(&"A".to_string()));
        assert_eq!(second_wave.level, 1);
        assert!(second_wave.node_ids.contains(&"B".to_string()));
        assert!(second_wave.node_ids.contains(&"C".to_string()));
        assert_eq!(third_wave.level, 2);
        assert!(third_wave.node_ids.contains(&"D".to_string()));
        Ok(())
    }

    // node_info reports depth, degrees, and neighbor lists per node.
    #[test]
    fn test_introspection_node_info() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("extract", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("fetch", "extract"));
        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let info = executor
            .node_info("fetch")
            .ok_or_else(|| std::io::Error::other("expected fetch node info"))?;
        assert_eq!(info.id, "fetch");
        assert_eq!(info.service, "http");
        assert_eq!(info.depth, 0);
        assert_eq!(info.in_degree, 0);
        assert_eq!(info.out_degree, 1);
        assert!(info.successors.contains(&"extract".to_string()));
        let info = executor
            .node_info("extract")
            .ok_or_else(|| std::io::Error::other("expected extract node info"))?;
        assert_eq!(info.depth, 1);
        assert_eq!(info.in_degree, 1);
        assert_eq!(info.out_degree, 0);
        Ok(())
    }

    // A connected chain has one component, one root, one leaf, depth 2.
    #[test]
    fn test_introspection_connectivity() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "c"));
        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let metrics = executor.connectivity();
        assert!(metrics.is_connected);
        assert_eq!(metrics.component_count, 1);
        assert_eq!(metrics.root_nodes, vec!["a".to_string()]);
        assert_eq!(metrics.leaf_nodes, vec!["c".to_string()]);
        assert_eq!(metrics.max_depth, 2);
        Ok(())
    }

    // The critical path of a 3-node chain is the whole chain.
    #[test]
    fn test_introspection_critical_path() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "c"));
        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let critical = executor.critical_path();
        assert_eq!(critical.length, 3);
        assert_eq!(critical.nodes, vec!["a", "b", "c"]);
        Ok(())
    }

    // Impact analysis separates transitive ancestors from descendants.
    #[test]
    fn test_introspection_impact_analysis() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("a", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("b", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("c", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("a", "b"));
        pipeline.add_edge(Edge::new("b", "c"));
        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let impact = executor.impact_analysis("b");
        assert_eq!(impact.node_id, "b");
        assert_eq!(impact.upstream, vec!["a".to_string()]);
        assert_eq!(impact.downstream, vec!["c".to_string()]);
        assert_eq!(impact.total_affected, 2);
        let impact = executor.impact_analysis("a");
        assert!(impact.upstream.is_empty());
        assert_eq!(impact.downstream.len(), 2);
        let impact = executor.impact_analysis("c");
        assert_eq!(impact.upstream.len(), 2);
        assert!(impact.downstream.is_empty());
        Ok(())
    }

    // snapshot aggregates counts, waves, order, path, and service tallies.
    #[test]
    fn test_introspection_snapshot() -> std::result::Result<(), Box<dyn std::error::Error>> {
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("extract", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("fetch", "extract"));
        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let snapshot = executor.snapshot();
        assert_eq!(snapshot.node_count, 2);
        assert_eq!(snapshot.edge_count, 1);
        assert_eq!(snapshot.nodes.len(), 2);
        assert_eq!(snapshot.edges.len(), 1);
        assert_eq!(snapshot.waves.len(), 2);
        assert_eq!(snapshot.topological_order.len(), 2);
        assert_eq!(snapshot.critical_path.length, 2);
        assert_eq!(snapshot.service_distribution.get("http"), Some(&1));
        assert_eq!(snapshot.service_distribution.get("ai"), Some(&1));
        Ok(())
    }

    // query_nodes filters by service type, roots, and leaves.
    #[test]
    fn test_introspection_query_nodes() -> std::result::Result<(), Box<dyn std::error::Error>> {
        use super::super::introspection::NodeQuery;
        let mut pipeline = Pipeline::new("test");
        pipeline.add_node(Node::new("fetch1", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("fetch2", "http", serde_json::json!({})));
        pipeline.add_node(Node::new("extract", "ai", serde_json::json!({})));
        pipeline.add_edge(Edge::new("fetch1", "extract"));
        pipeline.add_edge(Edge::new("fetch2", "extract"));
        let executor = DagExecutor::from_pipeline(&pipeline)?;
        let http_nodes = executor.query_nodes(&NodeQuery::by_service("http"));
        assert_eq!(http_nodes.len(), 2);
        let ai_nodes = executor.query_nodes(&NodeQuery::by_service("ai"));
        assert_eq!(ai_nodes.len(), 1);
        let roots = executor.query_nodes(&NodeQuery::roots());
        assert_eq!(roots.len(), 2);
        let leaves = executor.query_nodes(&NodeQuery::leaves());
        assert_eq!(leaves.len(), 1);
        let leaf = leaves
            .first()
            .ok_or_else(|| std::io::Error::other("expected one leaf node"))?;
        assert_eq!(leaf.id, "extract");
        Ok(())
    }
}