graph-flow
A high-performance, type-safe framework for building multi-agent workflow systems in Rust.
Features
- Type-Safe Workflows: Compile-time guarantees for workflow correctness
- Flexible Execution: Step-by-step, batch, or mixed execution modes
- Built-in Persistence: PostgreSQL and in-memory storage backends
- LLM Integration: Optional integration with Rig for AI agent capabilities
- Human-in-the-Loop: Natural workflow interruption and resumption
- Async/Await Native: Built from the ground up for async Rust
- Parallel Blocks (FanOutTask): Run multiple tasks concurrently inside a single node
Quick Start
Add to your Cargo.toml:
```toml
[dependencies]
graph-flow = "0.2"

# For LLM integration, enable the rig feature instead:
# graph-flow = { version = "0.2", features = ["rig"] }
```
Basic Example
A minimal sketch of a task and a one-task graph (exact trait signatures per the crate docs):

```rust
use graph_flow::{Context, GraphBuilder, NextAction, Task, TaskResult};
use async_trait::async_trait;
use std::sync::Arc;

// Define a simple greeting task
struct GreetingTask;

#[async_trait]
impl Task for GreetingTask {
    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        let name: String = context.get("name").await.unwrap_or_else(|| "world".to_string());
        context.set("greeting", format!("Hello, {name}!")).await;
        Ok(TaskResult::new(Some(format!("Hello, {name}!")), NextAction::Continue))
    }
}

// Build a one-task graph
let task = Arc::new(GreetingTask);
let graph = GraphBuilder::new("greeting").add_task(task).build();
```
Core API Reference
Tasks - The Building Blocks
Tasks implement the core Task trait and define the units of work in your workflow:
Basic Task Implementation
```rust
use graph_flow::{Context, NextAction, Task, TaskResult};
use async_trait::async_trait;

struct EchoTask;

#[async_trait]
impl Task for EchoTask {
    // The task id defaults to the type name; override it for a stable, readable id
    fn id(&self) -> String {
        "echo".to_string()
    }

    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        let input: String = context.get("input").await.unwrap_or_default();
        Ok(TaskResult::new(Some(input), NextAction::Continue))
    }
}
```
Task with Status Messages
```rust
// new_with_status attaches a status message for logging and debugging
// (exact argument order per the crate docs)
Ok(TaskResult::new_with_status(
    Some("Processing complete".to_string()),
    NextAction::Continue,
    Some("validated 10 records".to_string()),
))
```
NextAction - Controlling Flow
The NextAction enum controls how your workflow progresses:
```rust
// Continue to the next task, but pause execution (step-by-step mode)
Ok(TaskResult::new(response, NextAction::Continue))

// Continue and execute the next task immediately (continuous mode)
Ok(TaskResult::new(response, NextAction::ContinueAndExecute))

// Wait for user input before continuing
Ok(TaskResult::new(response, NextAction::WaitForInput))

// Jump to a specific task
Ok(TaskResult::new(response, NextAction::GoTo("task_id".to_string())))

// Go back to the previous task
Ok(TaskResult::new(response, NextAction::GoBack))

// End the workflow
Ok(TaskResult::new(response, NextAction::End))

// Convenience constructors
TaskResult::move_to_next(response)        // NextAction::Continue
TaskResult::move_to_next_direct(response) // NextAction::ContinueAndExecute
```
Context - State Management
The Context provides thread-safe state sharing across tasks:
Basic Context Operations
```rust
// Setting values
context.set("name", "Alice".to_string()).await;
context.set("count", 42).await;
context.set("items", vec!["a", "b", "c"]).await;

// Getting values (typed)
let value: Option<String> = context.get("name").await;
let number: Option<i32> = context.get("count").await;
let complex: Option<Vec<String>> = context.get("items").await;

// Synchronous operations (useful in edge conditions)
context.set_sync("flag", true);
let sync_value: Option<bool> = context.get_sync("flag");

// Removing values
let removed: Option<String> = context.remove("name").await;

// Clearing all data (preserves chat history)
context.clear().await;
```
Chat History Management
```rust
// Adding messages
context.add_user_message("Hello".to_string()).await;
context.add_assistant_message("Hi! How can I help?".to_string()).await;
context.add_system_message("You are a helpful assistant".to_string()).await;

// Getting chat history
let history = context.get_chat_history().await;
let all_messages = context.get_all_messages().await;
let last_5 = context.get_last_messages(5).await;

// Chat history info
let count = context.chat_history_len().await;
let is_empty = context.is_chat_history_empty().await;

// Clear chat history
context.clear_chat_history().await;

// Context with a message limit (older messages are pruned automatically)
let context = Context::with_max_chat_messages(50);
```
LLM Integration (with rig feature)
With the `rig` feature enabled, `Context::get_rig_messages()` converts the stored chat history into Rig's message format, so a task can pass conversation state directly to a Rig agent.
Graph Building
Create complex workflows using the GraphBuilder:
Linear Workflow
```rust
let graph = GraphBuilder::new("linear_workflow")
    .add_task(task1.clone())
    .add_task(task2.clone())
    .add_task(task3.clone())
    .add_edge("task1", "task2") // task1 -> task2
    .add_edge("task2", "task3") // task2 -> task3
    .build();
```
Conditional Workflow
```rust
let graph = GraphBuilder::new("conditional_workflow")
    .add_task(input_task.clone())
    .add_task(validate_task.clone())
    .add_task(approve_task.clone())
    .add_task(reject_task.clone())
    // Route on a synchronous context check (edge conditions are not async)
    .add_conditional_edge(
        "validate",
        |ctx| ctx.get_sync::<bool>("is_valid").unwrap_or(false),
        "approve", // condition true
        "reject",  // condition false
    )
    .add_edge("input", "validate")
    .build();
```
Complex Branching
```rust
let graph = GraphBuilder::new("complex_workflow")
    .add_task(start_task.clone())
    .add_task(validate_task.clone())
    .add_task(process_task.clone())
    .add_task(retry_task.clone())
    .add_task(error_task.clone())
    .add_task(done_task.clone())
    // Initial flow
    .add_edge("start", "validate")
    // Validation branches
    .add_conditional_edge(
        "validate",
        |ctx| ctx.get_sync::<bool>("is_valid").unwrap_or(false),
        "process",
        "error",
    )
    // Processing branches
    .add_conditional_edge(
        "process",
        |ctx| ctx.get_sync::<bool>("success").unwrap_or(false),
        "done",
        "retry",
    )
    // Retry logic
    .add_conditional_edge(
        "retry",
        |ctx| ctx.get_sync::<i32>("attempts").unwrap_or(0) < 3,
        "process",
        "error",
    )
    .set_start_task("start")
    .build();
```
Execution Patterns
Step-by-Step Execution
Best for interactive applications where you want control between each step:
```rust
let flow_runner = FlowRunner::new(graph.clone(), session_storage.clone());

loop {
    let result = flow_runner.run(&session_id).await?;

    match result.status {
        ExecutionStatus::Paused { next_task_id } => {
            // Inspect state or update the UI, then loop to take the next step
            println!("paused before {next_task_id}");
        }
        ExecutionStatus::WaitingForInput => {
            // Collect user input, write it into the session context, then loop
        }
        ExecutionStatus::Completed => break,
        ExecutionStatus::Error(e) => {
            eprintln!("workflow error: {e}");
            break;
        }
    }
}
```
Continuous Execution
For tasks that should run automatically until completion:
```rust
// Tasks use NextAction::ContinueAndExecute
Ok(TaskResult::new(response, NextAction::ContinueAndExecute))

// A single call then executes until completion or interruption
let result = flow_runner.run(&session_id).await?;
```
Mixed Execution
Combine both patterns in the same workflow:
```rust
// Early tasks chain automatically...
Ok(TaskResult::new(None, NextAction::ContinueAndExecute))

// ...while a later task pauses the workflow for user input
Ok(TaskResult::new(Some("Please confirm to proceed".to_string()), NextAction::WaitForInput))
```
Storage Backends
In-Memory Storage (Development)
```rust
use graph_flow::{InMemorySessionStorage, Session, SessionStorage};
use std::sync::Arc;

let storage = Arc::new(InMemorySessionStorage::new());

// Create and save a session
let session = Session::new_from_task("session_1".to_string(), "first_task");
session.context.set("user", "Alice".to_string()).await;
storage.save(session).await?;

// Retrieve and use
let session = storage.get("session_1").await?.unwrap();
let data: String = session.context.get("user").await.unwrap();
```
PostgreSQL Storage (Production)
```rust
use graph_flow::PostgresSessionStorage;

// Connect to the database (exact constructor per the crate docs)
let storage = PostgresSessionStorage::connect(&std::env::var("DATABASE_URL")?).await?;

// Works the same as in-memory
let session = Session::new_from_task("session_1".to_string(), "first_task");
storage.save(session).await?;
```
Advanced Examples
Multi-Agent Conversation System
```rust
use graph_flow::*;
use std::sync::Arc;

// AnalystAgent and ReviewerAgent are user-defined Task implementations
let analyst = Arc::new(AnalystAgent::new());
let reviewer = Arc::new(ReviewerAgent::new());

// Build the multi-agent workflow
let graph = GraphBuilder::new("multi_agent")
    .add_task(analyst.clone())
    .add_task(reviewer.clone())
    .add_conditional_edge(
        "analyst",
        |ctx| ctx.get_sync::<bool>("needs_review").unwrap_or(false),
        "reviewer", // hand off to the reviewer when flagged
        "analyst",  // otherwise keep analyzing
    )
    .build();
```
Error Handling and Recovery
Errors returned from a task surface as `ExecutionStatus::Error(String)` on the execution result, and `FlowRunner` rolls the session back on failure so the failed step can be retried safely.
Dynamic Task Selection
```rust
// Choose the next task at runtime based on context state
let next = if context.get_sync::<bool>("needs_review").unwrap_or(false) {
    "reviewer"
} else {
    "publisher"
};
Ok(TaskResult::new(None, NextAction::GoTo(next.to_string())))
```
Performance and Best Practices
Efficient Context Usage
```rust
// ✅ Good: batch related context writes together
context.set("a", 1).await;
context.set("b", 2).await;
context.set("c", 3).await;

// ✅ Good: use sync methods in edge conditions (conditions are not async)
.add_conditional_edge("check", |ctx| ctx.get_sync::<bool>("ok").unwrap_or(false), "yes", "no")

// ✅ Good: limit chat history size for long conversations
let context = Context::with_max_chat_messages(100);
```
Memory Management
```rust
// ✅ Good: reuse Arc references
let shared_task = Arc::new(MyTask::new());
let graph = GraphBuilder::new("graph")
    .add_task(shared_task.clone()) // clones the Arc, not the task
    .build();

// ✅ Good: remove context data you no longer need
let _: Option<String> = context.remove("large_blob").await;
```
Error Handling
```rust
// ✅ Good: propagate errors with `?` and descriptive variants
#[async_trait]
impl Task for FallibleTask {
    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        let input: String = context
            .get("input")
            .await
            .ok_or_else(|| GraphError::ContextError("missing input".to_string()))?;
        Ok(TaskResult::new(Some(input), NextAction::Continue))
    }
}
```
Features
Default Features
The crate works out of the box with basic workflow capabilities.
rig Feature
Enables LLM integration through the Rig crate:
```toml
[dependencies]
graph-flow = { version = "0.2", features = ["rig"] }
```
Testing Your Workflows
Tasks and edge conditions are ordinary Rust values, so they can be unit-tested directly: construct a `Context`, call the task's `run` method, and assert on the returned `TaskResult` and the resulting context state.
Migration from 0.1.x
- `Context::get_rig_messages()` replaces manual message conversion
- `TaskResult::new_with_status()` adds debugging support
- `FlowRunner` provides simplified session management
- PostgreSQL storage is now more robust with connection pooling
Project Structure
This section describes the purpose and contents of each file in the graph-flow crate:
Source Files (src/)
lib.rs
The main library entry point that:
- Defines the crate's public API and exports commonly used types
- Contains comprehensive module-level documentation with examples
- Provides a complete Quick Start guide demonstrating the basic workflow
- Includes integration tests for graph execution and storage functionality
Public re-exports:
- From `context`: `Context`, `ChatHistory`, `MessageRole`, `SerializableMessage`
- From `error`: `GraphError`, `Result`
- From `graph`: `ExecutionResult`, `ExecutionStatus`, `Graph`, `GraphBuilder`
- From `runner`: `FlowRunner`
- From `storage`: `GraphStorage`, `InMemoryGraphStorage`, `InMemorySessionStorage`, `Session`, `SessionStorage`
- From `storage_postgres`: `PostgresSessionStorage`
- From `task`: `NextAction`, `Task`, `TaskResult`
context.rs
Context and state management for workflows:
- Provides both async and sync accessor methods for different use cases
- Optional Rig integration for LLM message format conversion (behind the `rig` feature flag)
- Full serialization/deserialization support for persistence
Public types:
- `Context`: Thread-safe state container using `Arc<DashMap>` for data storage
- `ChatHistory`: Specialized container for conversation management with automatic message pruning
- `SerializableMessage`: Unified message format with role-based typing (User/Assistant/System)
- `MessageRole`: Enum defining message sender types (`User`, `Assistant`, `System`)
error.rs
Centralized error handling:
- Includes variants for task execution, storage, session management, and validation errors
- Uses `thiserror` for ergonomic error handling with descriptive messages
Public types:
- `GraphError`: Comprehensive error enum with variants: `TaskExecutionFailed(String)`, `GraphNotFound(String)`, `InvalidEdge(String)`, `TaskNotFound(String)`, `ContextError(String)`, `StorageError(String)`, `SessionNotFound(String)`, `Other(anyhow::Error)`
- `Result<T>`: Type alias for `std::result::Result<T, GraphError>`
graph.rs
Core graph execution engine:
- Supports conditional branching, task timeouts, and recursive execution
- Session-aware execution that preserves state between calls
- Automatic task validation and orphaned task detection
Public types:
- `Graph`: Main workflow orchestrator with task execution and flow control
- `GraphBuilder`: Fluent API for constructing workflows with validation
- `Edge`: Represents connections between tasks with optional condition functions
- `ExecutionResult`: Contains response and execution status
- `ExecutionStatus`: Enum indicating workflow state: `Paused { next_task_id: String }`, `WaitingForInput`, `Completed`, `Error(String)`
- `EdgeCondition`: Type alias for condition functions
runner.rs
High-level workflow execution wrapper:
- Designed for interactive applications and web services
- Handles session persistence automatically
- Optimized for step-by-step execution with minimal overhead
- Extensive documentation with usage patterns for different architectures
- Error handling with automatic session rollback on failures
Public types:
- `FlowRunner`: Convenience wrapper implementing the load → execute → save pattern
storage.rs
Session and graph persistence abstractions:
- Thread-safe implementations using `Arc<DashMap>` for concurrent access
Public types:
- `Session`: Workflow state container with id, current task, and context
- `SessionStorage` trait: Abstract interface for session persistence
- `GraphStorage` trait: Abstract interface for graph persistence
- `InMemorySessionStorage`: Fast in-memory implementation for development/testing
- `InMemoryGraphStorage`: In-memory graph storage for development
storage_postgres.rs
Production-ready PostgreSQL storage backend:
- Automatic database migration with proper schema creation
- Connection pooling for high-performance concurrent access
- JSONB storage for efficient context serialization
- Optimistic concurrency control with timestamp-based conflict resolution
- Comprehensive error handling with database-specific error mapping
Public types:
- `PostgresSessionStorage`: Robust PostgreSQL implementation of `SessionStorage`
task.rs
Task definition and execution control:
- Supports both simple and complex task implementations
- Automatic task ID generation using type names with override capability
- Extensive examples showing different task patterns and use cases
Public types:
- `Task` trait: Core interface that all workflow steps must implement
- `TaskResult`: Return type containing response and flow control information
- `NextAction`: Enum controlling workflow progression:
  - `Continue` - step-by-step execution
  - `ContinueAndExecute` - continuous execution
  - `GoTo(String)` - jump to a specific task
  - `GoBack` - go to the previous task
  - `End` - terminate the workflow
  - `WaitForInput` - pause for user input
