Skip to main content

Graph

Struct Graph 

Source
pub struct Graph { /* private fields */ }
Expand description

A graph containing nodes and edges.

Graphs represent the structure of a data processing pipeline, with nodes representing processing components and edges representing data flow between them.

See the module-level documentation for detailed information about graph execution, nested graphs, lifecycle control, and usage examples.

Implementations§

Source§

impl Graph

Source

pub fn new(name: String) -> Self

Creates a new empty graph with the given name.

§Arguments
  • name - The name for the graph
§Returns

A new Graph instance with no nodes or edges.

§Example
use streamweave::graph::Graph;

let graph = Graph::new("my_graph".to_string());
Source

pub fn expose_input_port( &mut self, internal_node: &str, internal_port: &str, external_name: &str, ) -> Result<(), String>

Exposes an internal node’s input port as an external input port.

This allows external streams to flow into the graph through specific internal nodes.

§Arguments
  • internal_node - The name of the internal node
  • internal_port - The name of the input port on the internal node
  • external_name - The name of the external port (must be “configuration” or “input”)
§Returns

Ok(()) if the port was exposed successfully, or an error if:

  • The internal node doesn’t exist
  • The internal port doesn’t exist on the node
  • The external port name is invalid (must be “configuration” or “input”)
§Example
use streamweave::graph::Graph;
use streamweave::nodes::variable_node::VariableNode;
let mut graph = Graph::new("g".to_string());
graph.add_node("source".to_string(), Box::new(VariableNode::new("source".to_string()))).unwrap();
graph.expose_input_port("source", "value", "input").unwrap();
Source

pub fn expose_output_port( &mut self, internal_node: &str, internal_port: &str, external_name: &str, ) -> Result<(), String>

Exposes an internal node’s output port as an external output port.

This allows internal streams to flow out of the graph through specific internal nodes.

§Arguments
  • internal_node - The name of the internal node
  • internal_port - The name of the output port on the internal node
  • external_name - The name of the external port (must be “output” or “error”)
§Returns

Ok(()) if the port was exposed successfully, or an error if:

  • The internal node doesn’t exist
  • The internal port doesn’t exist on the node
  • The external port name is invalid (must be “output” or “error”)
§Example
use streamweave::graph::Graph;
use streamweave::nodes::variable_node::VariableNode;
let mut graph = Graph::new("g".to_string());
graph.add_node("sink".to_string(), Box::new(VariableNode::new("sink".to_string()))).unwrap();
graph.expose_output_port("sink", "out", "output").unwrap();
Source

pub fn connect_input_channel( &mut self, external_port: &str, receiver: Receiver<Arc<dyn Any + Send + Sync>>, ) -> Result<(), String>

Connects an input channel to an exposed input port.

This allows external data to be sent to the graph through the specified port.

§Arguments
  • external_port - The name of the exposed input port
  • receiver - The channel receiver for input data
§Returns

Ok(()) if the connection was successful, or an error if the port is not exposed.

§Example
use streamweave::graph::Graph;
use streamweave::nodes::variable_node::VariableNode;
use tokio::sync::mpsc;
use std::sync::Arc;
let mut graph = Graph::new("g".to_string());
graph.add_node("n".to_string(), Box::new(VariableNode::new("n".to_string()))).unwrap();
graph.expose_input_port("n", "value", "configuration").unwrap();
let (tx, rx) = mpsc::channel(10);
graph.connect_input_channel("configuration", rx).unwrap();
Source

pub fn connect_output_channel( &mut self, external_port: &str, sender: Sender<Arc<dyn Any + Send + Sync>>, ) -> Result<(), String>

Connects an output channel to an exposed output port.

This allows graph output to be sent to external consumers through the specified port.

§Arguments
  • external_port - The name of the exposed output port
  • sender - The channel sender for output data
§Returns

Ok(()) if the connection was successful, or an error if the port is not exposed.

§Example
use streamweave::graph::Graph;
use streamweave::nodes::variable_node::VariableNode;
use tokio::sync::mpsc;
use std::sync::Arc;
let mut graph = Graph::new("g".to_string());
graph.add_node("n".to_string(), Box::new(VariableNode::new("n".to_string()))).unwrap();
graph.expose_output_port("n", "out", "output").unwrap();
let (tx, rx) = mpsc::channel(10);
graph.connect_output_channel("output", tx).unwrap();
Source

