graph-flow
A high-performance, type-safe framework for building multi-agent workflow systems in Rust.
Features
- Type-Safe Workflows: Compile-time guarantees for workflow correctness
- Flexible Execution: Step-by-step, batch, or mixed execution modes
- Built-in Persistence: PostgreSQL and in-memory storage backends
- LLM Integration: Optional integration with Rig for AI agent capabilities
- Human-in-the-Loop: Natural workflow interruption and resumption
- Async/Await Native: Built from the ground up for async Rust
Quick Start
Add to your Cargo.toml
:
[]
= "0.2"
# For LLM integration
= { = "0.2", = ["rig"] }
Basic Example
use ;
use async_trait;
use Arc;
// Define a simple greeting task
;
async
Core API Reference
Tasks - The Building Blocks
Tasks implement the core Task
trait and define the units of work in your workflow:
Basic Task Implementation
use ;
use async_trait;
Task with Status Messages
;
NextAction - Controlling Flow
The NextAction
enum controls how your workflow progresses:
// Continue to next task, but pause execution (step-by-step mode)
Ok
// Continue and execute the next task immediately (continuous mode)
Ok
// Wait for user input before continuing
Ok
// Jump to a specific task
Ok
// Go back to the previous task
Ok
// End the workflow
Ok
// Convenience methods
move_to_next // NextAction::Continue
move_to_next_direct // NextAction::ContinueAndExecute
Context - State Management
The Context
provides thread-safe state sharing across tasks:
Basic Context Operations
// Setting values
context.set.await;
context.set.await;
context.set.await;
// Getting values
let value: = context.get.await;
let number: = context.get.await;
let complex: = context.get.await;
// Synchronous operations (useful in edge conditions)
context.set_sync;
let sync_value: = context.get_sync;
// Removing values
let removed: = context.remove.await;
// Clearing all data (preserves chat history)
context.clear.await;
Chat History Management
// Adding messages
context.add_user_message.await;
context.add_assistant_message.await;
context.add_system_message.await;
// Getting chat history
let history = context.get_chat_history.await;
let all_messages = context.get_all_messages.await;
let last_5 = context.get_last_messages.await;
// Chat history info
let count = context.chat_history_len.await;
let is_empty = context.is_chat_history_empty.await;
// Clear chat history
context.clear_chat_history.await;
// Context with message limits
let context = with_max_chat_messages;
LLM Integration (with rig
feature)
Graph Building
Create complex workflows using the GraphBuilder
:
Linear Workflow
let graph = new
.add_task
.add_task
.add_task
.add_edge // task1 -> task2
.add_edge // task2 -> task3
.build;
Conditional Workflow
let graph = new
.add_task
.add_task
.add_task
.add_task
.add_conditional_edge
.add_edge
.add_edge
.build;
Complex Branching
let graph = new
.add_task
.add_task
.add_task
.add_task
.add_task
.add_task
// Initial flow
.add_edge
// Validation branches
.add_conditional_edge
// Processing branches
.add_conditional_edge
// Retry logic
.add_conditional_edge
.set_start_task
.build;
Execution Patterns
Step-by-Step Execution
Best for interactive applications where you want control between each step:
let flow_runner = new;
loop
Continuous Execution
For tasks that should run automatically until completion:
// Tasks use NextAction::ContinueAndExecute
;
// Single call executes until completion or interruption
let result = flow_runner.run.await?;
Mixed Execution
Combine both patterns in the same workflow:
;
Storage Backends
In-Memory Storage (Development)
use InMemorySessionStorage;
let storage = new;
// Create and save a session
let session = new_from_task;
session.context.set.await;
storage.save.await?;
// Retrieve and use
let session = storage.get.await?.unwrap;
let data: String = session.context.get.await.unwrap;
PostgreSQL Storage (Production)
use PostgresSessionStorage;
// Connect to database
let storage = new;
// Works the same as in-memory
let session = new_from_task;
storage.save.await?;
Advanced Examples
Multi-Agent Conversation System
use *;
// Build multi-agent workflow
let analyst = new;
let reviewer = new;
let graph = new
.add_task
.add_task
.add_conditional_edge
.build;
Error Handling and Recovery
Dynamic Task Selection
;
Performance and Best Practices
Efficient Context Usage
// ✅ Good: Batch context operations
context.set.await;
context.set.await;
context.set.await;
// ✅ Good: Use sync methods in edge conditions
.add_conditional_edge
// ✅ Good: Limit chat history size for long conversations
let context = with_max_chat_messages;
Memory Management
// ✅ Good: Reuse Arc references
let shared_task = new;
let graph = new
.add_task // Clone the Arc, not the task
.build;
// ✅ Good: Clear unused context data
context.remove.await;
Error Handling
// ✅ Good: Proper error propagation
async
Features
Default Features
The crate works out of the box with basic workflow capabilities.
rig
Feature
Enables LLM integration through the Rig crate:
[]
= { = "0.2", = ["rig"] }
Testing Your Workflows
Migration from 0.1.x
Context::get_rig_messages()
replaces manual message conversionTaskResult::new_with_status()
adds debugging supportFlowRunner
provides simplified session management- PostgreSQL storage is now more robust with connection pooling
Project Structure
This section describes the purpose and contents of each file in the graph-flow crate:
Source Files (src/
)
lib.rs
The main library entry point that:
- Defines the crate's public API and exports commonly used types
- Contains comprehensive module-level documentation with examples
- Provides a complete Quick Start guide demonstrating the basic workflow
- Includes integration tests for graph execution and storage functionality
Public re-exports:
Context
,ChatHistory
,MessageRole
,SerializableMessage
GraphError
,Result
ExecutionResult
,ExecutionStatus
,Graph
,GraphBuilder
FlowRunner
GraphStorage
,InMemoryGraphStorage
,InMemorySessionStorage
,Session
,SessionStorage
PostgresSessionStorage
NextAction
,Task
,TaskResult
context.rs
Context and state management for workflows:
- Provides both async and sync accessor methods for different use cases
- Optional Rig integration for LLM message format conversion (behind
rig
feature flag) - Full serialization/deserialization support for persistence
Public types:
Context
: Thread-safe state container usingArc<DashMap>
for data storageChatHistory
: Specialized container for conversation management with automatic message pruningSerializableMessage
: Unified message format with role-based typing (User/Assistant/System)MessageRole
: Enum defining message sender types (User
,Assistant
,System
)
error.rs
Centralized error handling:
- Includes variants for task execution, storage, session management, and validation errors
- Uses
thiserror
for ergonomic error handling with descriptive messages
Public types:
GraphError
: Comprehensive error enum with variants:TaskExecutionFailed(String)
GraphNotFound(String)
InvalidEdge(String)
TaskNotFound(String)
ContextError(String)
StorageError(String)
SessionNotFound(String)
Other(anyhow::Error)
Result<T>
: Type alias forstd::result::Result<T, GraphError>
graph.rs
Core graph execution engine:
- Supports conditional branching, task timeouts, and recursive execution
- Session-aware execution that preserves state between calls
- Automatic task validation and orphaned task detection
Public types:
Graph
: Main workflow orchestrator with task execution and flow controlGraphBuilder
: Fluent API for constructing workflows with validationEdge
: Represents connections between tasks with optional condition functionsExecutionResult
: Contains response and execution statusExecutionStatus
: Enum indicating workflow state:Paused { next_task_id: String }
WaitingForInput
Completed
Error(String)
EdgeCondition
: Type alias for condition functions
runner.rs
High-level workflow execution wrapper:
- Designed for interactive applications and web services
- Handles session persistence automatically
- Optimized for step-by-step execution with minimal overhead
- Extensive documentation with usage patterns for different architectures
- Error handling with automatic session rollback on failures
Public types:
FlowRunner
: Convenience wrapper implementing the load → execute → save pattern
storage.rs
Session and graph persistence abstractions:
- Thread-safe implementations using
Arc<DashMap>
for concurrent access
Public types:
Session
: Workflow state container with id, current task, and contextSessionStorage
trait: Abstract interface for session persistenceGraphStorage
trait: Abstract interface for graph persistenceInMemorySessionStorage
: Fast in-memory implementation for development/testingInMemoryGraphStorage
: In-memory graph storage for development
storage_postgres.rs
Production-ready PostgreSQL storage backend:
- Automatic database migration with proper schema creation
- Connection pooling for high-performance concurrent access
- JSONB storage for efficient context serialization
- Optimistic concurrency control with timestamp-based conflict resolution
- Comprehensive error handling with database-specific error mapping
Public types:
PostgresSessionStorage
: Robust PostgreSQL implementation ofSessionStorage
task.rs
Task definition and execution control:
- Supports both simple and complex task implementations
- Automatic task ID generation using type names with override capability
- Extensive examples showing different task patterns and use cases
Public types:
Task
trait: Core interface that all workflow steps must implementTaskResult
: Return type containing response and flow control informationNextAction
: Enum controlling workflow progression:Continue
- Step-by-step executionContinueAndExecute
- Continuous executionGoTo(String)
- Jump to specific taskGoBack
- Go to previous taskEnd
- Terminate workflowWaitForInput
- Pause for user input
Configuration Files
Cargo.toml
Package configuration defining:
- Crate metadata (name, version, description, authors)
- Dependencies with feature flags (
rig
for LLM integration) - Feature definitions and optional dependencies
- Workspace configuration if part of a larger project
README.md
Comprehensive documentation including:
- Feature overview and quick start guide
- Complete API reference with examples
- Advanced usage patterns and best practices
- Performance optimization guidelines
- Migration guides and troubleshooting information
Each file is designed with a single responsibility and clear interfaces, making the codebase maintainable and extensible. The modular architecture allows users to leverage only the components they need while providing full-featured workflow capabilities out of the box.
License
MIT