graph-flow 0.3.0

A high-performance, type-safe framework for building multi-agent workflow systems in Rust.

Features

  • Type-Safe Workflows: Compile-time guarantees for workflow correctness
  • Flexible Execution: Step-by-step, batch, or mixed execution modes
  • Built-in Persistence: PostgreSQL and in-memory storage backends
  • LLM Integration: Optional integration with Rig for AI agent capabilities
  • Human-in-the-Loop: Natural workflow interruption and resumption
  • Async/Await Native: Built from the ground up for async Rust

Quick Start

Add to your Cargo.toml:

[dependencies]
graph-flow = "0.3"

# Or, with LLM integration enabled:
graph-flow = { version = "0.3", features = ["rig"] }

Basic Example

use graph_flow::{Context, Task, TaskResult, NextAction, GraphBuilder, FlowRunner, InMemorySessionStorage, Session};
use async_trait::async_trait;
use std::sync::Arc;

// Define a simple greeting task
struct HelloTask;

#[async_trait]
impl Task for HelloTask {
    fn id(&self) -> &str {
        "hello_task"
    }

    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        let name: String = context.get("name").await.unwrap_or("World".to_string());
        let greeting = format!("Hello, {}! How are you today?", name);
        
        // Store the greeting in context for other tasks
        context.set("greeting", greeting.clone()).await;
        
        Ok(TaskResult::new(Some(greeting), NextAction::Continue))
    }
}

#[tokio::main]
async fn main() -> graph_flow::Result<()> {
    // Build the graph
    let hello_task = Arc::new(HelloTask);
    let graph = Arc::new(
        GraphBuilder::new("greeting_workflow")
            .add_task(hello_task.clone())
            .build()
    );

    // Set up session storage and runner
    let session_storage = Arc::new(InMemorySessionStorage::new());
    let flow_runner = FlowRunner::new(graph.clone(), session_storage.clone());
    
    // Create a session with initial data
    let session = Session::new_from_task("user_123".to_string(), hello_task.id());
    session.context.set("name", "Alice".to_string()).await;
    session_storage.save(session).await?;
    
    // Execute the workflow
    let result = flow_runner.run("user_123").await?;
    println!("Response: {:?}", result.response);
    
    Ok(())
}

Core API Reference

Tasks - The Building Blocks

Tasks implement the core Task trait and define the units of work in your workflow:

Basic Task Implementation

use graph_flow::{Task, TaskResult, NextAction, Context};
use async_trait::async_trait;

struct DataProcessingTask {
    name: String,
}

#[async_trait]
impl Task for DataProcessingTask {
    fn id(&self) -> &str {
        &self.name
    }

    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        // Get input data from context
        let input: String = context.get("user_input").await.unwrap_or_default();
        
        // Process the data
        let processed = format!("Processed: {}", input.to_uppercase());
        
        // Store result for next task
        context.set("processed_data", processed.clone()).await;
        
        // Return result with next action
        Ok(TaskResult::new(
            Some(format!("Data processed: {}", processed)),
            NextAction::Continue
        ))
    }
}

Task with Status Messages

struct ValidationTask;

#[async_trait]
impl Task for ValidationTask {
    fn id(&self) -> &str {
        "validator"
    }

    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        let data: Option<String> = context.get("processed_data").await;
        
        match data {
            Some(data) if data.len() > 10 => {
                Ok(TaskResult::new_with_status(
                    Some("Data validation passed".to_string()),
                    NextAction::Continue,
                    Some("Data meets minimum length requirement".to_string())
                ))
            }
            Some(_) => {
                Ok(TaskResult::new_with_status(
                    Some("Data validation failed - too short".to_string()),
                    NextAction::WaitForInput,
                    Some("Waiting for user to provide more data".to_string())
                ))
            }
            None => {
                Ok(TaskResult::new_with_status(
                    Some("No data found to validate".to_string()),
                    NextAction::GoTo("data_input".to_string()),
                    Some("Redirecting to data input task".to_string())
                ))
            }
        }
    }
}

NextAction - Controlling Flow

The NextAction enum controls how your workflow progresses:

// Continue to next task, but pause execution (step-by-step mode)
Ok(TaskResult::new(Some("Step completed".to_string()), NextAction::Continue))

// Continue and execute the next task immediately (continuous mode)
Ok(TaskResult::new(Some("Moving forward".to_string()), NextAction::ContinueAndExecute))

// Wait for user input before continuing
Ok(TaskResult::new(Some("Need more info".to_string()), NextAction::WaitForInput))

// Jump to a specific task
Ok(TaskResult::new(Some("Redirecting".to_string()), NextAction::GoTo("specific_task".to_string())))

// Go back to the previous task
Ok(TaskResult::new(Some("Going back".to_string()), NextAction::GoBack))

// End the workflow
Ok(TaskResult::new(Some("All done!".to_string()), NextAction::End))

// Convenience methods
TaskResult::move_to_next()        // NextAction::Continue
TaskResult::move_to_next_direct() // NextAction::ContinueAndExecute

Context - State Management

The Context provides thread-safe state sharing across tasks:

Basic Context Operations

// Setting values
context.set("key", "value").await;
context.set("number", 42).await;
context.set("complex_data", MyStruct { field: "value" }).await;

// Getting values
let value: Option<String> = context.get("key").await;
let number: Option<i32> = context.get("number").await;
let complex: Option<MyStruct> = context.get("complex_data").await;

// Synchronous operations (useful in edge conditions)
context.set_sync("sync_key", "sync_value");
let sync_value: Option<String> = context.get_sync("sync_key");

// Removing values
let removed: Option<serde_json::Value> = context.remove("key").await;

// Clearing all data (preserves chat history)
context.clear().await;

Chat History Management

// Adding messages
context.add_user_message("Hello, assistant!".to_string()).await;
context.add_assistant_message("Hello! How can I help you?".to_string()).await;
context.add_system_message("System: Session started".to_string()).await;

// Getting chat history
let history = context.get_chat_history().await;
let all_messages = context.get_all_messages().await;
let last_5 = context.get_last_messages(5).await;

// Chat history info
let count = context.chat_history_len().await;
let is_empty = context.is_chat_history_empty().await;

// Clear chat history
context.clear_chat_history().await;

// Context with message limits
let context = Context::with_max_chat_messages(100);

LLM Integration (with rig feature)

#[cfg(feature = "rig")]
{
    // Get messages in rig format for LLM calls
    let rig_messages = context.get_rig_messages().await;
    let last_10_for_llm = context.get_last_rig_messages(10).await;
    
    // Use with rig's completion API
    // let response = agent.completion(&rig_messages).await?;
}

Graph Building

Create complex workflows using the GraphBuilder:

Linear Workflow

let graph = GraphBuilder::new("linear_workflow")
    .add_task(task1.clone())
    .add_task(task2.clone())
    .add_task(task3.clone())
    .add_edge(task1.id(), task2.id())  // task1 -> task2
    .add_edge(task2.id(), task3.id())  // task2 -> task3
    .build();

Conditional Workflow

let graph = GraphBuilder::new("conditional_workflow")
    .add_task(input_task.clone())
    .add_task(process_a.clone())
    .add_task(process_b.clone())
    .add_task(final_task.clone())
    .add_conditional_edge(
        input_task.id(),
        |ctx| ctx.get_sync::<String>("user_type").unwrap_or_default() == "premium",
        process_a.id(),    // if premium user
        process_b.id(),    // if regular user
    )
    .add_edge(process_a.id(), final_task.id())
    .add_edge(process_b.id(), final_task.id())
    .build();

Complex Branching

let graph = GraphBuilder::new("complex_workflow")
    .add_task(start_task.clone())
    .add_task(validation_task.clone())
    .add_task(processing_task.clone())
    .add_task(error_handler.clone())
    .add_task(success_task.clone())
    .add_task(retry_task.clone())
    // Initial flow
    .add_edge(start_task.id(), validation_task.id())
    // Validation branches
    .add_conditional_edge(
        validation_task.id(),
        |ctx| ctx.get_sync::<bool>("is_valid").unwrap_or(false),
        processing_task.id(),  // valid -> process
        error_handler.id(),    // invalid -> error
    )
    // Processing branches
    .add_conditional_edge(
        processing_task.id(),
        |ctx| ctx.get_sync::<bool>("success").unwrap_or(false),
        success_task.id(),     // success -> done
        retry_task.id(),       // failure -> retry
    )
    // Retry logic
    .add_conditional_edge(
        retry_task.id(),
        |ctx| ctx.get_sync::<i32>("retry_count").unwrap_or(0) < 3,
        validation_task.id(),  // retry -> validate again
        error_handler.id(),    // max retries -> error
    )
    .set_start_task(start_task.id())
    .build();

Execution Patterns

Step-by-Step Execution

Best for interactive applications where you want control between each step:

let flow_runner = FlowRunner::new(graph, session_storage);

loop {
    let result = flow_runner.run(&session_id).await?;
    
    match result.status {
        ExecutionStatus::Completed => {
            println!("Workflow completed: {:?}", result.response);
            break;
        }
        ExecutionStatus::WaitingForInput => {
            println!("Waiting for input: {:?}", result.response);
            // Get user input and update context
            // context.set("user_input", user_input).await;
            continue;
        }
        ExecutionStatus::Paused { next_task_id } => {
            println!("Paused at {}: {:?}", next_task_id, result.response);
            // Optionally do something before next step
            continue;
        }
        ExecutionStatus::Error(e) => {
            eprintln!("Error: {}", e);
            break;
        }
    }
}

Continuous Execution

For tasks that should run automatically until completion:

// Tasks use NextAction::ContinueAndExecute
struct AutoTask;

#[async_trait]
impl Task for AutoTask {
    fn id(&self) -> &str { "auto_task" }
    
    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        // Do work...
        Ok(TaskResult::new(
            Some("Work done".to_string()),
            NextAction::ContinueAndExecute  // Continue automatically
        ))
    }
}

// Single call executes until completion or interruption
let result = flow_runner.run(&session_id).await?;

Mixed Execution

Combine both patterns in the same workflow:

struct InteractiveTask;

#[async_trait]
impl Task for InteractiveTask {
    fn id(&self) -> &str { "interactive" }
    
    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        let needs_input: bool = context.get("needs_user_input").await.unwrap_or(false);
        
        if needs_input {
            Ok(TaskResult::new(
                Some("Please provide input".to_string()),
                NextAction::WaitForInput  // Stop and wait
            ))
        } else {
            Ok(TaskResult::new(
                Some("Processing automatically".to_string()),
                NextAction::ContinueAndExecute  // Continue automatically
            ))
        }
    }
}

Storage Backends

In-Memory Storage (Development)

use graph_flow::InMemorySessionStorage;

let storage = Arc::new(InMemorySessionStorage::new());

// Create and save a session
let session = Session::new_from_task("session_1".to_string(), "start_task");
session.context.set("initial_data", "value").await;
storage.save(session).await?;

// Retrieve and use
let session = storage.get("session_1").await?.unwrap();
let data: String = session.context.get("initial_data").await.unwrap();

PostgreSQL Storage (Production)

use graph_flow::PostgresSessionStorage;

// Connect to the database (database_url is a standard PostgreSQL connection string)
let storage = Arc::new(
    PostgresSessionStorage::connect(&database_url).await?
);

// Works the same as in-memory
let session = Session::new_from_task("session_1".to_string(), "start_task");
storage.save(session).await?;
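
Both backends are used through the same SessionStorage trait, so application code does not have to care which implementation sits behind the Arc. A minimal sketch, assuming the trait is object-safe and that save returns graph_flow::Result<()> as the examples above suggest (the seed_session helper is illustrative):

use graph_flow::{Session, SessionStorage};
use std::sync::Arc;

// Hypothetical helper: seeds a session using any storage backend.
async fn seed_session(
    storage: Arc<dyn SessionStorage>,
    session_id: &str,
    start_task: &str,
) -> graph_flow::Result<()> {
    let session = Session::new_from_task(session_id.to_string(), start_task);
    session.context.set("initial_data", "value").await;
    storage.save(session).await
}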

Advanced Examples

Multi-Agent Conversation System

use graph_flow::*;

struct AgentTask {
    agent_name: String,
    system_prompt: String,
}

#[async_trait]
impl Task for AgentTask {
    fn id(&self) -> &str {
        &self.agent_name
    }

    async fn run(&self, context: Context) -> Result<TaskResult> {
        // Get conversation history
        let messages = context.get_all_messages().await;
        
        // Add the system prompt when the conversation is just starting
        if messages.is_empty() {
            context.add_system_message(self.system_prompt.clone()).await;
        }
        
        // Get latest user message
        let user_input: Option<String> = context.get("user_input").await;
        
        if let Some(input) = user_input {
            context.add_user_message(input).await;
            
            // Here you would integrate with your LLM
            let response = format!("[{}] Processed: {}", self.agent_name, "response");
            context.add_assistant_message(response.clone()).await;
            
            // Store for next agent or user
            context.set("last_agent_response", response.clone()).await;
            
            Ok(TaskResult::new(Some(response), NextAction::Continue))
        } else {
            Ok(TaskResult::new(
                Some("Waiting for user input".to_string()),
                NextAction::WaitForInput
            ))
        }
    }
}

// Build multi-agent workflow
let analyst = Arc::new(AgentTask {
    agent_name: "analyst".to_string(),
    system_prompt: "You are a data analyst.".to_string(),
});

let reviewer = Arc::new(AgentTask {
    agent_name: "reviewer".to_string(),
    system_prompt: "You review and critique analysis.".to_string(),
});

let graph = GraphBuilder::new("multi_agent_chat")
    .add_task(analyst.clone())
    .add_task(reviewer.clone())
    .add_conditional_edge(
        analyst.id(),
        |ctx| ctx.get_sync::<bool>("needs_review").unwrap_or(true),
        reviewer.id(),
        analyst.id(), // Loop back for more analysis
    )
    .build();

Error Handling and Recovery

struct ResilientTask {
    max_retries: usize,
}

#[async_trait]
impl Task for ResilientTask {
    fn id(&self) -> &str {
        "resilient_task"
    }

    async fn run(&self, context: Context) -> Result<TaskResult> {
        let retry_count: usize = context.get("retry_count").await.unwrap_or(0);
        
        // Simulate work that might fail
        let success = retry_count > 2; // Succeed after 3 attempts
        
        if success {
            context.set("retry_count", 0).await; // Reset for next time
            Ok(TaskResult::new(
                Some("Task completed successfully".to_string()),
                NextAction::Continue
            ))
        } else if retry_count < self.max_retries {
            context.set("retry_count", retry_count + 1).await;
            Ok(TaskResult::new_with_status(
                Some(format!("Attempt {} failed, retrying...", retry_count + 1)),
                NextAction::GoTo("resilient_task".to_string()), // Retry self
                Some(format!("Retry {}/{}", retry_count + 1, self.max_retries))
            ))
        } else {
            Ok(TaskResult::new(
                Some("Task failed after maximum retries".to_string()),
                NextAction::GoTo("error_handler".to_string())
            ))
        }
    }
}

Dynamic Task Selection

struct RouterTask;

#[async_trait]
impl Task for RouterTask {
    fn id(&self) -> &str {
        "router"
    }

    async fn run(&self, context: Context) -> Result<TaskResult> {
        let user_type: String = context.get("user_type").await.unwrap_or_default();
        let urgency: String = context.get("urgency").await.unwrap_or_default();
        
        let next_task = match (user_type.as_str(), urgency.as_str()) {
            ("premium", "high") => "priority_handler",
            ("premium", _) => "premium_handler",
            (_, "high") => "urgent_handler",
            _ => "standard_handler",
        };
        
        Ok(TaskResult::new(
            Some(format!("Routing to {}", next_task)),
            NextAction::GoTo(next_task.to_string())
        ))
    }
}
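
NextAction::GoTo resolves targets by task id, so each handler the router names must be registered with the graph. A hedged wiring sketch, assuming the handler task types exist (they are not defined above) and that direct jumps do not require explicit edges:

let router = Arc::new(RouterTask);
// Hypothetical handlers whose id() values match the strings RouterTask returns.
let priority_handler = Arc::new(PriorityHandlerTask);
let premium_handler = Arc::new(PremiumHandlerTask);
let urgent_handler = Arc::new(UrgentHandlerTask);
let standard_handler = Arc::new(StandardHandlerTask);

let graph = GraphBuilder::new("routing_workflow")
    .add_task(router.clone())
    .add_task(priority_handler.clone())
    .add_task(premium_handler.clone())
    .add_task(urgent_handler.clone())
    .add_task(standard_handler.clone())
    .build();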

Performance and Best Practices

Efficient Context Usage

// ✅ Good: group related context writes together in one task
context.set("key1", value1).await;
context.set("key2", value2).await;
context.set("key3", value3).await;

// ✅ Good: Use sync methods in edge conditions  
.add_conditional_edge(
    "task1",
    |ctx| ctx.get_sync::<bool>("condition").unwrap_or(false),
    "task2",
    "task3"
)

// ✅ Good: Limit chat history size for long conversations
let context = Context::with_max_chat_messages(100);

Memory Management

// ✅ Good: Reuse Arc references
let shared_task = Arc::new(MyTask::new());
let graph = GraphBuilder::new("workflow")
    .add_task(shared_task.clone())  // Clone the Arc, not the task
    .build();

// ✅ Good: Clear unused context data
context.remove("large_temporary_data").await;

Error Handling

// ✅ Good: Proper error propagation
async fn run(&self, context: Context) -> Result<TaskResult> {
    let data: String = context.get("required_data").await
        .ok_or_else(|| GraphError::TaskExecutionFailed(
            "Missing required data".to_string()
        ))?;
    
    // Process data...
    Ok(TaskResult::new(Some("Success".to_string()), NextAction::Continue))
}

Features

Default Features

The crate works out of the box with basic workflow capabilities.

rig Feature

Enables LLM integration through the Rig crate:

[dependencies]
graph-flow = { version = "0.3", features = ["rig"] }

With the feature enabled, chat history can be retrieved in rig's message format:

#[cfg(feature = "rig")]
{
    // Get messages formatted for LLM consumption
    let messages = context.get_rig_messages().await;
    let recent = context.get_last_rig_messages(10).await;
}

Testing Your Workflows

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_workflow() {
        let task = Arc::new(MyTask::new());
        let graph = Arc::new(
            GraphBuilder::new("test")
                .add_task(task.clone())
                .build()
        );
        
        let storage = Arc::new(InMemorySessionStorage::new());
        let runner = FlowRunner::new(graph, storage.clone());
        
        // Create test session
        let session = Session::new_from_task("test_session".to_string(), task.id());
        session.context.set("test_input", "test_value").await;
        storage.save(session).await.unwrap();
        
        // Execute and verify
        let result = runner.run("test_session").await.unwrap();
        assert!(result.response.is_some());
        
        // Check context was updated
        let session = storage.get("test_session").await.unwrap().unwrap();
        let output: String = session.context.get("expected_output").await.unwrap();
        assert_eq!(output, "expected_value");
    }
}

Migration from 0.1.x

  • Context::get_rig_messages() replaces manual message conversion
  • TaskResult::new_with_status() adds debugging support
  • FlowRunner provides simplified session management (see the sketch below)
  • PostgreSQL storage is now more robust with connection pooling
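
For reference, a hedged sketch of the FlowRunner pattern mentioned above, using only calls shown earlier in this README (the session id and start task name are placeholders):

let storage = Arc::new(InMemorySessionStorage::new());
let runner = FlowRunner::new(graph.clone(), storage.clone());

// The session must exist in storage before the first run call.
let session = Session::new_from_task("session_42".to_string(), "hello_task");
storage.save(session).await?;

// One call loads the session, executes from its current task, and saves the result.
let result = runner.run("session_42").await?;
println!("{:?}", result.response);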

Project Structure

This section describes the purpose and contents of each file in the graph-flow crate:

Source Files (src/)

lib.rs

The main library entry point that:

  • Defines the crate's public API and exports commonly used types
  • Contains comprehensive module-level documentation with examples
  • Provides a complete Quick Start guide demonstrating the basic workflow
  • Includes integration tests for graph execution and storage functionality

Public re-exports:

  • Context, ChatHistory, MessageRole, SerializableMessage
  • GraphError, Result
  • ExecutionResult, ExecutionStatus, Graph, GraphBuilder
  • FlowRunner
  • GraphStorage, InMemoryGraphStorage, InMemorySessionStorage, Session, SessionStorage
  • PostgresSessionStorage
  • NextAction, Task, TaskResult

context.rs

Context and state management for workflows:

  • Provides both async and sync accessor methods for different use cases
  • Optional Rig integration for LLM message format conversion (behind rig feature flag)
  • Full serialization/deserialization support for persistence

Public types:

  • Context: Thread-safe state container using Arc<DashMap> for data storage
  • ChatHistory: Specialized container for conversation management with automatic message pruning
  • SerializableMessage: Unified message format with role-based typing (User/Assistant/System)
  • MessageRole: Enum defining message sender types (User, Assistant, System)

error.rs

Centralized error handling:

  • Includes variants for task execution, storage, session management, and validation errors
  • Uses thiserror for ergonomic error handling with descriptive messages

Public types:

  • GraphError: Comprehensive error enum (see the example below) with variants:
    • TaskExecutionFailed(String)
    • GraphNotFound(String)
    • InvalidEdge(String)
    • TaskNotFound(String)
    • ContextError(String)
    • StorageError(String)
    • SessionNotFound(String)
    • Other(anyhow::Error)
  • Result<T>: Type alias for std::result::Result<T, GraphError>
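
A hedged sketch of handling these variants at a call site; the report helper is illustrative, and the catch-all arm relies on GraphError implementing Display via thiserror:

use graph_flow::GraphError;

// Hypothetical reporter for workflow failures.
fn report(err: GraphError) {
    match err {
        GraphError::TaskExecutionFailed(msg) => eprintln!("task failed: {}", msg),
        GraphError::SessionNotFound(id) => eprintln!("unknown session: {}", id),
        GraphError::StorageError(msg) => eprintln!("storage problem: {}", msg),
        other => eprintln!("workflow error: {}", other),
    }
}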

graph.rs

Core graph execution engine:

  • Supports conditional branching, task timeouts, and recursive execution
  • Session-aware execution that preserves state between calls
  • Automatic task validation and orphaned task detection

Public types:

  • Graph: Main workflow orchestrator with task execution and flow control
  • GraphBuilder: Fluent API for constructing workflows with validation
  • Edge: Represents connections between tasks with optional condition functions
  • ExecutionResult: Contains response and execution status
  • ExecutionStatus: Enum indicating workflow state:
    • Paused { next_task_id: String }
    • WaitingForInput
    • Completed
    • Error(String)
  • EdgeCondition: Type alias for condition functions

runner.rs

High-level workflow execution wrapper:

  • Designed for interactive applications and web services
  • Handles session persistence automatically
  • Optimized for step-by-step execution with minimal overhead
  • Extensive documentation with usage patterns for different architectures
  • Error handling with automatic session rollback on failures

Public types:

  • FlowRunner: Convenience wrapper implementing the load → execute → save pattern

storage.rs

Session and graph persistence abstractions:

  • Thread-safe implementations using Arc<DashMap> for concurrent access

Public types:

  • Session: Workflow state container with id, current task, and context
  • SessionStorage trait: Abstract interface for session persistence
  • GraphStorage trait: Abstract interface for graph persistence
  • InMemorySessionStorage: Fast in-memory implementation for development/testing
  • InMemoryGraphStorage: In-memory graph storage for development

storage_postgres.rs

Production-ready PostgreSQL storage backend:

  • Automatic database migration with proper schema creation
  • Connection pooling for high-performance concurrent access
  • JSONB storage for efficient context serialization
  • Optimistic concurrency control with timestamp-based conflict resolution
  • Comprehensive error handling with database-specific error mapping

Public types:

  • PostgresSessionStorage: Robust PostgreSQL implementation of SessionStorage

task.rs

Task definition and execution control:

  • Supports both simple and complex task implementations
  • Automatic task ID generation using type names with override capability
  • Extensive examples showing different task patterns and use cases

Public types:

  • Task trait: Core interface that all workflow steps must implement
  • TaskResult: Return type containing response and flow control information
  • NextAction: Enum controlling workflow progression:
    • Continue - Step-by-step execution
    • ContinueAndExecute - Continuous execution
    • GoTo(String) - Jump to specific task
    • GoBack - Go to previous task
    • End - Terminate workflow
    • WaitForInput - Pause for user input

Configuration Files

Cargo.toml

Package configuration defining:

  • Crate metadata (name, version, description, authors)
  • Dependencies with feature flags (rig for LLM integration)
  • Feature definitions and optional dependencies
  • Workspace configuration if part of a larger project

README.md

Comprehensive documentation including:

  • Feature overview and quick start guide
  • Complete API reference with examples
  • Advanced usage patterns and best practices
  • Performance optimization guidelines
  • Migration guides and troubleshooting information

Each file is designed with a single responsibility and clear interfaces, making the codebase maintainable and extensible. The modular architecture allows users to leverage only the components they need while providing full-featured workflow capabilities out of the box.

License

MIT