pub fn name(&self) -> &str

Returns the name of the graph.

§Returns

A string slice containing the graph’s name.

Source

pub fn set_name(&mut self, name: &str)

Sets the name of the graph.

§Arguments
  • name - The new name for the graph
Source

pub fn get_nodes(&self) -> MutexGuard<'_, HashMap<String, Box<dyn Node>>>

Returns a guard over the nodes map. Use .values() to iterate node references.

§Returns

A mutex guard that derefs to the nodes HashMap.

Source

pub fn find_node_by_name( &self, name: &str, ) -> Option<MutexGuard<'_, HashMap<String, Box<dyn Node>>>>

Returns a guard over the nodes map if the given node exists. Use .get(name) to get the node.

§Arguments
  • name - The name of the node to find
§Returns

Some(guard) if the node exists, where the guard derefs to the nodes HashMap; None if no node with the given name exists.

Source

pub fn node_count(&self) -> usize

Returns the number of nodes in the graph.

§Returns

The number of nodes in the graph.

Source

pub fn edge_count(&self) -> usize

Returns the number of edges in the graph.

§Returns

The number of edges in the graph.

Source

pub fn has_node(&self, name: &str) -> bool

Checks if a node with the given name exists in the graph.

§Arguments
  • name - The name of the node to check
§Returns

true if a node with the given name exists, false otherwise.

Source

pub fn has_edge(&self, source_node: &str, target_node: &str) -> bool

Checks if an edge exists between two nodes, regardless of which ports it connects.

§Arguments
  • source_node - The name of the source node
  • target_node - The name of the target node
§Returns

true if an edge exists between the nodes, false otherwise.

Source

pub fn add_node( &mut self, name: String, node: Box<dyn Node>, ) -> Result<(), String>

Adds a node to the graph.

§Arguments
  • name - The name to assign to the node (should match node.name())
  • node - The node to add to the graph
§Returns

Ok(()) if the node was added successfully, or an error if a node with the same name already exists.

§Errors

Returns an error string if a node with the given name already exists in the graph.

Source

pub fn remove_node(&mut self, name: &str) -> Result<(), String>

Removes a node from the graph.

§Arguments
  • name - The name of the node to remove
§Returns

Ok(()) if the node was removed successfully, or an error if the node doesn’t exist or has connected edges.

§Errors

Returns an error string if the node doesn’t exist or cannot be removed (e.g., it has connected edges).

Source

pub fn get_edges(&self) -> Vec<&Edge>

Returns all edges in the graph.

§Returns

A vector of references to all edges in the graph.

Source

pub fn find_edge_by_nodes_and_ports( &self, source_node: &str, source_port: &str, target_node: &str, target_port: &str, ) -> Option<&Edge>

Finds the edge that connects the given source node and port to the given target node and port.

§Arguments
  • source_node - The name of the source node
  • source_port - The name of the source output port
  • target_node - The name of the target node
  • target_port - The name of the target input port
§Returns

Some(&Edge) if an edge matching the given parameters exists, None otherwise.

Source

pub fn add_edge(&mut self, edge: Edge) -> Result<(), String>

Adds an edge to the graph.

§Arguments
  • edge - The edge to add to the graph
§Returns

Ok(()) if the edge was added successfully, or an error if the edge is invalid (e.g., nodes don’t exist or ports don’t exist).

§Errors

Returns an error string if:

  • The source or target node doesn’t exist
  • The source or target port doesn’t exist on the respective node
  • The edge would create a duplicate connection
Source

pub fn remove_edge( &mut self, source_node: &str, source_port: &str, target_node: &str, target_port: &str, ) -> Result<(), String>

Removes an edge from the graph.

§Arguments
  • source_node - The name of the source node
  • source_port - The name of the source output port
  • target_node - The name of the target node
  • target_port - The name of the target input port
§Returns

Ok(()) if the edge was removed successfully, or an error if the edge doesn’t exist.

§Errors