Configuration Files
Cargo.toml
Package configuration defining:
- Crate metadata (name, version, description, authors)
- Dependencies with feature flags (`rig` for LLM integration)
- Feature definitions and optional dependencies
- Workspace configuration if part of a larger project
README.md
Comprehensive documentation including:
- Feature overview and quick start guide
- Complete API reference with examples
- Advanced usage patterns and best practices
- Performance optimization guidelines
- Migration guides and troubleshooting information
Each file is designed with a single responsibility and clear interfaces, making the codebase maintainable and extensible. The modular architecture allows users to leverage only the components they need while providing full-featured workflow capabilities out of the box.
License
MIT
Parallel Execution with FanOutTask
FanOutTask is a composite task that runs multiple child tasks concurrently, aggregates their outputs into the shared Context, and then continues the graph. It is the simplest way to add parallelism without changing the graph engine.
Key properties:
- Children share the same `Context` (concurrent reads/writes are supported). To avoid key collisions, `FanOutTask` stores each child's outputs under a prefixed key by default: `fanout.<child_id>.<field>`.
- Children's `NextAction` is ignored (they act as units of work). Control flow is decided by the `FanOutTask` itself, which returns `NextAction::Continue` by default.
- If any child fails, the whole `FanOutTask` fails.
Basic example:
```rust
use graph_flow::{FanOutTask, GraphBuilder, Task};
use async_trait::async_trait;
use std::sync::Arc;

// PrepareTask, ChildA, ChildB, and ConsumeTask are ordinary Task implementations
let prepare: Arc<dyn Task> = Arc::new(PrepareTask);
let child_a: Arc<dyn Task> = Arc::new(ChildA);
let child_b: Arc<dyn Task> = Arc::new(ChildB);
let fanout = Arc::new(FanOutTask::new("fanout", vec![child_a, child_b]));
let consume: Arc<dyn Task> = Arc::new(ConsumeTask);

let graph = GraphBuilder::new("fanout_example")
    .add_task(prepare.clone())
    .add_task(fanout.clone())
    .add_task(consume.clone())
    .add_edge("prepare", "fanout")
    .add_edge("fanout", "consume")
    .build();
```
See a runnable example at graph-flow/examples/fanout_basic.rs.