graph-sp

A high-performance DAG (Directed Acyclic Graph) execution engine with true parallel execution, built in Rust with Python bindings.

Features

  • ⚡ True Parallel Execution: Automatic parallelization of independent nodes (44% faster for fan-out patterns)
  • 🔌 Port-based Architecture: Type-safe data flow between nodes via named ports
  • 🌿 Branching & Nested Graphs: Create isolated subgraphs for experiments and variants
  • 🔀 Merge Operations: Combine outputs from multiple branches with custom merge functions
  • 🔄 Variants & Config Sweeps: Automated parameter variation with Cartesian product support
  • 🐍 Python & Rust APIs: Full feature parity across both languages
  • 🔍 Graph Inspection: Analysis, visualization, and Mermaid diagram generation
  • ✅ Cycle Detection: Built-in DAG validation with detailed error reporting
  • 📊 Rich Data Types: Primitives, collections, JSON, nested objects, and binary data
  • 🎯 Zero-Copy Optimization: Efficient data sharing using Arc

Quick Start

Python

Install from PyPI:

pip install pygraph-sp

Simple example:

import pygraphsp as gs

# Create a graph
graph = gs.Graph()

# Add nodes with Python functions using the simplified API
def data_source(inputs):
    return {"output": [1, 2, 3, 4, 5]}

def multiply_by_2(inputs):
    return {"output": [x * 2 for x in inputs["input"]]}

# Add nodes - function name becomes the node ID by default
graph.add(
    data_source,
    label="Data Source",
    outputs=["output"]
)

graph.add(
    multiply_by_2,
    label="Multiply by 2",
    inputs=["input"],
    outputs=["output"]
)

# Connect nodes using function names
graph.add_edge("data_source", "output", "multiply_by_2", "input")

# Execute with parallel processing
executor = gs.Executor()
result = executor.execute(graph)

print(result.get_output("multiply_by_2", "output"))  # [2, 4, 6, 8, 10]

Rust

Add to your Cargo.toml:

[dependencies]
graph-sp = "0.1"
tokio = { version = "1", features = ["full"] }

Simple example:

use graph_sp::{Graph, Node, NodeConfig, Port, PortData, Edge, Executor};
use std::collections::HashMap;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut graph = Graph::new();

    // Add source node
    let source = NodeConfig::new(
        "source", "Data Source",
        vec![],
        vec![Port::simple("output")],
        Arc::new(|_: &HashMap<String, PortData>| {
            Ok(HashMap::from([
                ("output".to_string(), PortData::List(vec![
                    PortData::Int(1), PortData::Int(2), PortData::Int(3)
                ]))
            ]))
        }),
    );

    graph.add(Node::new(source))?;

    // Execute
    let executor = Executor::new();
    let result = executor.execute(&mut graph).await?;

    Ok(())
}

Installation

Python

From PyPI (recommended):

pip install pygraph-sp

From source:

# Clone the repository
git clone https://github.com/briday1/graph-sp.git
cd graph-sp

# Install maturin
pip install maturin

# Build and install
maturin develop --release --features python

Rust

From crates.io:

[dependencies]
graph-sp = "0.1"

From source:

git clone https://github.com/briday1/graph-sp.git
cd graph-sp
cargo build --release

Core Concepts

Ports

Nodes communicate through typed ports with separate broadcast and implementation names:

# Python - Simple ports (broadcast_name == impl_name)
graph.add(
    my_function,
    inputs=["data", "config"],
    outputs=["result"]
)

# Advanced: Separate broadcast and implementation names
# Useful for connecting nodes with different parameter names
graph.add(
    process_data,
    inputs=[("external_data", "internal_param")],  # (broadcast, impl)
    outputs=[("result", "output_value")]
)

# Most explicit: Use Port objects
input_port = gs.Port("broadcast_name", "impl_name", "Display Name")
graph.add(my_function, inputs=[input_port], outputs=[...])

// Rust - Simple port (both names the same)
let port = Port::simple("data");

// Separate broadcast and implementation names
let port = Port::new("external_data", "internal_param");

Port Name Types:

  • broadcast_name: External name used for connecting nodes via edges
  • impl_name: Internal parameter name used in function signatures
  • display_name: Human-readable name for visualizations (defaults to broadcast_name)
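
Concretely, separate names let a reusable function keep its own parameter name while the graph wires it under a pipeline-specific one. A minimal sketch, assuming (as the list above implies) that the inputs dict handed to a function is keyed by impl names; the node and port names are illustrative:

import pygraphsp as gs

def normalize(inputs):
    # The function always sees its impl name "values",
    # no matter what the port is called on the graph side.
    values = inputs["values"]
    total = sum(values)
    return {"normalized": [v / total for v in values]}

graph = gs.Graph()
graph.add(
    normalize,
    inputs=[("raw_measurements", "values")],       # (broadcast, impl)
    outputs=[("normalized_output", "normalized")]
)

# Edges always refer to broadcast names, e.g.:
# graph.add_edge("sensor", "readings", "normalize", "raw_measurements")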

Data Types

Support for multiple data types:

  • Primitives: Int, Float, String, Bool, None
  • Collections: List, Map (nested HashMap)
  • Structured: JSON (arbitrary structures)
  • Binary: Bytes (raw binary data)

# Python - nested objects work seamlessly
user = {
    "name": "Alice",
    "age": 30,
    "address": {
        "city": "NYC",
        "zip": "10001"
    },
    "hobbies": ["reading", "coding"]
}
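
Such values pass through ports like any other output. Continuing from the user object above with the simplified API from the Quick Start (node names are illustrative, and this assumes Map values convert back to plain dicts on the Python side):

import pygraphsp as gs

def load_user(inputs):
    return {"output": user}  # nested dicts/lists become Map/List variants

def extract_city(inputs):
    return {"output": inputs["input"]["address"]["city"]}

graph = gs.Graph()
graph.add(load_user, outputs=["output"])
graph.add(extract_city, inputs=["input"], outputs=["output"])
graph.add_edge("load_user", "output", "extract_city", "input")

result = gs.Executor().execute(graph)
print(result.get_output("extract_city", "output"))  # "NYC"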

Parallel Execution

The engine automatically parallelizes independent branches:

# This creates a fan-out pattern with 3 parallel branches
#        source
#       /   |   \
#   slow  fast  medium    <- These run in parallel!
#       \   |   /
#        merger

# Execution time: ~500ms (parallel) vs ~900ms (sequential)
# 44% speedup achieved automatically!
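
A sketch of that fan-out pattern with the Python API, using sleeps to stand in for real work (node names and timings are illustrative):

import time
import pygraphsp as gs

def source(inputs):
    return {"output": list(range(10))}

def slow(inputs):
    time.sleep(0.5)
    return {"output": sum(inputs["input"])}

def fast(inputs):
    time.sleep(0.1)
    return {"output": max(inputs["input"])}

def medium(inputs):
    time.sleep(0.3)
    return {"output": min(inputs["input"])}

def merger(inputs):
    return {"output": [inputs["a"], inputs["b"], inputs["c"]]}

graph = gs.Graph()
graph.add(source, outputs=["output"])
for fn in (slow, fast, medium):
    graph.add(fn, inputs=["input"], outputs=["output"])
graph.add(merger, inputs=["a", "b", "c"], outputs=["output"])

for branch in ("slow", "fast", "medium"):
    graph.add_edge("source", "output", branch, "input")
graph.add_edge("slow", "output", "merger", "a")
graph.add_edge("fast", "output", "merger", "b")
graph.add_edge("medium", "output", "merger", "c")

start = time.perf_counter()
result = gs.Executor().execute(graph)
print(f"took {time.perf_counter() - start:.2f}s")  # ~0.5s, bounded by the slowest branch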

Graph Inspection

# Analyze graph structure
analysis = graph.analyze()
print(f"Nodes: {analysis.node_count}")
print(f"Depth: {analysis.depth}")
print(f"Width: {analysis.width}")  # Parallelization potential

# Generate Mermaid diagram
mermaid = graph.to_mermaid()
print(mermaid)  # GitHub-compatible markdown

Branching & Variants (New in v0.2.0)

Branches allow you to create isolated subgraphs:

# Create experimental branches
graph.create_branch("experiment_a")
graph.create_branch("experiment_b")

# Check branches
print(graph.branch_names())  # ["experiment_a", "experiment_b"]
print(graph.has_branch("experiment_a"))  # True

Variants enable config sweeps and hyperparameter tuning:

# Python - Create variants with different learning rates
def learning_rate_gen(i):
    rates = [0.001, 0.01, 0.1]
    return gs.PortData(rates[i])

branches = graph.create_variants(
    name_prefix="lr",
    count=3,
    param_name="learning_rate",
    variant_function=learning_rate_gen,
    parallel=True  # Enable parallel execution
)
# Creates: lr_0 (0.001), lr_1 (0.01), lr_2 (0.1)

// Rust - Same functionality
use graph_sp::core::{VariantConfig, VariantFunction};

let variant_fn: VariantFunction = Arc::new(|i: usize| {
    PortData::Float((i as f64 + 1.0) * 0.01)
});

let config = VariantConfig::new("lr", 3, "learning_rate", variant_fn);
let branches = graph.create_variants(config)?;

Merge combines outputs from multiple branches:

# Python - Merge outputs from variant branches
graph.merge_branches(
    node_id="select_best",
    branches=variant_branches,
    port="accuracy"  # Merges to "accuracies" (pluralized)
)

# Custom merge function (e.g., average)
def compute_average(values):
    return gs.PortData(sum(values) / len(values))

graph.merge_branches(
    node_id="summarize",
    branches=variant_branches,
    port="score",
    merge_function=compute_average
)

// Rust - Same functionality
use graph_sp::core::MergeConfig;

let merge_config = MergeConfig::new(
    vec!["branch_a".to_string(), "branch_b".to_string()],
    "result".to_string()
);
graph.merge("merge_node", merge_config)?;

// Custom merge function
let max_fn = Arc::new(|inputs: Vec<&PortData>| -> Result<PortData> {
    let max = inputs.iter()
        .filter_map(|d| if let PortData::Int(v) = d { Some(*v) } else { None })
        .max()
        .unwrap_or(0);
    Ok(PortData::Int(max))
});

let config = MergeConfig::new(branches, "score".to_string())
    .with_merge_fn(max_fn);

Nested Variants create Cartesian products:

// 2 learning rates × 3 batch sizes = 6 total configurations
let lr_config = VariantConfig::new("lr", 2, "learning_rate", lr_fn);
let lr_branches = graph.create_variants(lr_config)?;

for lr_branch in &lr_branches {
    let branch = graph.get_branch_mut(lr_branch)?;
    let batch_config = VariantConfig::new("batch", 3, "batch_size", batch_fn);
    branch.create_variants(batch_config)?;
}
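
An equivalent Python sketch; this assumes the bindings expose a get_branch accessor mirroring Rust's get_branch_mut (that accessor name is an assumption; see variants_demo.py for the supported surface):

# 2 learning rates x 3 batch sizes = 6 total configurations
lr_branches = graph.create_variants(
    name_prefix="lr",
    count=2,
    param_name="learning_rate",
    variant_function=lambda i: gs.PortData([0.001, 0.01][i]),
)

for name in lr_branches:
    branch = graph.get_branch(name)  # assumed accessor, mirrors get_branch_mut
    branch.create_variants(
        name_prefix="batch",
        count=3,
        param_name="batch_size",
        variant_function=lambda i: gs.PortData([16, 32, 64][i]),
    )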

Examples

Python Examples

Located in python_examples/:

  • simple_pipeline.py: Basic 3-node pipeline with implicit port mapping
  • parallel_execution.py: Fan-out/fan-in pattern with 3 parallel branches
  • variants_demo.py: Hyperparameter sweeps, custom merge, nested variants
  • port_mapping_demo.py: ⭐ Explicit port mapping (broadcast vs implementation names)
  • complex_objects.py: Nested objects, JSON, and lists
  • branching_example.py: Branch creation and management

Note: Most examples use implicit port mapping where port names and function parameters match.
See port_mapping_demo.py for the one example showing explicit broadcast/implementation name separation.

Run an example:

cd python_examples
python simple_pipeline.py
python variants_demo.py

Rust Examples

Located in examples/:

  • simple_pipeline.rs: 4-node data processing pipeline with implicit ports
  • parallel_execution.rs: Fan-out/fan-in pattern with performance analysis
  • branching_and_variants.rs: Comprehensive demo of branches, merge, and variants
  • all_features_demo.rs: ⭐ Shows both implicit and explicit port mapping
  • complex_objects.rs: All PortData types with nested structures

Note: Most examples use Port::simple("name") for implicit mapping.
See all_features_demo.rs for examples using Port::new("broadcast", "impl") for explicit mapping.

Run an example:

cargo run --example simple_pipeline
cargo run --example parallel_execution
cargo run --example branching_and_variants

Performance

Measured with the 3-branch parallel execution example:

Metric    Sequential   Parallel   Improvement
Rust      900ms        500ms      44% faster
Python    900ms        502ms      44% faster

The executor identifies dependency levels and executes all independent nodes concurrently using tokio::task::spawn_blocking.
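
Conceptually, this is a layered Kahn's algorithm: a node joins a wave once all of its dependencies have completed, and every node within a wave runs concurrently. A standalone sketch of the grouping (illustrative, not the engine's actual code):

from collections import defaultdict

def dependency_levels(nodes, edges):
    # Group nodes into waves; all nodes in a wave can run concurrently.
    indegree = {n: 0 for n in nodes}
    children = defaultdict(list)
    for src, dst in edges:
        indegree[dst] += 1
        children[src].append(dst)

    level = [n for n in nodes if indegree[n] == 0]
    levels = []
    while level:
        levels.append(level)
        next_level = []
        for n in level:
            for child in children[n]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    next_level.append(child)
        level = next_level
    return levels

nodes = ["source", "slow", "fast", "medium", "merger"]
edges = [("source", "slow"), ("source", "fast"), ("source", "medium"),
         ("slow", "merger"), ("fast", "merger"), ("medium", "merger")]
print(dependency_levels(nodes, edges))
# [['source'], ['slow', 'fast', 'medium'], ['merger']]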

Architecture

Core Components

  1. Data Model (src/core/data.rs)

    • PortData: Enum for all supported types
    • GraphData: HashMap for port-to-port storage
    • Support for nested structures via recursive variants
  2. Graph (src/core/graph.rs)

    • petgraph-backed DAG representation
    • Topological sorting and cycle detection
    • Type-safe port connections
  3. Executor (src/executor/mod.rs)

    • Dependency-level grouping
    • Concurrent execution with tokio::task::spawn_blocking
    • Automatic concurrency management
  4. Inspector (src/inspector/mod.rs)

    • Graph statistics (depth, width, sources, sinks)
    • Mermaid diagram generation
    • Optimization suggestions
  5. Python Bindings (src/python/mod.rs)

    • PyO3-based wrappers
    • GIL-aware parallel execution
    • Automatic type conversion
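
The cycle detection mentioned above is part of graph validation. A sketch of what rejecting a cycle could look like from Python; where the error surfaces and its exact type are assumptions here:

import pygraphsp as gs

def step_a(inputs):
    return {"output": inputs["input"]}

def step_b(inputs):
    return {"output": inputs["input"]}

graph = gs.Graph()
graph.add(step_a, inputs=["input"], outputs=["output"])
graph.add(step_b, inputs=["input"], outputs=["output"])
graph.add_edge("step_a", "output", "step_b", "input")
graph.add_edge("step_b", "output", "step_a", "input")  # closes a cycle

try:
    gs.Executor().execute(graph)
except Exception as err:  # exact exception type is an assumption
    print(f"rejected: {err}")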

Building & Testing

Rust

# Run tests
cargo test

# Run with all features
cargo test --all-features

# Build release
cargo build --release

# Run examples
cargo run --example simple_pipeline

Python

# Install development dependencies
pip install maturin pytest

# Build Python bindings
maturin develop --features python

# Run Python examples
python python_examples/simple_pipeline.py

Build for Multiple Platforms

# Linux (using Docker for manylinux compatibility)
docker run --rm -v $(pwd):/io ghcr.io/pyo3/maturin build --release --features python

# macOS
maturin build --release --features python

# Windows
maturin build --release --features python

Documentation

  • Python API: See python_examples/README.md for detailed Python usage
  • Rust API: Run cargo doc --open for full API documentation
  • Port Data Types: See docs/PORT_DATA_TYPES.md for supported types

Publishing

PyPI

Wheels are automatically built and published to PyPI on version tags:

git tag v0.1.0
git push origin v0.1.0

The release workflow builds wheels for:

  • Linux (manylinux)
  • macOS (Intel & ARM)
  • Windows

Crates.io

cargo publish

Contributing

Contributions welcome! Please:

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Ensure all tests pass (cargo test --all-features)
  5. Submit a pull request

License

MIT License - see LICENSE file for details

Roadmap

  • [x] True parallel execution
  • [x] Python bindings with PyPI distribution
  • [x] Mermaid diagram generation
  • [x] Comprehensive examples
  • [ ] Distributed execution support
  • [ ] Graph serialization/deserialization
  • [ ] WebAssembly support
  • [ ] Real-time monitoring dashboard
  • [ ] Advanced optimization algorithms