pub struct Graph { /* private fields */ }
A graph containing nodes and edges.
Graphs represent the structure of a data processing pipeline, with nodes representing processing components and edges representing data flow between them.
See the module-level documentation for detailed information about graph execution, nested graphs, lifecycle control, and usage examples.
Implementations§
impl Graph
pub fn expose_input_port( &mut self, internal_node: &str, internal_port: &str, external_name: &str, ) -> Result<(), String>
Exposes an internal node’s input port as an external input port.
This allows external streams to flow into the graph through specific internal nodes.
§Arguments
- internal_node: The name of the internal node
- internal_port: The name of the input port on the internal node
- external_name: The name of the external port (must be “configuration” or “input”)
§Returns
Ok(()) if the port was exposed successfully, or an error if:
- The internal node doesn’t exist
- The internal port doesn’t exist on the node
- The external port name is invalid (must be “configuration” or “input”)
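The name rule above can be pictured as a small check. This is only a sketch: `validate_external_input_name` is a hypothetical helper written for illustration, not a function of the crate, and the error text is invented.

```rust
/// Hypothetical helper (not part of the crate): accepts only the two
/// external input port names the documentation lists as valid.
fn validate_external_input_name(name: &str) -> Result<(), String> {
    match name {
        "configuration" | "input" => Ok(()),
        other => Err(format!(
            "invalid external input port '{other}': expected \"configuration\" or \"input\""
        )),
    }
}
```

Returning `Result<(), String>` matches the shape of the documented signature, so a caller can propagate the message with `?` or inspect it directly.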
§Example
use streamweave::graph::Graph;
use streamweave::nodes::variable_node::VariableNode;
let mut graph = Graph::new("g".to_string());
graph.add_node("source".to_string(), Box::new(VariableNode::new("source".to_string()))).unwrap();
graph.expose_input_port("source", "value", "input").unwrap();
pub fn expose_output_port( &mut self, internal_node: &str, internal_port: &str, external_name: &str, ) -> Result<(), String>
Exposes an internal node’s output port as an external output port.
This allows internal streams to flow out of the graph through specific internal nodes.
§Arguments
- internal_node: The name of the internal node
- internal_port: The name of the output port on the internal node
- external_name: The name of the external port (must be “output” or “error”)
§Returns
Ok(()) if the port was exposed successfully, or an error if:
- The internal node doesn’t exist
- The internal port doesn’t exist on the node
- The external port name is invalid (must be “output” or “error”)
§Example
use streamweave::graph::Graph;
use streamweave::nodes::variable_node::VariableNode;
let mut graph = Graph::new("g".to_string());
graph.add_node("sink".to_string(), Box::new(VariableNode::new("sink".to_string()))).unwrap();
graph.expose_output_port("sink", "out", "output").unwrap();
pub fn connect_input_channel( &mut self, external_port: &str, receiver: Receiver<Arc<dyn Any + Send + Sync>>, ) -> Result<(), String>
Connects an input channel to an exposed input port.
This allows external data to be sent to the graph through the specified port.
§Arguments
- external_port: The name of the exposed input port
- receiver: The channel receiver for input data
§Returns
Ok(()) if the connection was successful, or an error if the port is not exposed.
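The receiver carries type-erased items of type `Arc<dyn Any + Send + Sync>`. The following sketch shows how such items can be built and read back. It uses `std::sync::mpsc` in place of the async channel so that it needs no runtime, and the names `Payload`, `to_payload`, `as_i32`, and `round_trip` are hypothetical helpers, not part of the crate.

```rust
use std::any::Any;
use std::sync::mpsc;
use std::sync::Arc;

/// The type-erased item type that appears in the channel signatures.
type Payload = Arc<dyn Any + Send + Sync>;

/// Wraps any `Send + Sync + 'static` value as a payload.
fn to_payload<T: Any + Send + Sync>(value: T) -> Payload {
    Arc::new(value)
}

/// Tries to read a payload back as an `i32`; returns `None` for any other type.
fn as_i32(payload: &Payload) -> Option<i32> {
    payload.downcast_ref::<i32>().copied()
}

/// Sends two payloads of different concrete types and reads them back.
fn round_trip() -> Vec<Option<i32>> {
    // std channel used as a stand-in for the async channel (assumption).
    let (tx, rx) = mpsc::channel::<Payload>();
    tx.send(to_payload(42_i32)).unwrap();
    tx.send(to_payload(String::from("not a number"))).unwrap();
    drop(tx); // closing the sender ends the receive loop below
    rx.iter().map(|p| as_i32(&p)).collect()
}
```

Because the concrete type is erased, the sender and the consuming node have to agree on it out of band; a failed downcast yields `None` rather than a panic.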
§Example
use streamweave::graph::Graph;
use streamweave::nodes::variable_node::VariableNode;
use tokio::sync::mpsc;
use std::sync::Arc;
let mut graph = Graph::new("g".to_string());
graph.add_node("n".to_string(), Box::new(VariableNode::new("n".to_string()))).unwrap();
graph.expose_input_port("n", "value", "configuration").unwrap();
let (_tx, rx) = mpsc::channel(10);
graph.connect_input_channel("configuration", rx).unwrap();
pub fn connect_output_channel( &mut self, external_port: &str, sender: Sender<Arc<dyn Any + Send + Sync>>, ) -> Result<(), String>
Connects an output channel to an exposed output port.
This allows graph output to be sent to external consumers through the specified port.
§Arguments
- external_port: The name of the exposed output port
- sender: The channel sender for output data
§Returns
Ok(()) if the connection was successful, or an error if the port is not exposed.
§Example
use streamweave::graph::Graph;
use streamweave::nodes::variable_node::VariableNode;
use tokio::sync::mpsc;
use std::sync::Arc;
let mut graph = Graph::new("g".to_string());
graph.add_node("n".to_string(), Box::new(VariableNode::new("n".to_string()))).unwrap();
graph.expose_output_port("n", "out", "output").unwrap();
let (tx, _rx) = mpsc::channel(10);
graph.connect_output_channel("output", tx).unwrap();
pub fn get_nodes(&self) -> MutexGuard<'_, HashMap<String, Box<dyn Node>>>
Returns a guard over the nodes map. Use .values() to iterate node references.
§Returns
A mutex guard that derefs to the nodes HashMap.
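Because the return value is a guard rather than a copy, the map stays locked for as long as the guard is alive. A minimal sketch of that pattern follows, assuming the guard behaves like `std::sync::MutexGuard`; `ToyGraph`, `NamedNode`, and the one-method `Node` trait here are hypothetical stand-ins, not the crate's types.

```rust
use std::collections::HashMap;
use std::sync::{Mutex, MutexGuard};

/// Hypothetical stand-in for the crate's `Node` trait.
trait Node: Send {
    fn name(&self) -> &str;
}

struct NamedNode(String);

impl Node for NamedNode {
    fn name(&self) -> &str {
        &self.0
    }
}

/// Hypothetical stand-in for `Graph`: only the locked node map.
struct ToyGraph {
    nodes: Mutex<HashMap<String, Box<dyn Node>>>,
}

impl ToyGraph {
    fn new(names: &[&str]) -> Self {
        let mut map: HashMap<String, Box<dyn Node>> = HashMap::new();
        for &name in names {
            map.insert(name.to_string(), Box::new(NamedNode(name.to_string())));
        }
        ToyGraph { nodes: Mutex::new(map) }
    }

    /// Same shape as `get_nodes`: hands out the guard, not a copy of the map.
    fn get_nodes(&self) -> MutexGuard<'_, HashMap<String, Box<dyn Node>>> {
        self.nodes.lock().unwrap()
    }

    /// Copies the names out while holding the guard; the lock is released
    /// when `guard` goes out of scope at the end of the function.
    fn sorted_node_names(&self) -> Vec<String> {
        let guard = self.get_nodes();
        let mut names: Vec<String> =
            guard.values().map(|n| n.name().to_string()).collect();
        names.sort();
        names
    }
}
```

Keeping the guard in a short scope and copying out what is needed avoids holding the lock across unrelated work.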
pub fn find_node_by_name( &self, name: &str, ) -> Option<MutexGuard<'_, HashMap<String, Box<dyn Node>>>>
pub fn node_count(&self) -> usize
pub fn edge_count(&self) -> usize
pub fn add_node( &mut self, name: String, node: Box<dyn Node>, ) -> Result<(), String>
Adds a node to the graph.
§Arguments
- name: The name to assign to the node (should match node.name())
- node: The node to add to the graph
§Returns
Ok(()) if the node was added successfully, or an error if a node with
the same name already exists.
§Errors
Returns an error string if a node with the given name already exists in the graph.
pub fn remove_node(&mut self, name: &str) -> Result<(), String>
Removes a node from the graph.
§Arguments
- name: The name of the node to remove
§Returns
Ok(()) if the node was removed successfully, or an error if the node
doesn’t exist or has connected edges.
§Errors
Returns an error string if the node doesn’t exist or cannot be removed (e.g., it has connected edges).
pub fn find_edge_by_nodes_and_ports( &self, source_node: &str, source_port: &str, target_node: &str, target_port: &str, ) -> Option<&Edge>
Finds the edge that connects the given source node and port to the given target node and port.
§Arguments
- source_node: The name of the source node
- source_port: The name of the source output port
- target_node: The name of the target node
- target_port: The name of the target input port
§Returns
Some(&Edge) if an edge matching the given parameters exists, None otherwise.
pub fn add_edge(&mut self, edge: Edge) -> Result<(), String>
Adds an edge to the graph.
§Arguments
- edge: The edge to add to the graph
§Returns
Ok(()) if the edge was added successfully, or an error if the edge is invalid
(e.g., nodes don’t exist or ports don’t exist).
§Errors
Returns an error string if:
- The source or target node doesn’t exist
- The source or target port doesn’t exist on the respective node
- The edge would create a duplicate connection
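The three error conditions above can be sketched as a sequence of checks. This is an illustrative model only: `ToyEdge` and `ToyGraph` are hypothetical types, the error messages are invented, and the crate's actual validation order is not documented here.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical edge record used only for this sketch.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct ToyEdge {
    source_node: String,
    source_port: String,
    target_node: String,
    target_port: String,
}

impl ToyEdge {
    fn new(source_node: &str, source_port: &str, target_node: &str, target_port: &str) -> Self {
        ToyEdge {
            source_node: source_node.to_string(),
            source_port: source_port.to_string(),
            target_node: target_node.to_string(),
            target_port: target_port.to_string(),
        }
    }
}

/// Hypothetical graph: node name -> (input port names, output port names).
#[derive(Default)]
struct ToyGraph {
    ports: HashMap<String, (HashSet<String>, HashSet<String>)>,
    edges: HashSet<ToyEdge>,
}

impl ToyGraph {
    fn add_node(&mut self, name: &str, inputs: &[&str], outputs: &[&str]) {
        let ins = inputs.iter().map(|p| p.to_string()).collect();
        let outs = outputs.iter().map(|p| p.to_string()).collect();
        self.ports.insert(name.to_string(), (ins, outs));
    }

    /// Checks node existence, then port existence, then duplicates.
    fn add_edge(&mut self, edge: ToyEdge) -> Result<(), String> {
        let (_, outputs) = self
            .ports
            .get(&edge.source_node)
            .ok_or_else(|| format!("source node '{}' does not exist", edge.source_node))?;
        if !outputs.contains(&edge.source_port) {
            return Err(format!("source port '{}' does not exist", edge.source_port));
        }
        let (inputs, _) = self
            .ports
            .get(&edge.target_node)
            .ok_or_else(|| format!("target node '{}' does not exist", edge.target_node))?;
        if !inputs.contains(&edge.target_port) {
            return Err(format!("target port '{}' does not exist", edge.target_port));
        }
        // `HashSet::insert` returns false when the value was already present.
        if !self.edges.insert(edge) {
            return Err("edge already exists".to_string());
        }
        Ok(())
    }
}
```

Validating before inserting means a rejected edge leaves the graph unchanged.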
pub fn remove_edge( &mut self, source_node: &str, source_port: &str, target_node: &str, target_port: &str, ) -> Result<(), String>
Removes an edge from the graph.
§Arguments
- source_node: The name of the source node
- source_port: The name of the source output port
- target_node: The name of the target node
- target_port: The name of the target input port
§Returns
Ok(()) if the edge was removed successfully, or an error if the edge
doesn’t exist.
§Errors
Returns an error string if no edge matching the given parameters exists.
pub async fn execute(&mut self) -> Result<(), GraphExecutionError>
Executes the graph by connecting streams between nodes.
This method:
- Routes external input streams to exposed input ports
- Performs topological sort to determine execution order
- For each node, collects input streams from connected upstream nodes
- Calls node.execute(inputs), which returns output streams
- Routes exposed output ports to external output senders
- Connects output streams to downstream nodes’ input streams
- Drives all streams to completion
Channels are used internally for backpressure, but are never exposed to nodes. Nodes only see and work with streams. External I/O is handled through exposed ports.
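The topological sort step in the list above can be illustrated with Kahn's algorithm. This is a sketch under stated assumptions: the documentation does not say which algorithm the crate uses, and `topological_order` is a hypothetical function that works on plain node names and `(source, target)` pairs rather than on the crate's types.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Returns node names in an order where every edge goes from an earlier
/// node to a later one, or an error if an edge names an unknown node or
/// the edges form a cycle.
fn topological_order<'a>(
    nodes: &[&'a str],
    edges: &[(&'a str, &'a str)],
) -> Result<Vec<String>, String> {
    // Number of incoming edges per node; BTreeMap keeps the order deterministic.
    let mut in_degree: BTreeMap<&str, usize> = nodes.iter().map(|n| (*n, 0)).collect();
    let mut downstream: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
    for (source, target) in edges {
        if !in_degree.contains_key(source) || !in_degree.contains_key(target) {
            return Err(format!("edge {source} -> {target} references an unknown node"));
        }
        *in_degree.get_mut(target).unwrap() += 1;
        downstream.entry(*source).or_default().push(*target);
    }

    // Start with every node that has no upstream dependency.
    let mut ready: VecDeque<&str> = in_degree
        .iter()
        .filter(|(_, degree)| **degree == 0)
        .map(|(name, _)| *name)
        .collect();

    let mut order = Vec::new();
    while let Some(node) = ready.pop_front() {
        order.push(node.to_string());
        for next in downstream.get(node).into_iter().flatten() {
            let degree = in_degree.get_mut(next).unwrap();
            *degree -= 1;
            if *degree == 0 {
                ready.push_back(*next);
            }
        }
    }

    // Nodes left with a nonzero in-degree are part of a cycle.
    if order.len() == in_degree.len() {
        Ok(order)
    } else {
        Err("graph contains a cycle".to_string())
    }
}
```

Processing nodes in this order guarantees that a node's upstream output streams exist before its own inputs are collected.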
§Returns
Ok(()) if execution was started successfully, or an error if:
- Nodes have invalid port configurations
- Streams cannot be created or connected
- Tasks cannot be spawned
- External I/O connections are invalid
§Errors
Returns GraphExecutionError if execution cannot be started.
§Example
use streamweave::graph::Graph;
use streamweave::nodes::variable_node::VariableNode;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut graph = Graph::new("g".to_string());
    graph.add_node("map".to_string(), Box::new(VariableNode::new("map".to_string())))?;
    graph.expose_input_port("map", "value", "configuration")?;
    graph.expose_output_port("map", "out", "output")?;
    let (config_tx, config_rx) = mpsc::channel(1);
    let (output_tx, _output_rx) = mpsc::channel(10);
    graph.connect_input_channel("configuration", config_rx)?;
    graph.connect_output_channel("output", output_tx)?;
    graph.execute().await.unwrap();
    graph.wait_for_completion().await.unwrap();
    Ok(())
}
pub fn start(&self)
Starts graph execution.
Sets the execution state to running. The graph will begin processing when data arrives on input ports (pull-based model).
§Returns
Nothing. As the signature shows, this method has no return value.
pub fn pause(&self)
Pauses graph execution.
The graph will stop processing new data but maintains its current state.
§Returns
Nothing. As the signature shows, this method has no return value.
pub fn resume(&self)
Resumes graph execution.
Resumes processing after a pause.
§Returns
Nothing. As the signature shows, this method has no return value.
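start, pause, and resume all take &self yet change execution state, which implies some form of interior mutability. The sketch below models that with a mutex-protected enum. The `Lifecycle` type, the `ExecState` variants, and the rule that pause and resume only act from the matching state are all assumptions made for illustration; the crate's actual state representation is not documented here.

```rust
use std::sync::Mutex;

/// Hypothetical execution states for this sketch.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ExecState {
    Idle,
    Running,
    Paused,
}

/// Hypothetical lifecycle holder: state changes go through a shared reference.
struct Lifecycle {
    state: Mutex<ExecState>,
}

impl Lifecycle {
    fn new() -> Self {
        Lifecycle { state: Mutex::new(ExecState::Idle) }
    }

    /// Marks execution as running.
    fn start(&self) {
        *self.state.lock().unwrap() = ExecState::Running;
    }

    /// Pauses only if currently running (assumed rule).
    fn pause(&self) {
        let mut state = self.state.lock().unwrap();
        if *state == ExecState::Running {
            *state = ExecState::Paused;
        }
    }

    /// Resumes only if currently paused (assumed rule).
    fn resume(&self) {
        let mut state = self.state.lock().unwrap();
        if *state == ExecState::Paused {
            *state = ExecState::Running;
        }
    }

    fn current(&self) -> ExecState {
        *self.state.lock().unwrap()
    }
}
```

Taking `&self` lets several tasks share one handle and toggle the state without needing exclusive access to the whole graph.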
pub async fn stop(&self) -> Result<(), GraphExecutionError>
Stops graph execution and clears all state.
This method:
- Signals all nodes to stop processing
- Clears all execution handles
- Resets execution state to None
- All data flowing through the graph is discarded
§Returns
Ok(()) if execution was stopped successfully, or an error if stopping failed.
§Errors
Returns GraphExecutionError if execution cannot be stopped gracefully.
pub async fn wait_for_completion(&mut self) -> Result<(), GraphExecutionError>
Waits for all nodes in the graph to complete execution.
This method is async: awaiting it suspends the caller until all node tasks have
finished. Call it after execute() to wait for the graph to finish processing. When
using the dataflow execution model, nodes are restored into the graph after all tasks complete.
§Returns
Ok(()) if all nodes completed successfully, or an error if any node failed.
§Errors
Returns GraphExecutionError if any node execution failed or if waiting timed out.