graph_flow/
runner.rs

1//! FlowRunner – convenience wrapper that loads a session, executes exactly **one** graph step, and
2//! persists the updated session back to storage.
3//!
4//! ## When should you use `FlowRunner`?
5//! * **Interactive workflows / web services**: you usually want to run _one_ step per HTTP
6//!   request, send the assistant's reply back to the client, and have the session automatically
7//!   saved for the next roundtrip. `FlowRunner` makes that a one-liner.
8//! * **CLI demos & examples**: keeps example code tiny; no need to repeat the
9//!   load-execute-save boilerplate.
10//!
11//! ## When should you use `Graph::execute_session` directly?
12//! * **Batch processing** where you intentionally want to run many steps in a tight loop and save
13//!   once at the end to reduce I/O.
14//! * **Custom persistence logic** (e.g. optimistic locking, distributed transactions).
15//! * **Advanced diagnostics** where you want to inspect the intermediate `Session` before saving.
16//!
17//! Both APIs are 100 % compatible – `FlowRunner` merely builds on top of the low-level function.
18//!
19//! ## Patterns for Stateless HTTP Services
20//!
21//! ### Pattern 1: Shared FlowRunner (RECOMMENDED)
22//! Create `FlowRunner` once at startup, share across all requests:
23//! ```rust,no_run
24//! use graph_flow::FlowRunner;
25//! use std::sync::Arc;
26//!
27//! // At startup
28//! struct AppState {
29//!     flow_runner: FlowRunner,
30//! }
31//!
32//! // In request handler (async context)
33//! # async fn example(state: AppState, session_id: String) -> Result<(), Box<dyn std::error::Error>> {
34//! let result = state.flow_runner.run(&session_id).await?;
35//! # Ok(())
36//! # }
37//! ```
38//! **Pros**: Most efficient, zero allocation per request  
39//! **Cons**: Requires the same graph for all requests
40//!
41//! ### Pattern 2: Per-Request FlowRunner
42//! Create `FlowRunner` fresh for each request:
43//! ```rust,no_run
44//! use graph_flow::{FlowRunner, Graph, InMemorySessionStorage};
45//! use std::sync::Arc;
46//!
47//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
48//! # let graph = Arc::new(Graph::new("my-graph"));
49//! # let storage: Arc<dyn graph_flow::SessionStorage> = Arc::new(InMemorySessionStorage::new());
50//! # let session_id = "test-session";
51//! // In request handler
52//! let runner = FlowRunner::new(graph.clone(), storage.clone());
53//! let result = runner.run(&session_id).await?;
54//! # Ok(())
55//! # }
56//! ```
57//! **Pros**: Flexible, can use different graphs per request  
//! **Cons**: Two atomic reference-count increments per request for the `Arc` clones (no heap allocation; still very cheap)
59//!
60//! ### Pattern 3: Manual (Original)
61//! Use `Graph::execute_session` directly:
62//! ```rust,no_run
63//! use graph_flow::{Graph, SessionStorage, InMemorySessionStorage};
64//! use std::sync::Arc;
65//!
66//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
67//! # let graph = Arc::new(Graph::new("my-graph"));
68//! # let storage: Arc<dyn SessionStorage> = Arc::new(InMemorySessionStorage::new());
69//! # let session_id = "test-session";
70//! let mut session = storage.get(&session_id).await?.unwrap();
71//! let result = graph.execute_session(&mut session).await?;
72//! storage.save(session).await?;
73//! # Ok(())
74//! # }
75//! ```
76//! **Pros**: Maximum control  
//! **Cons**: More boilerplate, easy to forget the `storage.save(session)` call
78//!
79//! ## Performance Characteristics
80//! - **FlowRunner creation cost**: ~2 pointer copies (negligible)
//! - **Memory overhead**: 24 bytes on 64-bit targets (a thin `Arc<Graph>` pointer plus a fat `Arc<dyn SessionStorage>` pointer)
82//! - **Runtime cost**: Identical to manual approach
83//!
84//! For high-throughput services, Pattern 1 is recommended. For services with different
85//! graphs per request or complex routing, Pattern 2 is perfectly fine.
86//!
87//! # Examples
88//!
89//! ## Basic Usage
90//!
91//! ```rust,no_run
92//! use graph_flow::{FlowRunner, Graph, InMemorySessionStorage};
93//! use std::sync::Arc;
94//!
95//! # #[tokio::main]
96//! # async fn main() -> graph_flow::Result<()> {
97//! let graph = Arc::new(Graph::new("my_workflow"));
98//! let storage = Arc::new(InMemorySessionStorage::new());
99//! let runner = FlowRunner::new(graph, storage);
100//!
101//! // Execute workflow step (note: this will fail if session doesn't exist)
102//! let result = runner.run("session_id").await?;
103//! println!("Response: {:?}", result.response);
104//! # Ok(())
105//! # }
106//! ```
107//!
108//! ## Shared Runner Pattern (Recommended for Web Services)
109//!
110//! ```rust
111//! use graph_flow::FlowRunner;
112//! use std::sync::Arc;
113//!
114//! // Application state
115//! struct AppState {
116//!     flow_runner: Arc<FlowRunner>,
117//! }
118//!
119//! impl AppState {
120//!     fn new(runner: FlowRunner) -> Self {
121//!         Self {
122//!             flow_runner: Arc::new(runner),
123//!         }
124//!     }
125//! }
126//!
127//! // Request handler
128//! async fn handle_request(
129//!     state: Arc<AppState>,
130//!     session_id: String,
131//! ) -> Result<String, Box<dyn std::error::Error>> {
132//!     let result = state.flow_runner.run(&session_id).await?;
133//!     Ok(result.response.unwrap_or_default())
134//! }
135//! ```
136
137use std::sync::Arc;
138
139use crate::{
140    error::{GraphError, Result},
141    graph::{ExecutionResult, Graph},
142    storage::SessionStorage,
143};
144
145/// High-level helper that orchestrates the common _load → execute → save_ pattern.
146///
147/// `FlowRunner` provides a convenient wrapper around the lower-level graph execution
148/// API. It automatically handles session loading, execution, and persistence.
149///
150/// # When to Use FlowRunner
151///
152/// - **Web services**: Execute one step per HTTP request
153/// - **Interactive applications**: Step-by-step workflow progression
154/// - **Simple demos**: Minimal boilerplate for common use cases
155///
156/// # Performance
157///
158/// `FlowRunner` is lightweight and efficient:
159/// - Creation cost: ~2 pointer copies (negligible)
/// - Memory overhead: 24 bytes on 64-bit targets (a thin `Arc<Graph>` pointer plus a fat `Arc<dyn SessionStorage>` pointer)
161/// - Runtime cost: Identical to manual approach
162///
163/// # Examples
164///
165/// ## Basic Usage
166///
167/// ```rust,no_run
168/// use graph_flow::{FlowRunner, Graph, InMemorySessionStorage, Session, SessionStorage};
169/// use std::sync::Arc;
170///
171/// # #[tokio::main]
172/// # async fn main() -> graph_flow::Result<()> {
173/// let graph = Arc::new(Graph::new("my_workflow"));
174/// let storage = Arc::new(InMemorySessionStorage::new());
175/// let runner = FlowRunner::new(graph, storage.clone());
176///
177/// // Create a session first
178/// let session = Session::new_from_task("session_id".to_string(), "start_task");
179/// storage.save(session).await?;
180///
181/// // Execute workflow step
182/// let result = runner.run("session_id").await?;
183/// println!("Response: {:?}", result.response);
184/// # Ok(())
185/// # }
186/// ```
187///
188/// ## Shared Runner Pattern (Recommended for Web Services)
189///
190/// ```rust
191/// use graph_flow::FlowRunner;
192/// use std::sync::Arc;
193///
194/// // Application state
195/// struct AppState {
196///     flow_runner: Arc<FlowRunner>,
197/// }
198///
199/// impl AppState {
200///     fn new(runner: FlowRunner) -> Self {
201///         Self {
202///             flow_runner: Arc::new(runner),
203///         }
204///     }
205/// }
206///
207/// // Request handler
208/// async fn handle_request(
209///     state: Arc<AppState>,
210///     session_id: String,
211/// ) -> Result<String, Box<dyn std::error::Error>> {
212///     let result = state.flow_runner.run(&session_id).await?;
213///     Ok(result.response.unwrap_or_default())
214/// }
215/// ```
// `Clone` duplicates the two `Arc` handles below, not the graph or storage they point to.
#[derive(Clone)]
pub struct FlowRunner {
    /// Workflow graph whose `execute_session` is invoked by `run`.
    graph: Arc<Graph>,
    /// Session store that `run` loads the session from and saves it back to.
    storage: Arc<dyn SessionStorage>,
}
221
222impl FlowRunner {
223    /// Create a new `FlowRunner` from an `Arc<Graph>` and any `SessionStorage` implementation.
224    ///
225    /// # Parameters
226    ///
227    /// * `graph` - The workflow graph to execute
228    /// * `storage` - Storage backend for session persistence
229    ///
230    /// # Examples
231    ///
232    /// ```rust
233    /// use graph_flow::{FlowRunner, Graph, InMemorySessionStorage};
234    /// use std::sync::Arc;
235    ///
236    /// let graph = Arc::new(Graph::new("my_workflow"));
237    /// let storage = Arc::new(InMemorySessionStorage::new());
238    /// let runner = FlowRunner::new(graph, storage);
239    /// ```
240    ///
241    /// ## With PostgreSQL Storage
242    ///
243    /// ```rust,no_run
244    /// use graph_flow::{FlowRunner, Graph, PostgresSessionStorage};
245    /// use std::sync::Arc;
246    ///
247    /// # #[tokio::main]
248    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
249    /// let graph = Arc::new(Graph::new("my_workflow"));
250    /// let storage = Arc::new(
251    ///     PostgresSessionStorage::connect("postgresql://localhost/mydb").await?
252    /// );
253    /// let runner = FlowRunner::new(graph, storage);
254    /// # Ok(())
255    /// # }
256    /// ```
257    pub fn new(graph: Arc<Graph>, storage: Arc<dyn SessionStorage>) -> Self {
258        Self { graph, storage }
259    }
260
261    /// Execute **exactly one** task for the given `session_id` and persist the updated session.
262    ///
263    /// This method:
264    /// 1. Loads the session from storage
265    /// 2. Executes the current task
266    /// 3. Saves the updated session back to storage
267    /// 4. Returns the execution result
268    ///
269    /// # Parameters
270    ///
271    /// * `session_id` - Unique identifier for the session to execute
272    ///
273    /// # Returns
274    ///
275    /// Returns the same [`ExecutionResult`] that `Graph::execute_session` does, so callers can
276    /// inspect the assistant's response and the status (`WaitingForInput`, `Completed`, etc.).
277    ///
278    /// # Errors
279    ///
280    /// Returns an error if:
281    /// - The session doesn't exist
282    /// - Task execution fails
283    /// - Storage operations fail
284    ///
285    /// # Examples
286    ///
287    /// ## Basic Execution
288    ///
289    /// ```rust,no_run
290    /// # use graph_flow::{FlowRunner, Graph, InMemorySessionStorage, Session, SessionStorage};
291    /// # use std::sync::Arc;
292    /// # #[tokio::main]
293    /// # async fn main() -> graph_flow::Result<()> {
294    /// # let graph = Arc::new(Graph::new("test"));
295    /// # let storage = Arc::new(InMemorySessionStorage::new());
296    /// # let runner = FlowRunner::new(graph, storage.clone());
297    /// # let session = Session::new_from_task("test_session".to_string(), "start_task");
298    /// # storage.save(session).await?;
299    /// let result = runner.run("test_session").await?;
300    ///
301    /// match result.status {
302    ///     graph_flow::ExecutionStatus::Completed => {
303    ///         println!("Workflow completed: {:?}", result.response);
304    ///     }
305    ///     graph_flow::ExecutionStatus::WaitingForInput => {
306    ///         println!("Waiting for user input: {:?}", result.response);
307    ///     }
308    ///     graph_flow::ExecutionStatus::Paused { next_task_id, reason } => {
309    ///         println!("Paused, next task: {}, reason: {}", next_task_id, reason);
310    ///     }
311    ///     graph_flow::ExecutionStatus::Error(e) => {
312    ///         eprintln!("Error: {}", e);
313    ///     }
314    /// }
315    /// # Ok(())
316    /// # }
317    /// ```
318    ///
319    /// ## Interactive Loop
320    ///
321    /// ```rust,no_run
322    /// # use graph_flow::{FlowRunner, ExecutionStatus, Session, SessionStorage};
323    /// # use std::sync::Arc;
324    /// # #[tokio::main]
325    /// # async fn main() -> graph_flow::Result<()> {
326    /// # let storage = Arc::new(graph_flow::InMemorySessionStorage::new());
327    /// # let runner = FlowRunner::new(Arc::new(graph_flow::Graph::new("test")), storage.clone());
328    /// # let session = Session::new_from_task("session_id".to_string(), "start_task");
329    /// # storage.save(session).await?;
330    /// loop {
331    ///     let result = runner.run("session_id").await?;
332    ///     
333    ///     match result.status {
334    ///         ExecutionStatus::Completed => break,
335    ///         ExecutionStatus::WaitingForInput => {
336    ///             // Get user input and update context
337    ///             // Then continue loop
338    ///             break; // For demo
339    ///         }
340    ///         ExecutionStatus::Paused { .. } => {
341    ///             // Continue to next step
342    ///             continue;
343    ///         }
344    ///         ExecutionStatus::Error(e) => {
345    ///             eprintln!("Error: {}", e);
346    ///             break;
347    ///         }
348    ///     }
349    /// }
350    /// # Ok(())
351    /// # }
352    /// ```
353    ///
354    /// ## Error Handling
355    ///
356    /// ```rust,no_run
357    /// # use graph_flow::{FlowRunner, GraphError};
358    /// # use std::sync::Arc;
359    /// # #[tokio::main]
360    /// # async fn main() {
361    /// # let runner = FlowRunner::new(Arc::new(graph_flow::Graph::new("test")), Arc::new(graph_flow::InMemorySessionStorage::new()));
362    /// match runner.run("nonexistent_session").await {
363    ///     Ok(result) => {
364    ///         println!("Success: {:?}", result.response);
365    ///     }
366    ///     Err(GraphError::SessionNotFound(session_id)) => {
367    ///         eprintln!("Session not found: {}", session_id);
368    ///     }
369    ///     Err(GraphError::TaskExecutionFailed(msg)) => {
370    ///         eprintln!("Task failed: {}", msg);
371    ///     }
372    ///     Err(e) => {
373    ///         eprintln!("Other error: {}", e);
374    ///     }
375    /// }
376    /// # }
377    /// ```
378    pub async fn run(&self, session_id: &str) -> Result<ExecutionResult> {
379        // 1. Load session
380        let mut session = self
381            .storage
382            .get(session_id)
383            .await?
384            .ok_or_else(|| GraphError::SessionNotFound(session_id.to_string()))?;
385
386        // 2. Execute current task (exactly one step)
387        let result = self.graph.execute_session(&mut session).await?;
388
389        // 3. Persist new state so the next call starts where we left off
390        self.storage.save(session).await?;
391
392        Ok(result)
393    }
394}