Returns an error string if no edge matching the given parameters exists.

Source

pub async fn execute(&mut self) -> Result<(), GraphExecutionError>

Executes the graph by connecting streams between nodes.

This method:

  1. Routes external input streams to exposed input ports
  2. Performs topological sort to determine execution order
  3. For each node, collects input streams from connected upstream nodes
  4. Calls node.execute(inputs) which returns output streams
  5. Routes exposed output ports to external output senders
  6. Connects output streams to downstream nodes’ input streams
  7. Drives all streams to completion

Channels are used internally for backpressure, but are never exposed to nodes. Nodes only see and work with streams. External I/O is handled through exposed ports.

§Returns

Ok(()) if execution was started successfully, or an error if:

  • Nodes have invalid port configurations
  • Streams cannot be created or connected
  • Tasks cannot be spawned
  • External I/O connections are invalid
§Errors

Returns GraphExecutionError if execution cannot be started.

§Example
use streamweave::graph::Graph;
use streamweave::nodes::variable_node::VariableNode;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
  let mut graph = Graph::new("g".to_string());
  graph.add_node("map".to_string(), Box::new(VariableNode::new("map".to_string())))?;
  graph.expose_input_port("map", "value", "configuration")?;
  graph.expose_output_port("map", "out", "output")?;
  let (config_tx, config_rx) = mpsc::channel(1);
  let (output_tx, _output_rx) = mpsc::channel(10);
  graph.connect_input_channel("configuration", config_rx)?;
  graph.connect_output_channel("output", output_tx)?;
  graph.execute().await.unwrap();
  graph.wait_for_completion().await.unwrap();
  Ok(())
}
Source

pub fn start(&self)

Starts graph execution.

Sets the execution state to running. The graph will begin processing when data arrives on input ports (pull-based model).

§Returns

Nothing. This method does not return a value; it only sets the execution state to running.

Source

pub fn pause(&self)

Pauses graph execution.

The graph will stop processing new data but maintains its current state.

§Returns

Nothing. This method does not return a value.

Source

pub fn resume(&self)

Resumes graph execution.

Resumes processing after a pause.

§Returns

Nothing. This method does not return a value.

Source

pub async fn stop(&self) -> Result<(), GraphExecutionError>

Stops graph execution and clears all state.

This method:

  1. Signals all nodes to stop processing
  2. Clears all execution handles
  3. Resets execution state to None
  4. All data flowing through the graph is discarded
§Returns

Ok(()) if execution was stopped successfully, or an error if stopping failed.

§Errors

Returns GraphExecutionError if execution cannot be stopped gracefully.

Source

pub async fn wait_for_completion(&mut self) -> Result<(), GraphExecutionError>

Waits for all nodes in the graph to complete execution.

This method waits asynchronously until all node tasks have finished; the returned future must be awaited. Use this after calling execute() to wait for the graph to finish processing. When using the dataflow execution model, nodes are restored into the graph after all tasks complete.

§Returns

Ok(()) if all nodes completed successfully, or an error if any node failed.

§Errors

Returns GraphExecutionError if any node execution failed or if waiting timed out.

Trait Implementations§

Source§

impl Node for Graph

Source§

fn name(&self) -> &str

Returns the name of the node.
Source§

fn set_name(&mut self, name: &str)

Sets the name of the node.
Source§

fn input_port_names(&self) -> &[String]

Returns the names of all input ports. Read more
Source§

fn output_port_names(&self) -> &[String]

Returns the names of all output ports. Read more
Source§

fn has_input_port(&self, name: &str) -> bool

Checks if this node has an input port with the given name.
Source§

fn has_output_port(&self, name: &str) -> bool

Checks if this node has an output port with the given name.
Source§

fn execute( &self, inputs: InputStreams, ) -> Pin<Box<dyn Future<Output = Result<OutputStreams, NodeExecutionError>> + Send + '_>>

Executes the node’s logic. Read more

Auto Trait Implementations§

§

impl Freeze for Graph

§

impl !RefUnwindSafe for Graph

§

impl Send for Graph

§

impl Sync for Graph

§

impl Unpin for Graph

§

impl !UnwindSafe for Graph

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.