graph_flow/lib.rs
1//! # graph-flow
2//!
3//! A high-performance, type-safe framework for building multi-agent workflow systems in Rust.
4//!
5//! ## Features
6//!
7//! - **Type-Safe Workflows**: Compile-time guarantees for workflow correctness
8//! - **Flexible Execution**: Step-by-step, batch, or mixed execution modes
9//! - **Built-in Persistence**: PostgreSQL and in-memory storage backends
10//! - **LLM Integration**: Optional integration with Rig for AI agent capabilities
11//! - **Human-in-the-Loop**: Natural workflow interruption and resumption
12//! - **Async/Await Native**: Built from the ground up for async Rust
13//!
14//! ## Quick Start
15//!
16//! ```rust
17//! use graph_flow::{Context, Task, TaskResult, NextAction, GraphBuilder, FlowRunner, InMemorySessionStorage, Session, SessionStorage};
18//! use async_trait::async_trait;
19//! use std::sync::Arc;
20//!
21//! // Define a task
22//! struct HelloTask;
23//!
24//! #[async_trait]
25//! impl Task for HelloTask {
26//! fn id(&self) -> &str {
27//! "hello_task"
28//! }
29//!
30//! async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
31//! let name: String = context.get("name").await.unwrap_or("World".to_string());
32//! let greeting = format!("Hello, {}!", name);
33//!
34//! context.set("greeting", greeting.clone()).await;
35//! Ok(TaskResult::new(Some(greeting), NextAction::Continue))
36//! }
37//! }
38//!
39//! # #[tokio::main]
40//! # async fn main() -> graph_flow::Result<()> {
41//! // Build the workflow
42//! let hello_task = Arc::new(HelloTask);
43//! let graph = Arc::new(
44//! GraphBuilder::new("greeting_workflow")
45//! .add_task(hello_task.clone())
46//! .build()
47//! );
48//!
49//! // Set up session storage and runner
50//! let session_storage = Arc::new(InMemorySessionStorage::new());
51//! let flow_runner = FlowRunner::new(graph.clone(), session_storage.clone());
52//!
53//! // Create and execute session
54//! let session = Session::new_from_task("user_123".to_string(), hello_task.id());
55//! session.context.set("name", "Alice".to_string()).await;
56//! session_storage.save(session).await?;
57//!
58//! let result = flow_runner.run("user_123").await?;
59//! println!("Response: {:?}", result.response);
60//! # Ok(())
61//! # }
62//! ```
63//!
64//! ## Core Concepts
65//!
66//! ### Tasks
67//!
68//! Tasks are the building blocks of your workflow. They implement the [`Task`] trait:
69//!
70//! ```rust
71//! use graph_flow::{Task, TaskResult, NextAction, Context};
72//! use async_trait::async_trait;
73//!
74//! struct MyTask;
75//!
76//! #[async_trait]
77//! impl Task for MyTask {
78//! fn id(&self) -> &str {
79//! "my_task"
80//! }
81//!
82//! async fn run(&self, context: Context) -> graph_flow::Result<TaskResult> {
83//! // Your task logic here
84//! Ok(TaskResult::new(Some("Done!".to_string()), NextAction::End))
85//! }
86//! }
87//! ```
88//!
89//! ### Context
90//!
91//! The [`Context`] provides thread-safe state management across your workflow:
92//!
93//! ```rust
94//! # use graph_flow::Context;
95//! # #[tokio::main]
96//! # async fn main() {
97//! let context = Context::new();
98//!
99//! // Store and retrieve data
100//! context.set("key", "value").await;
101//! let value: Option<String> = context.get("key").await;
102//!
103//! // Chat history management
104//! context.add_user_message("Hello!".to_string()).await;
105//! context.add_assistant_message("Hi there!".to_string()).await;
106//! # }
107//! ```
108//!
109//! ### Graph Building
110//!
111//! Use [`GraphBuilder`] to create complex workflows:
112//!
113//! ```rust
114//! # use graph_flow::{GraphBuilder, Task, TaskResult, NextAction, Context};
115//! # use async_trait::async_trait;
116//! # use std::sync::Arc;
117//! # struct Task1; struct Task2; struct Task3;
118//! # #[async_trait] impl Task for Task1 { fn id(&self) -> &str { "task1" } async fn run(&self, _: Context) -> graph_flow::Result<TaskResult> { Ok(TaskResult::new(None, NextAction::End)) } }
119//! # #[async_trait] impl Task for Task2 { fn id(&self) -> &str { "task2" } async fn run(&self, _: Context) -> graph_flow::Result<TaskResult> { Ok(TaskResult::new(None, NextAction::End)) } }
120//! # #[async_trait] impl Task for Task3 { fn id(&self) -> &str { "task3" } async fn run(&self, _: Context) -> graph_flow::Result<TaskResult> { Ok(TaskResult::new(None, NextAction::End)) } }
121//! # let task1 = Arc::new(Task1); let task2 = Arc::new(Task2); let task3 = Arc::new(Task3);
122//! let graph = GraphBuilder::new("my_workflow")
123//! .add_task(task1.clone())
124//! .add_task(task2.clone())
125//! .add_task(task3.clone())
126//! .add_edge(task1.id(), task2.id())
127//! .add_conditional_edge(
128//! task2.id(),
129//! |ctx| ctx.get_sync::<bool>("condition").unwrap_or(false),
130//! task3.id(), // if true
131//! task1.id(), // if false
132//! )
133//! .build();
134//! ```
135//!
136//! ### Execution
137//!
138//! Use [`FlowRunner`] for convenient session-based execution:
139//!
140//! ```rust,no_run
141//! # use graph_flow::{FlowRunner, InMemorySessionStorage, Session, Graph, SessionStorage};
142//! # use std::sync::Arc;
143//! # #[tokio::main]
144//! # async fn main() -> graph_flow::Result<()> {
145//! # let graph = Arc::new(Graph::new("test"));
146//! let storage = Arc::new(InMemorySessionStorage::new());
147//! let runner = FlowRunner::new(graph, storage.clone());
148//!
149//! // Create session
150//! let session = Session::new_from_task("session_id".to_string(), "start_task");
151//! storage.save(session).await?;
152//!
153//! // Execute workflow
154//! let result = runner.run("session_id").await?;
155//! # Ok(())
156//! # }
157//! ```
158//!
159//! ## Features
160//!
161//! - **Default**: Core workflow functionality
162//! - **`rig`**: Enables LLM integration via the Rig crate
163//!
164//! ## Storage Backends
165//!
166//! - [`InMemorySessionStorage`]: For development and testing
167//! - [`PostgresSessionStorage`]: For production use with PostgreSQL
168
169pub mod context;
170pub mod error;
171pub mod graph;
172pub mod runner;
173pub mod storage;
174pub mod storage_postgres;
175pub mod task;
176
177// Re-export commonly used types
178pub use context::{ChatHistory, Context, MessageRole, SerializableMessage};
179pub use error::{GraphError, Result};
180pub use graph::{ExecutionResult, ExecutionStatus, Graph, GraphBuilder};
181pub use runner::FlowRunner;
182pub use storage::{
183 GraphStorage, InMemoryGraphStorage, InMemorySessionStorage, Session, SessionStorage,
184};
185pub use storage_postgres::PostgresSessionStorage;
186pub use task::{NextAction, Task, TaskResult};
187
188#[cfg(test)]
189mod tests {
190 use super::*;
191 use async_trait::async_trait;
192 use std::sync::Arc;
193
194 struct TestTask {
195 id: String,
196 }
197
198 #[async_trait]
199 impl Task for TestTask {
200 fn id(&self) -> &str {
201 &self.id
202 }
203
204 async fn run(&self, context: Context) -> Result<TaskResult> {
205 let input: String = context.get("input").await.unwrap_or_default();
206 context.set("output", format!("Processed: {}", input)).await;
207
208 Ok(TaskResult::new(
209 Some("Task completed".to_string()),
210 NextAction::End,
211 ))
212 }
213 }
214
215 #[tokio::test]
216 async fn test_simple_graph_execution() {
217 let task = Arc::new(TestTask {
218 id: "test_task".to_string(),
219 });
220
221 let graph = GraphBuilder::new("test_graph").add_task(task).build();
222
223 let context = Context::new();
224 context.set("input", "Hello, World!").await;
225
226 let result = graph.execute("test_task", context.clone()).await.unwrap();
227
228 assert!(result.response.is_some());
229 assert!(matches!(result.next_action, NextAction::End));
230
231 let output: String = context.get("output").await.unwrap();
232 assert_eq!(output, "Processed: Hello, World!");
233 }
234
235 #[tokio::test]
236 async fn test_storage() {
237 let graph_storage = InMemoryGraphStorage::new();
238 let session_storage = InMemorySessionStorage::new();
239
240 let graph = Arc::new(Graph::new("test"));
241 graph_storage
242 .save("test".to_string(), graph.clone())
243 .await
244 .unwrap();
245
246 let retrieved = graph_storage.get("test").await.unwrap();
247 assert!(retrieved.is_some());
248
249 let session = Session {
250 id: "session1".to_string(),
251 graph_id: "test".to_string(),
252 current_task_id: "task1".to_string(),
253 status_message: None,
254 context: Context::new(),
255 };
256
257 session_storage.save(session.clone()).await.unwrap();
258 let retrieved_session = session_storage.get("session1").await.unwrap();
259 assert!(retrieved_session.is_some());
260 }
261}