pub struct DagExecutor { /* private fields */ }Expand description
DAG executor that processes pipeline graphs
Executes scraping pipelines as directed acyclic graphs using petgraph.
Independent branches are executed concurrently using tokio::spawn.
Data from upstream nodes is passed as input to downstream nodes.
Implementations§
Source§impl DagExecutor
impl DagExecutor
Sourcepub fn new() -> Self
pub fn new() -> Self
Create a new DAG executor
§Example
use stygian_graph::domain::graph::DagExecutor;
let executor = DagExecutor::new();Sourcepub fn from_pipeline(pipeline: &Pipeline) -> Result<Self, StygianError>
pub fn from_pipeline(pipeline: &Pipeline) -> Result<Self, StygianError>
Build a graph from a pipeline definition
§Errors
Returns GraphError::CycleDetected if the pipeline contains a cycle.
Returns GraphError::NodeNotFound if an edge references an unknown node.
Sourcepub async fn execute(
&self,
services: &HashMap<String, Arc<dyn ScrapingService>>,
) -> Result<Vec<NodeResult>, StygianError>
pub async fn execute( &self, services: &HashMap<String, Arc<dyn ScrapingService>>, ) -> Result<Vec<NodeResult>, StygianError>
Execute the pipeline using the provided service registry.
Nodes are executed in topological order. Independent nodes at the same
depth are spawned concurrently via tokio::spawn. The output of each
node is available to all downstream nodes as their ServiceInput.params.
§Errors
Returns GraphError::ExecutionFailed if any node execution fails.
Sourcepub fn node_count(&self) -> usize
pub fn node_count(&self) -> usize
Get the total number of nodes in the graph
§Example
use stygian_graph::domain::graph::{Pipeline, Node, DagExecutor};
use serde_json::json;
let mut pipeline = Pipeline::new("test");
pipeline.add_node(Node::new("a", "http", json!({})));
pipeline.add_node(Node::new("b", "http", json!({})));
let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
assert_eq!(executor.node_count(), 2);Sourcepub fn edge_count(&self) -> usize
pub fn edge_count(&self) -> usize
Get the total number of edges in the graph
§Example
use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
use serde_json::json;
let mut pipeline = Pipeline::new("test");
pipeline.add_node(Node::new("a", "http", json!({})));
pipeline.add_node(Node::new("b", "http", json!({})));
pipeline.add_edge(Edge::new("a", "b"));
let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
assert_eq!(executor.edge_count(), 1);Sourcepub fn node_ids(&self) -> Vec<String>
pub fn node_ids(&self) -> Vec<String>
Get all node IDs in the graph
§Example
use stygian_graph::domain::graph::{Pipeline, Node, DagExecutor};
use serde_json::json;
let mut pipeline = Pipeline::new("test");
pipeline.add_node(Node::new("fetch", "http", json!({})));
let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
assert!(executor.node_ids().contains(&"fetch".to_string()));Sourcepub fn get_node(&self, id: &str) -> Option<&Node>
pub fn get_node(&self, id: &str) -> Option<&Node>
Get a node by ID
§Example
use stygian_graph::domain::graph::{Pipeline, Node, DagExecutor};
use serde_json::json;
let mut pipeline = Pipeline::new("test");
pipeline.add_node(Node::new("fetch", "http", json!({})));
let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
let node = executor.get_node("fetch");
assert!(node.is_some());
assert_eq!(node.unwrap().service, "http");Sourcepub fn predecessors(&self, id: &str) -> Vec<String>
pub fn predecessors(&self, id: &str) -> Vec<String>
Get the predecessors (upstream nodes) of a node
§Example
use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
use serde_json::json;
let mut pipeline = Pipeline::new("test");
pipeline.add_node(Node::new("a", "http", json!({})));
pipeline.add_node(Node::new("b", "http", json!({})));
pipeline.add_edge(Edge::new("a", "b"));
let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
let preds = executor.predecessors("b");
assert_eq!(preds, vec!["a".to_string()]);Sourcepub fn successors(&self, id: &str) -> Vec<String>
pub fn successors(&self, id: &str) -> Vec<String>
Get the successors (downstream nodes) of a node
§Example
use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
use serde_json::json;
let mut pipeline = Pipeline::new("test");
pipeline.add_node(Node::new("a", "http", json!({})));
pipeline.add_node(Node::new("b", "http", json!({})));
pipeline.add_edge(Edge::new("a", "b"));
let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
let succs = executor.successors("a");
assert_eq!(succs, vec!["b".to_string()]);Sourcepub fn topological_order(&self) -> Vec<String>
pub fn topological_order(&self) -> Vec<String>
Get the topological order of nodes
§Example
use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
use serde_json::json;
let mut pipeline = Pipeline::new("test");
pipeline.add_node(Node::new("a", "http", json!({})));
pipeline.add_node(Node::new("b", "http", json!({})));
pipeline.add_edge(Edge::new("a", "b"));
let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
let order = executor.topological_order();
// "a" must appear before "b"
let a_pos = order.iter().position(|x| x == "a").unwrap();
let b_pos = order.iter().position(|x| x == "b").unwrap();
assert!(a_pos < b_pos);Sourcepub fn execution_waves(&self) -> Vec<ExecutionWave>
pub fn execution_waves(&self) -> Vec<ExecutionWave>
Get execution waves (groups of nodes that can run concurrently)
§Example
use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
use serde_json::json;
let mut pipeline = Pipeline::new("test");
pipeline.add_node(Node::new("a", "http", json!({})));
pipeline.add_node(Node::new("b", "http", json!({})));
pipeline.add_node(Node::new("c", "http", json!({})));
pipeline.add_edge(Edge::new("a", "c"));
pipeline.add_edge(Edge::new("b", "c"));
let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
let waves = executor.execution_waves();
// Wave 0 contains "a" and "b" (can run concurrently)
// Wave 1 contains "c" (depends on both)
assert_eq!(waves.len(), 2);Sourcepub fn node_info(&self, id: &str) -> Option<NodeInfo>
pub fn node_info(&self, id: &str) -> Option<NodeInfo>
Get information about a specific node
§Example
use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
use serde_json::json;
let mut pipeline = Pipeline::new("test");
pipeline.add_node(Node::new("fetch", "http", json!({"url": "https://example.com"})));
pipeline.add_node(Node::new("extract", "ai", json!({})));
pipeline.add_edge(Edge::new("fetch", "extract"));
let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
let info = executor.node_info("fetch").unwrap();
assert_eq!(info.service, "http");
assert_eq!(info.in_degree, 0);
assert_eq!(info.out_degree, 1);Sourcepub fn query_nodes(&self, query: &NodeQuery) -> Vec<NodeInfo>
pub fn query_nodes(&self, query: &NodeQuery) -> Vec<NodeInfo>
Get all nodes matching a query
§Example
use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
use stygian_graph::domain::introspection::NodeQuery;
use serde_json::json;
let mut pipeline = Pipeline::new("test");
pipeline.add_node(Node::new("fetch1", "http", json!({})));
pipeline.add_node(Node::new("fetch2", "http", json!({})));
pipeline.add_node(Node::new("extract", "ai", json!({})));
let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
let http_nodes = executor.query_nodes(&NodeQuery::by_service("http"));
assert_eq!(http_nodes.len(), 2);Sourcepub fn connectivity(&self) -> ConnectivityMetrics
pub fn connectivity(&self) -> ConnectivityMetrics
Get connectivity metrics for the graph
§Example
use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
use serde_json::json;
let mut pipeline = Pipeline::new("test");
pipeline.add_node(Node::new("a", "http", json!({})));
pipeline.add_node(Node::new("b", "http", json!({})));
pipeline.add_edge(Edge::new("a", "b"));
let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
let metrics = executor.connectivity();
assert_eq!(metrics.root_nodes, vec!["a".to_string()]);
assert_eq!(metrics.leaf_nodes, vec!["b".to_string()]);Sourcepub fn critical_path(&self) -> CriticalPath
pub fn critical_path(&self) -> CriticalPath
Get the critical path (longest path through the graph)
§Example
use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
use serde_json::json;
let mut pipeline = Pipeline::new("test");
pipeline.add_node(Node::new("a", "http", json!({})));
pipeline.add_node(Node::new("b", "http", json!({})));
pipeline.add_node(Node::new("c", "http", json!({})));
pipeline.add_edge(Edge::new("a", "b"));
pipeline.add_edge(Edge::new("b", "c"));
let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
let critical = executor.critical_path();
assert_eq!(critical.length, 3);
assert_eq!(critical.nodes, vec!["a", "b", "c"]);Sourcepub fn impact_analysis(&self, id: &str) -> ImpactAnalysis
pub fn impact_analysis(&self, id: &str) -> ImpactAnalysis
Analyze the impact of changing a node
Returns all nodes that would be affected (upstream dependencies and downstream dependents, transitively).
§Example
use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
use serde_json::json;
let mut pipeline = Pipeline::new("test");
pipeline.add_node(Node::new("a", "http", json!({})));
pipeline.add_node(Node::new("b", "http", json!({})));
pipeline.add_node(Node::new("c", "http", json!({})));
pipeline.add_edge(Edge::new("a", "b"));
pipeline.add_edge(Edge::new("b", "c"));
let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
let impact = executor.impact_analysis("b");
assert_eq!(impact.upstream, vec!["a".to_string()]);
assert_eq!(impact.downstream, vec!["c".to_string()]);Sourcepub fn snapshot(&self) -> GraphSnapshot
pub fn snapshot(&self) -> GraphSnapshot
Get a complete snapshot of the graph for introspection
§Example
use stygian_graph::domain::graph::{Pipeline, Node, Edge, DagExecutor};
use serde_json::json;
let mut pipeline = Pipeline::new("test");
pipeline.add_node(Node::new("fetch", "http", json!({})));
pipeline.add_node(Node::new("extract", "ai", json!({})));
pipeline.add_edge(Edge::new("fetch", "extract"));
let executor = DagExecutor::from_pipeline(&pipeline).unwrap();
let snapshot = executor.snapshot();
assert_eq!(snapshot.node_count, 2);
assert_eq!(snapshot.edge_count, 1);Trait Implementations§
Auto Trait Implementations§
impl Freeze for DagExecutor
impl RefUnwindSafe for DagExecutor
impl Send for DagExecutor
impl Sync for DagExecutor
impl Unpin for DagExecutor
impl UnsafeUnpin for DagExecutor
impl UnwindSafe for DagExecutor
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> Paint for Twhere
T: ?Sized,
impl<T> Paint for Twhere
T: ?Sized,
Source§fn fg(&self, value: Color) -> Painted<&T>
fn fg(&self, value: Color) -> Painted<&T>
Returns a styled value derived from self with the foreground set to
value.
This method should be used rarely. Instead, prefer to use color-specific
builder methods like red() and
green(), which have the same functionality but are
pithier.
§Example
Set foreground color to white using fg():
use yansi::{Paint, Color};
painted.fg(Color::White);Set foreground color to white using white().
use yansi::Paint;
painted.white();Source§fn bright_black(&self) -> Painted<&T>
fn bright_black(&self) -> Painted<&T>
Source§fn bright_red(&self) -> Painted<&T>
fn bright_red(&self) -> Painted<&T>
Source§fn bright_green(&self) -> Painted<&T>
fn bright_green(&self) -> Painted<&T>
Source§fn bright_yellow(&self) -> Painted<&T>
fn bright_yellow(&self) -> Painted<&T>
Source§fn bright_blue(&self) -> Painted<&T>
fn bright_blue(&self) -> Painted<&T>
Source§fn bright_magenta(&self) -> Painted<&T>
fn bright_magenta(&self) -> Painted<&T>
Source§fn bright_cyan(&self) -> Painted<&T>
fn bright_cyan(&self) -> Painted<&T>
Source§fn bright_white(&self) -> Painted<&T>
fn bright_white(&self) -> Painted<&T>
Source§fn bg(&self, value: Color) -> Painted<&T>
fn bg(&self, value: Color) -> Painted<&T>
Returns a styled value derived from self with the background set to
value.
This method should be used rarely. Instead, prefer to use color-specific
builder methods like on_red() and
on_green(), which have the same functionality but
are pithier.
§Example
Set background color to red using fg():
use yansi::{Paint, Color};
painted.bg(Color::Red);Set background color to red using on_red().
use yansi::Paint;
painted.on_red();Source§fn on_primary(&self) -> Painted<&T>
fn on_primary(&self) -> Painted<&T>
Source§fn on_magenta(&self) -> Painted<&T>
fn on_magenta(&self) -> Painted<&T>
Source§fn on_bright_black(&self) -> Painted<&T>
fn on_bright_black(&self) -> Painted<&T>
Source§fn on_bright_red(&self) -> Painted<&T>
fn on_bright_red(&self) -> Painted<&T>
Source§fn on_bright_green(&self) -> Painted<&T>
fn on_bright_green(&self) -> Painted<&T>
Source§fn on_bright_yellow(&self) -> Painted<&T>
fn on_bright_yellow(&self) -> Painted<&T>
Source§fn on_bright_blue(&self) -> Painted<&T>
fn on_bright_blue(&self) -> Painted<&T>
Source§fn on_bright_magenta(&self) -> Painted<&T>
fn on_bright_magenta(&self) -> Painted<&T>
Source§fn on_bright_cyan(&self) -> Painted<&T>
fn on_bright_cyan(&self) -> Painted<&T>
Source§fn on_bright_white(&self) -> Painted<&T>
fn on_bright_white(&self) -> Painted<&T>
Source§fn attr(&self, value: Attribute) -> Painted<&T>
fn attr(&self, value: Attribute) -> Painted<&T>
Enables the styling Attribute value.
This method should be used rarely. Instead, prefer to use
attribute-specific builder methods like bold() and
underline(), which have the same functionality
but are pithier.
§Example
Make text bold using attr():
use yansi::{Paint, Attribute};
painted.attr(Attribute::Bold);Make text bold using using bold().
use yansi::Paint;
painted.bold();Source§fn rapid_blink(&self) -> Painted<&T>
fn rapid_blink(&self) -> Painted<&T>
Source§fn quirk(&self, value: Quirk) -> Painted<&T>
fn quirk(&self, value: Quirk) -> Painted<&T>
Enables the yansi Quirk value.
This method should be used rarely. Instead, prefer to use quirk-specific
builder methods like mask() and
wrap(), which have the same functionality but are
pithier.
§Example
Enable wrapping using .quirk():
use yansi::{Paint, Quirk};
painted.quirk(Quirk::Wrap);Enable wrapping using wrap().
use yansi::Paint;
painted.wrap();Source§fn clear(&self) -> Painted<&T>
👎Deprecated since 1.0.1: renamed to resetting() due to conflicts with Vec::clear().
The clear() method will be removed in a future release.
fn clear(&self) -> Painted<&T>
renamed to resetting() due to conflicts with Vec::clear().
The clear() method will be removed in a future release.
Source§fn whenever(&self, value: Condition) -> Painted<&T>
fn whenever(&self, value: Condition) -> Painted<&T>
Conditionally enable styling based on whether the Condition value
applies. Replaces any previous condition.
See the crate level docs for more details.
§Example
Enable styling painted only when both stdout and stderr are TTYs:
use yansi::{Paint, Condition};
painted.red().on_yellow().whenever(Condition::STDOUTERR_ARE_TTY);