§graph-flow
A high-performance, type-safe framework for building multi-agent workflow systems in Rust.
§Features
- Type-Safe Workflows: Compile-time guarantees for workflow correctness
- Flexible Execution: Step-by-step, batch, or mixed execution modes
- Built-in Persistence: PostgreSQL and in-memory storage backends
- LLM Integration: Optional integration with Rig for AI agent capabilities
- Human-in-the-Loop: Natural workflow interruption and resumption
- Async/Await Native: Built from the ground up for async Rust
§Quick Start
use graph_flow::{
    Context, Task, TaskResult, NextAction, GraphBuilder, FlowRunner,
    InMemorySessionStorage, Session, SessionStorage,
};
use async_trait::async_trait;
use std::sync::Arc;
// Define a task
struct HelloTask;
#[async_trait]
impl Task for HelloTask {
    fn id(&self) -> &str {
        "hello_task"
    }
    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        let name: String = context.get("name").await.unwrap_or("World".to_string());
        let greeting = format!("Hello, {}!", name);
        context.set("greeting", greeting.clone()).await;
        Ok(TaskResult::new(Some(greeting), NextAction::Continue))
    }
}
// Build the workflow
let hello_task = Arc::new(HelloTask);
let graph = Arc::new(
    GraphBuilder::new("greeting_workflow")
        .add_task(hello_task.clone())
        .build()
);
// Set up session storage and runner
let session_storage = Arc::new(InMemorySessionStorage::new());
let flow_runner = FlowRunner::new(graph.clone(), session_storage.clone());
// Create and execute session
let session = Session::new_from_task("user_123".to_string(), hello_task.id());
session.context.set("name", "Alice".to_string()).await;
session_storage.save(session).await?;
let result = flow_runner.run("user_123").await?;
println!("Response: {:?}", result.response);
§Core Concepts
§Tasks
Tasks are the building blocks of your workflow. They implement the Task trait:
use graph_flow::{Task, TaskResult, NextAction, Context};
use async_trait::async_trait;
struct MyTask;
#[async_trait]
impl Task for MyTask {
    fn id(&self) -> &str {
        "my_task"
    }
    async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
        // Your task logic here
        Ok(TaskResult::new(Some("Done!".to_string()), NextAction::End))
    }
}
§Context
The Context provides thread-safe state management across your workflow:
let context = Context::new();
// Store and retrieve data
context.set("key", "value").await;
let value: Option<String> = context.get("key").await;
// Chat history management
context.add_user_message("Hello!".to_string()).await;
context.add_assistant_message("Hi there!".to_string()).await;
§Graph Building
Use GraphBuilder to create complex workflows:
let graph = GraphBuilder::new("my_workflow")
    .add_task(task1.clone())
    .add_task(task2.clone())
    .add_task(task3.clone())
    .add_edge(task1.id(), task2.id())
    .add_conditional_edge(
        task2.id(),
        |ctx| ctx.get_sync::<bool>("condition").unwrap_or(false),
        task3.id(), // if true
        task1.id(), // if false
    )
    .build();
§Execution
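When a workflow runs, a conditional edge like the one in the Graph Building example chooses its successor by evaluating a predicate against the context. The std-only sketch below models that routing rule. The names `Flags`, `Edge`, `next_task`, and `example_edges` are hypothetical and are not part of graph-flow's API; this is an illustration of the idea, not the crate's implementation.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a workflow context: boolean flags keyed by name.
type Flags = HashMap<String, bool>;

/// Hypothetical edge model: a fixed successor or a predicate-selected one.
enum Edge {
    Direct(&'static str),
    Conditional {
        predicate: fn(&Flags) -> bool,
        if_true: &'static str,
        if_false: &'static str,
    },
}

/// Returns the task that follows `current`, or `None` if it has no outgoing edge.
fn next_task(
    edges: &HashMap<&'static str, Edge>,
    current: &str,
    flags: &Flags,
) -> Option<&'static str> {
    match edges.get(current)? {
        Edge::Direct(to) => Some(*to),
        Edge::Conditional { predicate, if_true, if_false } => {
            Some(if predicate(flags) { *if_true } else { *if_false })
        }
    }
}

/// Mirrors `unwrap_or(false)` in the example: a missing flag counts as false.
fn condition(flags: &Flags) -> bool {
    flags.get("condition").copied().unwrap_or(false)
}

/// Same shape as the example graph: task1 -> task2, then task3 or back to task1.
fn example_edges() -> HashMap<&'static str, Edge> {
    let mut edges = HashMap::new();
    edges.insert("task1", Edge::Direct("task2"));
    edges.insert(
        "task2",
        Edge::Conditional { predicate: condition, if_true: "task3", if_false: "task1" },
    );
    edges
}

fn main() {
    let edges = example_edges();
    let mut flags = Flags::new();
    // With no flag set, task2 loops back to task1.
    println!("{:?}", next_task(&edges, "task2", &flags));
    // Once the flag is true, task2 proceeds to task3.
    flags.insert("condition".to_string(), true);
    println!("{:?}", next_task(&edges, "task2", &flags));
}
```

Defaulting a missing flag to `false` means the workflow takes the "if false" branch until some task explicitly sets the condition.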
Use FlowRunner for convenient session-based execution:
let storage = Arc::new(InMemorySessionStorage::new());
let runner = FlowRunner::new(graph, storage.clone());
// Create session
let session = Session::new_from_task("session_id".to_string(), "start_task");
storage.save(session).await?;
// Execute workflow
let result = runner.run("session_id").await?;
§Features
- Default: Core workflow functionality
- rig: Enables LLM integration via the Rig crate
§Storage Backends
- InMemorySessionStorage: For development and testing
- PostgresSessionStorage: For production use with PostgreSQL
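Offering one backend for development and another for production implies that calling code is written against a shared storage interface. The std-only sketch below shows that pattern with a synchronous trait and an in-memory implementation. `SessionRecord`, `SessionStore`, `MemoryStore`, and `advance` are hypothetical names chosen for illustration; graph-flow's own `SessionStorage` trait is async and its methods may differ.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical session record: an id plus the task to run next.
#[derive(Clone, Debug, PartialEq)]
struct SessionRecord {
    id: String,
    current_task: String,
}

/// Hypothetical storage abstraction shared by all backends.
trait SessionStore {
    fn save(&self, session: SessionRecord);
    fn get(&self, id: &str) -> Option<SessionRecord>;
}

/// In-memory backend: a mutex-guarded map, suited to tests and local runs.
#[derive(Default)]
struct MemoryStore {
    sessions: Mutex<HashMap<String, SessionRecord>>,
}

impl SessionStore for MemoryStore {
    fn save(&self, session: SessionRecord) {
        self.sessions.lock().unwrap().insert(session.id.clone(), session);
    }

    fn get(&self, id: &str) -> Option<SessionRecord> {
        self.sessions.lock().unwrap().get(id).cloned()
    }
}

/// Written against the trait, so it works unchanged with any backend.
/// Returns `false` when the session does not exist.
fn advance(store: &dyn SessionStore, id: &str, next_task: &str) -> bool {
    match store.get(id) {
        Some(mut session) => {
            session.current_task = next_task.to_string();
            store.save(session);
            true
        }
        None => false,
    }
}

fn main() {
    let store = MemoryStore::default();
    store.save(SessionRecord {
        id: "user_123".to_string(),
        current_task: "hello_task".to_string(),
    });
    let updated = advance(&store, "user_123", "next_task");
    println!("updated: {}, session: {:?}", updated, store.get("user_123"));
}
```

A database-backed store would implement the same trait, so swapping backends would only change the line that constructs the store.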
Re-exports§
pub use context::ChatHistory;
pub use context::Context;
pub use context::MessageRole;
pub use context::SerializableMessage;
pub use error::GraphError;
pub use error::Result;
pub use graph::ExecutionResult;
pub use graph::ExecutionStatus;
pub use graph::Graph;
pub use graph::GraphBuilder;
pub use runner::FlowRunner;
pub use storage::GraphStorage;
pub use storage::InMemoryGraphStorage;
pub use storage::InMemorySessionStorage;
pub use storage::Session;
pub use storage::SessionStorage;
pub use storage_postgres::PostgresSessionStorage;
pub use task::NextAction;
pub use task::Task;
pub use task::TaskResult;
pub use fanout::FanOutTask;
Modules§
- context
- Context and state management for workflows.
- error
- fanout
- FanOutTask – a composite task that runs multiple child tasks in parallel
- graph
- runner
- FlowRunner – convenience wrapper that loads a session, executes exactly one graph step, and persists the updated session back to storage.
- storage
- storage_postgres
- task
- Task definition and execution control.
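The runner module is described above as loading a session, executing exactly one graph step, and persisting the updated session. The std-only sketch below models that load, step, save cycle. All names here (`SessionState`, `Next`, `Workflow`, `StepOutcome`, `run_one_step`) are hypothetical and simplified; they are not graph-flow's types, and the real runner is async.

```rust
use std::collections::HashMap;

/// Hypothetical per-session state: the task to run next and a log of what ran.
#[derive(Clone, Debug)]
struct SessionState {
    current_task: String,
    log: Vec<String>,
}

/// Hypothetical task result: keep going or stop.
enum Next {
    Continue,
    End,
}

type TaskFn = fn(&mut SessionState) -> Next;

/// Hypothetical workflow: tasks by id plus one outgoing edge per task.
struct Workflow {
    tasks: HashMap<&'static str, TaskFn>,
    edges: HashMap<&'static str, &'static str>,
}

#[derive(Debug, PartialEq)]
enum StepOutcome {
    Advanced(String),
    Finished,
}

/// Loads the session, runs exactly one task, and saves the session back.
/// Returns `None` if the session id or its current task is unknown.
fn run_one_step(
    workflow: &Workflow,
    store: &mut HashMap<String, SessionState>,
    session_id: &str,
) -> Option<StepOutcome> {
    // 1. Load.
    let mut session = store.get(session_id)?.clone();
    // 2. Execute a single task and look up its successor.
    let task = *workflow.tasks.get(session.current_task.as_str())?;
    let action = task(&mut session);
    let successor = workflow.edges.get(session.current_task.as_str()).copied();
    let outcome = match (action, successor) {
        (Next::Continue, Some(next)) => {
            session.current_task = next.to_string();
            StepOutcome::Advanced(next.to_string())
        }
        // `End`, or `Continue` with no outgoing edge: nothing left to run.
        _ => StepOutcome::Finished,
    };
    // 3. Persist.
    store.insert(session_id.to_string(), session);
    Some(outcome)
}

fn greet(session: &mut SessionState) -> Next {
    session.log.push("greet".to_string());
    Next::Continue
}

fn farewell(session: &mut SessionState) -> Next {
    session.log.push("farewell".to_string());
    Next::End
}

fn example_workflow() -> Workflow {
    let mut tasks: HashMap<&'static str, TaskFn> = HashMap::new();
    tasks.insert("greet", greet as TaskFn);
    tasks.insert("farewell", farewell as TaskFn);
    let mut edges = HashMap::new();
    edges.insert("greet", "farewell");
    Workflow { tasks, edges }
}

fn main() {
    let workflow = example_workflow();
    let mut store = HashMap::new();
    store.insert(
        "user_123".to_string(),
        SessionState { current_task: "greet".to_string(), log: Vec::new() },
    );
    // Each call advances the session by one task; the caller decides when to call again.
    println!("{:?}", run_one_step(&workflow, &mut store, "user_123"));
    println!("{:?}", run_one_step(&workflow, &mut store, "user_123"));
}
```

Because all progress lives in the stored session, the caller can pause between steps, for example to wait for user input, and resume later with only the session id.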