graph_flow/runner.rs
1//! FlowRunner – convenience wrapper that loads a session, executes exactly **one** graph step, and
2//! persists the updated session back to storage.
3//!
4//! ## When should you use `FlowRunner`?
5//! * **Interactive workflows / web services**: you usually want to run _one_ step per HTTP
6//! request, send the assistant's reply back to the client, and have the session automatically
7//! saved for the next roundtrip. `FlowRunner` makes that a one-liner.
8//! * **CLI demos & examples**: keeps example code tiny; no need to repeat the
9//! load-execute-save boilerplate.
10//!
11//! ## When should you use `Graph::execute_session` directly?
12//! * **Batch processing** where you intentionally want to run many steps in a tight loop and save
13//! once at the end to reduce I/O.
14//! * **Custom persistence logic** (e.g. optimistic locking, distributed transactions).
15//! * **Advanced diagnostics** where you want to inspect the intermediate `Session` before saving.
16//!
17//! Both APIs are 100 % compatible – `FlowRunner` merely builds on top of the low-level function.
18//!
19//! ## Patterns for Stateless HTTP Services
20//!
21//! ### Pattern 1: Shared FlowRunner (RECOMMENDED)
22//! Create `FlowRunner` once at startup, share across all requests:
23//! ```rust,no_run
24//! use graph_flow::FlowRunner;
25//! use std::sync::Arc;
26//!
27//! // At startup
28//! struct AppState {
29//! flow_runner: FlowRunner,
30//! }
31//!
32//! // In request handler (async context)
33//! # async fn example(state: AppState, session_id: String) -> Result<(), Box<dyn std::error::Error>> {
34//! let result = state.flow_runner.run(&session_id).await?;
35//! # Ok(())
36//! # }
37//! ```
38//! **Pros**: Most efficient, zero allocation per request
39//! **Cons**: Requires the same graph for all requests
40//!
41//! ### Pattern 2: Per-Request FlowRunner
42//! Create `FlowRunner` fresh for each request:
43//! ```rust,no_run
44//! use graph_flow::{FlowRunner, Graph, InMemorySessionStorage};
45//! use std::sync::Arc;
46//!
47//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
48//! # let graph = Arc::new(Graph::new("my-graph"));
49//! # let storage: Arc<dyn graph_flow::SessionStorage> = Arc::new(InMemorySessionStorage::new());
50//! # let session_id = "test-session";
51//! // In request handler
52//! let runner = FlowRunner::new(graph.clone(), storage.clone());
53//! let result = runner.run(&session_id).await?;
54//! # Ok(())
55//! # }
56//! ```
57//! **Pros**: Flexible, can use different graphs per request
//! **Cons**: Two `Arc` clones (atomic reference-count increments) per request; no heap
//! allocation, so still very cheap
59//!
60//! ### Pattern 3: Manual (Original)
61//! Use `Graph::execute_session` directly:
62//! ```rust,no_run
63//! use graph_flow::{Graph, SessionStorage, InMemorySessionStorage};
64//! use std::sync::Arc;
65//!
66//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
67//! # let graph = Arc::new(Graph::new("my-graph"));
68//! # let storage: Arc<dyn SessionStorage> = Arc::new(InMemorySessionStorage::new());
69//! # let session_id = "test-session";
70//! let mut session = storage.get(&session_id).await?.unwrap();
71//! let result = graph.execute_session(&mut session).await?;
72//! storage.save(session).await?;
73//! # Ok(())
74//! # }
75//! ```
76//! **Pros**: Maximum control
//! **Cons**: More boilerplate, easy to forget `storage.save(session)`
78//!
79//! ## Performance Characteristics
80//! - **FlowRunner creation cost**: ~2 pointer copies (negligible)
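//! - **Memory overhead**: 3 pointer-sized words (24 bytes on 64-bit targets): a thin
//!   `Arc<Graph>` plus a fat `Arc<dyn SessionStorage>` (data pointer + vtable pointer)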
82//! - **Runtime cost**: Identical to manual approach
83//!
84//! For high-throughput services, Pattern 1 is recommended. For services with different
85//! graphs per request or complex routing, Pattern 2 is perfectly fine.
86//!
87//! # Examples
88//!
89//! ## Basic Usage
90//!
91//! ```rust,no_run
92//! use graph_flow::{FlowRunner, Graph, InMemorySessionStorage};
93//! use std::sync::Arc;
94//!
95//! # #[tokio::main]
96//! # async fn main() -> graph_flow::Result<()> {
97//! let graph = Arc::new(Graph::new("my_workflow"));
98//! let storage = Arc::new(InMemorySessionStorage::new());
99//! let runner = FlowRunner::new(graph, storage);
100//!
101//! // Execute workflow step (note: this will fail if session doesn't exist)
102//! let result = runner.run("session_id").await?;
103//! println!("Response: {:?}", result.response);
104//! # Ok(())
105//! # }
106//! ```
107//!
108//! ## Shared Runner Pattern (Recommended for Web Services)
109//!
110//! ```rust
111//! use graph_flow::FlowRunner;
112//! use std::sync::Arc;
113//!
114//! // Application state
115//! struct AppState {
116//! flow_runner: Arc<FlowRunner>,
117//! }
118//!
119//! impl AppState {
120//! fn new(runner: FlowRunner) -> Self {
121//! Self {
122//! flow_runner: Arc::new(runner),
123//! }
124//! }
125//! }
126//!
127//! // Request handler
128//! async fn handle_request(
129//! state: Arc<AppState>,
130//! session_id: String,
131//! ) -> Result<String, Box<dyn std::error::Error>> {
132//! let result = state.flow_runner.run(&session_id).await?;
133//! Ok(result.response.unwrap_or_default())
134//! }
135//! ```
136
137use std::sync::Arc;
138
139use crate::{
140 error::{GraphError, Result},
141 graph::{ExecutionResult, Graph},
142 storage::SessionStorage,
143};
144
145/// High-level helper that orchestrates the common _load → execute → save_ pattern.
146///
147/// `FlowRunner` provides a convenient wrapper around the lower-level graph execution
148/// API. It automatically handles session loading, execution, and persistence.
149///
150/// # When to Use FlowRunner
151///
152/// - **Web services**: Execute one step per HTTP request
153/// - **Interactive applications**: Step-by-step workflow progression
154/// - **Simple demos**: Minimal boilerplate for common use cases
155///
156/// # Performance
157///
158/// `FlowRunner` is lightweight and efficient:
159/// - Creation cost: ~2 pointer copies (negligible)
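/// - Memory overhead: 3 pointer-sized words (24 bytes on 64-bit targets): a thin
///   `Arc<Graph>` plus a fat `Arc<dyn SessionStorage>` (data pointer + vtable pointer)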
161/// - Runtime cost: Identical to manual approach
162///
163/// # Examples
164///
165/// ## Basic Usage
166///
167/// ```rust,no_run
168/// use graph_flow::{FlowRunner, Graph, InMemorySessionStorage, Session, SessionStorage};
169/// use std::sync::Arc;
170///
171/// # #[tokio::main]
172/// # async fn main() -> graph_flow::Result<()> {
173/// let graph = Arc::new(Graph::new("my_workflow"));
174/// let storage = Arc::new(InMemorySessionStorage::new());
175/// let runner = FlowRunner::new(graph, storage.clone());
176///
177/// // Create a session first
178/// let session = Session::new_from_task("session_id".to_string(), "start_task");
179/// storage.save(session).await?;
180///
181/// // Execute workflow step
182/// let result = runner.run("session_id").await?;
183/// println!("Response: {:?}", result.response);
184/// # Ok(())
185/// # }
186/// ```
187///
188/// ## Shared Runner Pattern (Recommended for Web Services)
189///
190/// ```rust
191/// use graph_flow::FlowRunner;
192/// use std::sync::Arc;
193///
194/// // Application state
195/// struct AppState {
196/// flow_runner: Arc<FlowRunner>,
197/// }
198///
199/// impl AppState {
200/// fn new(runner: FlowRunner) -> Self {
201/// Self {
202/// flow_runner: Arc::new(runner),
203/// }
204/// }
205/// }
206///
207/// // Request handler
208/// async fn handle_request(
209/// state: Arc<AppState>,
210/// session_id: String,
211/// ) -> Result<String, Box<dyn std::error::Error>> {
212/// let result = state.flow_runner.run(&session_id).await?;
213/// Ok(result.response.unwrap_or_default())
214/// }
215/// ```
216#[derive(Clone)]
217pub struct FlowRunner {
218 graph: Arc<Graph>,
219 storage: Arc<dyn SessionStorage>,
220}
221
222impl FlowRunner {
223 /// Create a new `FlowRunner` from an `Arc<Graph>` and any `SessionStorage` implementation.
224 ///
225 /// # Parameters
226 ///
227 /// * `graph` - The workflow graph to execute
228 /// * `storage` - Storage backend for session persistence
229 ///
230 /// # Examples
231 ///
232 /// ```rust
233 /// use graph_flow::{FlowRunner, Graph, InMemorySessionStorage};
234 /// use std::sync::Arc;
235 ///
236 /// let graph = Arc::new(Graph::new("my_workflow"));
237 /// let storage = Arc::new(InMemorySessionStorage::new());
238 /// let runner = FlowRunner::new(graph, storage);
239 /// ```
240 ///
241 /// ## With PostgreSQL Storage
242 ///
243 /// ```rust,no_run
244 /// use graph_flow::{FlowRunner, Graph, PostgresSessionStorage};
245 /// use std::sync::Arc;
246 ///
247 /// # #[tokio::main]
248 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
249 /// let graph = Arc::new(Graph::new("my_workflow"));
250 /// let storage = Arc::new(
251 /// PostgresSessionStorage::connect("postgresql://localhost/mydb").await?
252 /// );
253 /// let runner = FlowRunner::new(graph, storage);
254 /// # Ok(())
255 /// # }
256 /// ```
257 pub fn new(graph: Arc<Graph>, storage: Arc<dyn SessionStorage>) -> Self {
258 Self { graph, storage }
259 }
260
261 /// Execute **exactly one** task for the given `session_id` and persist the updated session.
262 ///
263 /// This method:
264 /// 1. Loads the session from storage
265 /// 2. Executes the current task
266 /// 3. Saves the updated session back to storage
267 /// 4. Returns the execution result
268 ///
269 /// # Parameters
270 ///
271 /// * `session_id` - Unique identifier for the session to execute
272 ///
273 /// # Returns
274 ///
275 /// Returns the same [`ExecutionResult`] that `Graph::execute_session` does, so callers can
276 /// inspect the assistant's response and the status (`WaitingForInput`, `Completed`, etc.).
277 ///
278 /// # Errors
279 ///
280 /// Returns an error if:
281 /// - The session doesn't exist
282 /// - Task execution fails
283 /// - Storage operations fail
284 ///
285 /// # Examples
286 ///
287 /// ## Basic Execution
288 ///
289 /// ```rust,no_run
290 /// # use graph_flow::{FlowRunner, Graph, InMemorySessionStorage, Session, SessionStorage};
291 /// # use std::sync::Arc;
292 /// # #[tokio::main]
293 /// # async fn main() -> graph_flow::Result<()> {
294 /// # let graph = Arc::new(Graph::new("test"));
295 /// # let storage = Arc::new(InMemorySessionStorage::new());
296 /// # let runner = FlowRunner::new(graph, storage.clone());
297 /// # let session = Session::new_from_task("test_session".to_string(), "start_task");
298 /// # storage.save(session).await?;
299 /// let result = runner.run("test_session").await?;
300 ///
301 /// match result.status {
302 /// graph_flow::ExecutionStatus::Completed => {
303 /// println!("Workflow completed: {:?}", result.response);
304 /// }
305 /// graph_flow::ExecutionStatus::WaitingForInput => {
306 /// println!("Waiting for user input: {:?}", result.response);
307 /// }
308 /// graph_flow::ExecutionStatus::Paused { next_task_id, reason } => {
309 /// println!("Paused, next task: {}, reason: {}", next_task_id, reason);
310 /// }
311 /// graph_flow::ExecutionStatus::Error(e) => {
312 /// eprintln!("Error: {}", e);
313 /// }
314 /// }
315 /// # Ok(())
316 /// # }
317 /// ```
318 ///
319 /// ## Interactive Loop
320 ///
321 /// ```rust,no_run
322 /// # use graph_flow::{FlowRunner, ExecutionStatus, Session, SessionStorage};
323 /// # use std::sync::Arc;
324 /// # #[tokio::main]
325 /// # async fn main() -> graph_flow::Result<()> {
326 /// # let storage = Arc::new(graph_flow::InMemorySessionStorage::new());
327 /// # let runner = FlowRunner::new(Arc::new(graph_flow::Graph::new("test")), storage.clone());
328 /// # let session = Session::new_from_task("session_id".to_string(), "start_task");
329 /// # storage.save(session).await?;
330 /// loop {
331 /// let result = runner.run("session_id").await?;
332 ///
333 /// match result.status {
334 /// ExecutionStatus::Completed => break,
335 /// ExecutionStatus::WaitingForInput => {
336 /// // Get user input and update context
337 /// // Then continue loop
338 /// break; // For demo
339 /// }
340 /// ExecutionStatus::Paused { .. } => {
341 /// // Continue to next step
342 /// continue;
343 /// }
344 /// ExecutionStatus::Error(e) => {
345 /// eprintln!("Error: {}", e);
346 /// break;
347 /// }
348 /// }
349 /// }
350 /// # Ok(())
351 /// # }
352 /// ```
353 ///
354 /// ## Error Handling
355 ///
356 /// ```rust,no_run
357 /// # use graph_flow::{FlowRunner, GraphError};
358 /// # use std::sync::Arc;
359 /// # #[tokio::main]
360 /// # async fn main() {
361 /// # let runner = FlowRunner::new(Arc::new(graph_flow::Graph::new("test")), Arc::new(graph_flow::InMemorySessionStorage::new()));
362 /// match runner.run("nonexistent_session").await {
363 /// Ok(result) => {
364 /// println!("Success: {:?}", result.response);
365 /// }
366 /// Err(GraphError::SessionNotFound(session_id)) => {
367 /// eprintln!("Session not found: {}", session_id);
368 /// }
369 /// Err(GraphError::TaskExecutionFailed(msg)) => {
370 /// eprintln!("Task failed: {}", msg);
371 /// }
372 /// Err(e) => {
373 /// eprintln!("Other error: {}", e);
374 /// }
375 /// }
376 /// # }
377 /// ```
378 pub async fn run(&self, session_id: &str) -> Result<ExecutionResult> {
379 // 1. Load session
380 let mut session = self
381 .storage
382 .get(session_id)
383 .await?
384 .ok_or_else(|| GraphError::SessionNotFound(session_id.to_string()))?;
385
386 // 2. Execute current task (exactly one step)
387 let result = self.graph.execute_session(&mut session).await?;
388
389 // 3. Persist new state so the next call starts where we left off
390 self.storage.save(session).await?;
391
392 Ok(result)
393 }
394}