// flowgentra_ai/core/runtime/mod.rs
//! ## Advanced Patterns
//!
//! - Support custom middleware for logging, metrics, etc.
//! - Allow users to hook into state transitions, node execution, and errors.
//!
//! ## Observability & Debugging
//!
//! - Use structured logging (e.g., `tracing`) throughout runtime and handlers.
//! - Integrate distributed tracing for workflow execution.
//! - Provide health check endpoints for production deployments.

/// Liveness-probe stub for production deployments.
///
/// Currently always reports healthy; wire in real checks (DB, LLM
/// connectivity, etc.) before depending on it.
///
/// ```ignore
/// // Example: log a state transition
/// // tracing::info!("Transitioned from {:?} to {:?}", old_state, new_state);
/// ```
pub fn health_check() -> bool {
    // TODO: implement real health logic (e.g., DB, LLM, etc.)
    true
}
// # Runtime for typed state engine
//
// ## Node & Handler Patterns
//
// - Register all handlers with macros for auto-discovery.
// - Each handler should only mutate the state fields it owns.
// - Document handler contracts (fields read/written).
// - Use clear error types and never panic in handlers or runtime.
// - Compose handlers for complex logic, don't overload one function.
30use crate::core::reducer::ReducerConfig;
// Runtime for typed state engine
32use crate::core::state::State;
33
34/// Merge a partial state update into the current state using per-field reducers.
35///
36/// Works with any `State` type by converting to/from JSON values.
37/// The `ReducerConfig` maps field names to `JsonReducer` strategies.
38/// Fields without explicit reducers use `Overwrite` (replace) semantics.
39///
40/// # Example
41/// ```ignore
42/// let config = ReducerConfig::new()
43/// .field("messages", JsonReducer::Append)
44/// .field("count", JsonReducer::Sum);
45/// let merged = merge_state(¤t, &partial_update, &config)?;
46/// ```
47pub fn merge_state<T: State>(
48 current: &T,
49 update: &T,
50 reducers: &ReducerConfig,
51) -> crate::core::error::Result<T> {
52 let current_value = current.to_value();
53 let update_value = update.to_value();
54 let merged_value = reducers.merge_values(¤t_value, &update_value);
55 T::from_json(merged_value)
56}
57
58/// Merge multiple partial state updates into the current state, applying them in order.
59pub fn merge_state_many<T: State>(
60 current: &T,
61 updates: &[T],
62 reducers: &ReducerConfig,
63) -> crate::core::error::Result<T> {
64 let mut result = current.clone();
65 for update in updates {
66 result = merge_state(&result, update, reducers)?;
67 }
68 Ok(result)
69}
70
71use crate::core::config::AgentConfig;
72use crate::core::error::{FlowgentraError, Result};
73use crate::core::graph::Graph;
74use crate::core::llm::{create_llm_client, LLMClient};
75use crate::core::mcp::{DefaultMCPClient, MCPClient};
76use crate::core::memory::{
77 CheckpointMetadata, Checkpointer, GenericCheckpointer, MemoryCheckpointer,
78};
79use crate::core::middleware::ExecutionContext as MiddlewareContext;
80use crate::core::middleware::MiddlewarePipeline;
81use crate::core::node::{Edge, EdgeCondition, Node, NodeFunction};
82use crate::core::observability::ObservabilityMiddleware;
83use crate::core::tracing::TimerGuard;
84use futures::future::try_join_all;
85use std::collections::{HashMap, HashSet};
86use std::sync::Arc;
87use std::time::Instant;
88
89// =============================================================================
90// Execution Context
91// =============================================================================
92
/// Per-node execution context.
///
/// Bundles the state snapshot being processed with the shared service
/// clients (LLM + MCP tool access) a node handler may need.
pub struct ExecutionContext<T: State> {
    /// Name of the node being executed
    pub node_name: String,

    /// Current state being processed
    pub state: T,

    /// LLM client for AI operations
    pub llm_client: Arc<dyn LLMClient>,

    /// Available MCP clients for tool access, keyed by MCP server name
    pub mcp_clients: HashMap<String, Arc<dyn MCPClient>>,
}
106
107// =============================================================================
108// Runtime
109// =============================================================================
110
/// The core execution engine for agent graphs
///
/// Responsible for:
/// - Managing the graph structure
/// - Executing nodes in order
/// - Applying edge conditions
/// - Maintaining service clients
/// - Executing middleware hooks
pub struct AgentRuntime<T: State> {
    /// Agent configuration the runtime was built from
    config: AgentConfig,

    /// The execution graph (nodes + edges + start/end markers)
    graph: Graph<T>,

    /// LLM client shared across all nodes
    llm_client: Arc<dyn LLMClient>,

    /// MCP clients for tool access, keyed by the MCP server name from config
    mcp_clients: HashMap<String, Arc<dyn MCPClient>>,

    /// Middleware pipeline for cross-cutting concerns (logging, metrics, evaluation)
    middleware_pipeline: MiddlewarePipeline<T>,

    /// Optional checkpointer for thread-scoped state persistence (resume, multi-turn).
    checkpointer: Option<Arc<MemoryCheckpointer>>,
}
138
139impl<T: State> AgentRuntime<T> {
140 // =========================================================================
141 // Initialization
142 // =========================================================================
143
    /// Create a new runtime from a configuration
    ///
    /// This performs:
    /// - Configuration validation
    /// - LLM client initialization
    /// - MCP client setup
    /// - Graph construction
    ///
    /// # Errors
    /// Fails if the config is invalid, the LLM client cannot be created,
    /// or the assembled graph does not pass `Graph::validate`.
    pub fn from_config(config: AgentConfig) -> Result<Self> {
        // Validate config before constructing anything expensive.
        config.validate()?;

        // Create LLM client
        let llm_client = create_llm_client(&config.llm)?;

        // Create MCP clients — one shared client per configured MCP server.
        let mcp_clients: HashMap<String, Arc<dyn MCPClient>> = config
            .graph
            .mcps
            .iter()
            .map(|(name, mcp_config)| {
                (
                    name.clone(),
                    Arc::new(DefaultMCPClient::new(mcp_config.clone())) as Arc<dyn MCPClient>,
                )
            })
            .collect();

        // Create graph
        let mut graph = Graph::new();

        // Collect names of nodes that are managed by a supervisor.
        // These should NOT be added to the graph — the supervisor calls them internally,
        // so they don't participate in the graph's own edge routing or termination checks.
        let supervisor_children: std::collections::HashSet<String> = config
            .graph
            .nodes
            .iter()
            .filter(|n| {
                matches!(
                    n.node_type.as_deref(),
                    Some("supervisor") | Some("orchestrator")
                )
            })
            .flat_map(|n| {
                // Children are listed under the supervisor's `children` config key.
                n.config
                    .get("children")
                    .and_then(|v| v.as_array())
                    .map(|arr| {
                        arr.iter()
                            .filter_map(|v| v.as_str().map(String::from))
                            .collect::<Vec<_>>()
                    })
                    .unwrap_or_default()
            })
            .collect();

        // Add nodes (these are placeholders; actual nodes are added via register_node)
        // Skip supervisor-managed children — they are internal to the supervisor, not graph routing nodes.
        for node_config in &config.graph.nodes {
            if supervisor_children.contains(&node_config.name) {
                continue;
            }
            // Identity placeholder; replaced by the real handler in register_node.
            let placeholder_fn: NodeFunction<T> =
                Box::new(|state| Box::pin(async move { Ok(state) }));
            // A node is a planner either via its handler id or explicit node_type.
            let is_planner = node_config.handler == "builtin::planner"
                || node_config.node_type.as_deref() == Some("planner");
            let node = if is_planner {
                Node::new_planner(
                    node_config.name.clone(),
                    placeholder_fn,
                    node_config.mcps.clone(),
                    node_config.config.clone(),
                )
            } else {
                Node::new_placeholder(
                    node_config.name.clone(),
                    placeholder_fn,
                    node_config.mcps.clone(),
                    node_config.config.clone(),
                )
            };
            graph.add_node(node);
        }

        // Add edges
        // NOTE(review): `node_names` is built (including START/END sentinels) but
        // never read below — looks like leftover validation scaffolding; confirm
        // before removing.
        let mut node_names: std::collections::HashSet<_> =
            config.graph.nodes.iter().map(|n| n.name.clone()).collect();
        node_names.insert("START".to_string());
        node_names.insert("END".to_string());

        let mut start_nodes = Vec::new();
        let mut end_nodes = Vec::new();

        for edge_config in &config.graph.edges {
            // One config entry may fan out to several targets.
            for to in &edge_config.to {
                let mut edge = Edge::new(
                    edge_config.from.clone(),
                    to.clone(),
                    None, // Condition is set during registration
                );

                if let Some(cond_name) = &edge_config.condition {
                    edge = edge.with_condition_name(cond_name.clone());
                }

                graph.add_edge(edge);

                // Record entry/exit points from the START/END sentinels.
                if edge_config.from == "START" {
                    start_nodes.push(to.clone());
                }
                if *to == "END" {
                    end_nodes.push(edge_config.from.clone());
                }
            }
        }

        graph.set_start_nodes(start_nodes);
        graph.set_end_nodes(end_nodes);
        graph.set_allow_cycles(config.graph.allow_cycles);
        graph.set_max_loop_iterations(config.graph.recursion_limit);

        // Validate graph structure (connectivity, cycles, etc.).
        graph.validate()?;

        Ok(AgentRuntime {
            config,
            graph,
            llm_client,
            mcp_clients,
            middleware_pipeline: MiddlewarePipeline::new(),
            checkpointer: None,
        })
    }
277
278 // =========================================================================
279 // Handler Registration
280 // =========================================================================
281
282 /// Register a handler function for a node
283 ///
284 /// Replaces the placeholder function with the actual handler.
285 /// Called during agent initialization.
286 pub fn register_node(&mut self, name: &str, function: NodeFunction<T>) -> Result<()> {
287 if let Some(node) = self.graph.nodes.get_mut(name) {
288 // Replace the placeholder function
289 let mcps = node.mcps.clone();
290 let config = node.config.clone();
291 let is_planner = node.is_planner;
292 let new_node = if is_planner {
293 Node::new_planner(name.to_string(), function, mcps, config)
294 } else {
295 Node::new(name.to_string(), function, mcps, config)
296 };
297 self.graph.nodes.insert(name.to_string(), new_node);
298 Ok(())
299 } else {
300 Err(FlowgentraError::NodeNotFound(name.to_string()))
301 }
302 }
303
304 /// Register a condition function for an edge
305 ///
306 /// Associates a condition function with conditional edges.
307 pub fn register_edge_condition(
308 &mut self,
309 from: &str,
310 condition_name: &str,
311 condition_fn: EdgeCondition<T>,
312 ) -> Result<()> {
313 // Find and update the edge(s)
314 for edge in self.graph.edges.iter_mut() {
315 if edge.from == from && edge.condition_name.as_deref() == Some(condition_name) {
316 edge.condition = Some(Arc::clone(&condition_fn));
317 }
318 }
319 Ok(())
320 }
321
    /// Set the checkpointer for thread-scoped state persistence.
    ///
    /// When set, use `execute_with_thread(thread_id, state)` to load/save state per thread.
    /// Returns `&mut Self` so calls can be chained.
    pub fn set_checkpointer(&mut self, checkpointer: Arc<MemoryCheckpointer>) -> &mut Self {
        self.checkpointer = Some(checkpointer);
        self
    }
329
    /// Builder-style: set the checkpointer, consuming and returning `self`.
    pub fn with_checkpointer(mut self, checkpointer: Arc<MemoryCheckpointer>) -> Self {
        self.checkpointer = Some(checkpointer);
        self
    }
335
336 /// Get the checkpointer if set.
337 pub fn checkpointer(&self) -> Option<Arc<MemoryCheckpointer>> {
338 self.checkpointer.clone()
339 }
340
    /// Add middleware to the middleware pipeline
    ///
    /// Middleware will be executed in the order they are added during node execution.
    /// Each middleware receives an ExecutionContext and can:
    /// - Log/monitor execution
    /// - Validate state
    /// - Modify state before/after execution
    /// - Skip nodes or abort execution
    ///
    /// Returns `&mut Self` so registrations can be chained.
    ///
    /// # Arguments
    /// - `middleware` - The middleware to add (must implement Middleware trait)
    ///
    /// # Example
    /// ```ignore
    /// # use flowgentra_ai::core::runtime::AgentRuntime;
    /// # use flowgentra_ai::core::config::AgentConfig;
    /// # use flowgentra_ai::core::middleware::LoggingMiddleware;
    /// # use std::sync::Arc;
    /// # let config = AgentConfig::from_file("config.yaml")?;
    /// # let mut runtime = AgentRuntime::from_config(config)?;
    /// runtime.add_middleware(Arc::new(LoggingMiddleware::new()));
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn add_middleware(
        &mut self,
        middleware: Arc<dyn crate::core::middleware::Middleware<T>>,
    ) -> &mut Self {
        self.middleware_pipeline.use_middleware(middleware);
        self
    }
371
    /// Get a read-only reference to the middleware pipeline.
    pub fn middleware_pipeline(&self) -> &MiddlewarePipeline<T> {
        &self.middleware_pipeline
    }
376
    /// Get mutable reference to the middleware pipeline (e.g., to reorder or clear it).
    pub fn middleware_pipeline_mut(&mut self) -> &mut MiddlewarePipeline<T> {
        &mut self.middleware_pipeline
    }
381
382 /// Add observability middleware (execution tracing, node timing, token usage, failure snapshots).
383 pub fn with_observability(&mut self) -> &mut Self {
384 let mw = ObservabilityMiddleware::new().with_agent_name(self.config.name.clone());
385 self.add_middleware(Arc::new(mw));
386 self
387 }
388
389 // =========================================================================
390 // Execution
391 // =========================================================================
392
    /// Execute the agent graph from start to end.
    ///
    /// For thread-scoped persistence (resume, multi-turn), use `execute_with_thread` and
    /// set a checkpointer via `set_checkpointer` or config.
    pub async fn execute(&self, initial_state: T) -> Result<T> {
        // No thread id: runs without checkpoint load/save even if a checkpointer is set.
        self.execute_impl(initial_state, None).await
    }
400
    /// Execute with a thread id for checkpointing. When a checkpointer is set, state is
    /// loaded from the last checkpoint for this thread (if any) and saved after each node.
    /// Without a checkpointer this behaves like `execute`.
    pub async fn execute_with_thread(&self, thread_id: &str, initial_state: T) -> Result<T> {
        self.execute_impl(initial_state, Some(thread_id)).await
    }
406
407 async fn execute_impl(&self, initial_state: T, thread_id: Option<&str>) -> Result<T> {
408 // == STATE CLONING NOTES ==
409 // This execution loop clones state at several points for correctness:
410 // 1. Parallel frontier execution: Arc-wrapped single clone for shared access
411 // 2. Middleware context: Clone for before/after hooks (they need mutable reference)
412 // 3. Node execution: Clone as handler takes state by value
413 //
414 // For typical workflows, these clones are acceptable and necessary.
415 // For large state objects (>1MB), consider:
416 // - SharedState: Zero-copy shared state via Arc<Mutex>
417 // - OptimizedState: Lazy cloning on mutation only
418 // See STATE_OPTIMIZATION_GUIDE.md for detailed strategies
419
420 let span = tracing::info_span!("agent_execution");
421 let _guard = span.enter();
422
423 tracing::info!("Starting agent execution");
424 let _timer = TimerGuard::start("total_execution");
425
426 // If checkpointer + thread_id: try load checkpoint; otherwise use initial_state
427 let mut state = if let (Some(cp), Some(tid)) = (&self.checkpointer, thread_id) {
428 match cp.load(tid)? {
429 Some(checkpoint) => {
430 let loaded = checkpoint.state()?;
431 tracing::info!(thread_id = %tid, "Resumed from checkpoint");
432 loaded
433 }
434 None => initial_state,
435 }
436 } else {
437 initial_state
438 };
439
440 // Validate initial state if schema is configured
441 self.config.validate_state(&state)?;
442
443 // Track execution path for checkpoint metadata
444 let mut execution_path: Vec<String> = Vec::new();
445
446 // Planner: max iterations (re-plan after each step)
447 let max_plan_steps = self.config.graph.planner.max_plan_steps;
448 let mut plan_steps: usize = 0;
449
450 // Recursion / step limit — prevents infinite loops in cyclic graphs.
451 let recursion_limit = self.config.graph.recursion_limit;
452 let mut total_steps: usize = 0;
453
454 // Start from START node
455 let mut current_nodes = self.graph.start_nodes().to_vec();
456 let mut iteration = 0;
457
458 while !current_nodes.is_empty() {
459 iteration += 1;
460 total_steps += current_nodes.len();
461
462 if total_steps > recursion_limit {
463 return Err(
464 crate::core::error::FlowgentraError::RecursionLimitExceeded {
465 limit: recursion_limit,
466 },
467 );
468 }
469
470 let node_count = current_nodes.len();
471
472 tracing::debug!(iteration, node_count, total_steps, nodes = ?current_nodes, "Processing iteration");
473
474 let mut next_nodes = Vec::new();
475
476 // DAG-based parallel: when multiple nodes in frontier (and none are planner), run concurrently with tokio
477 let run_parallel = node_count > 1
478 && !current_nodes.iter().any(|n| {
479 self.graph
480 .get_node(n)
481 .map(|node| node.is_planner)
482 .unwrap_or(false)
483 });
484
485 if run_parallel {
486 // Run all frontier nodes concurrently (futures::join_all)
487 // ╔════════════════════════════════════════════════════════════════════════╗
488 // ║ CRITICAL PERFORMANCE: State Cloning in Parallel Execution ║
489 // ├════════════════════════════════════════════════════════════════════════╣
490 // ║ Issue: clone() called per node × iterations (3 nodes × 10 steps = 30×) ║
491 // ║ Impact: 50-200ms overhead for state > 500KB ║
492 // ║ Solution: Use SharedState, NOT PlainState! ║
493 // ║ ║
494 // ║ SharedState.clone() = Cheap (Arc pointer, ~1μs) ║
495 // ║ PlainState.clone() = Expensive (full data, 5-20ms) ║
496 // ║ ║
497 // ║ Recommendation: For production, ALWAYS use SharedState ║
498 // ║ Example at: flowgentra-ai/examples/state_graph_react_agent.rs ║
499 // ╚════════════════════════════════════════════════════════════════════════╝
500 let state_shared = Arc::new(state); // No clone - wrap as-is
501 let futures_with_names: Vec<_> = current_nodes
502 .iter()
503 .filter(|n| *n != "END")
504 .map(|node_name| {
505 let node = self
506 .graph
507 .get_node(node_name)
508 .ok_or_else(|| FlowgentraError::NodeNotFound(node_name.clone()))?;
509 // ⚠️ SharedState.clone() is cheap (Arc pointer)
510 // ⚠️ PlainState.clone() is expensive (full JSON data copy)
511 let state_in = (*state_shared).clone();
512 let name = node_name.clone();
513 let fut = node.execute(state_in);
514 Ok::<_, FlowgentraError>((name, fut))
515 })
516 .collect::<Result<Vec<_>>>()?;
517
518 let task_futures: Vec<_> = futures_with_names
519 .into_iter()
520 .map(|(name, f)| {
521 Box::pin(async move {
522 let out = f.await?;
523 Ok::<_, FlowgentraError>((name, out))
524 })
525 as std::pin::Pin<
526 Box<dyn std::future::Future<Output = Result<(String, T)>> + Send>,
527 >
528 })
529 .collect();
530
531 // Use `try_join_all` to short-circuit immediately on the first error
532 let results: Vec<(String, T)> = match try_join_all(task_futures).await {
533 Ok(res) => res,
534 Err(e) => {
535 tracing::error!(error = ?e, "Parallel node execution failed early");
536 return Err(FlowgentraError::ParallelExecutionError(e.to_string()));
537 }
538 };
539
540 // Recover the original state from the Arc, then merge branch results into it
541 state = Arc::try_unwrap(state_shared).unwrap_or_else(|arc| arc.as_ref().clone());
542 for (node_name, branch_state) in &results {
543 execution_path.push(node_name.clone());
544 for (k, v) in branch_state.as_map() {
545 state.set(k, v);
546 }
547 }
548
549 if let (Some(cp), Some(tid)) = (&self.checkpointer, thread_id) {
550 let last = results.first().map(|(n, _)| n.clone());
551 let meta = CheckpointMetadata {
552 last_node: last,
553 execution_path: execution_path.clone(),
554 extra: HashMap::new(),
555 };
556 if let Err(e) = cp.save(tid, &state, &meta) {
557 tracing::error!(error = %e, thread_id = %tid, "Failed to save checkpoint after parallel execution");
558 }
559 }
560 tracing::debug!(nodes = ?current_nodes, "Parallel step completed");
561
562 // Collect next nodes from all branches (evaluate edges on merged state)
563 let mut seen = HashSet::new();
564 for node_name in ¤t_nodes {
565 if node_name == "END" {
566 continue;
567 }
568 for edge in self.graph.get_next_nodes(node_name) {
569 if edge.to == "END" {
570 tracing::info!(from = node_name, "Execution complete - reached END");
571 if let (Some(cp), Some(tid)) = (&self.checkpointer, thread_id) {
572 let meta = CheckpointMetadata {
573 last_node: Some(node_name.clone()),
574 execution_path: execution_path.clone(),
575 extra: HashMap::new(),
576 };
577 if let Err(e) = cp.save(tid, &state, &meta) {
578 tracing::error!(error = %e, thread_id = %tid, "Failed to save final checkpoint");
579 }
580 }
581 let _ = self.middleware_pipeline.execute_on_complete(&state).await;
582 return Ok(state);
583 }
584 if let Ok(true) = edge.should_take(&state).await {
585 let next = edge
586 .get_next_node(&state)
587 .await?
588 .unwrap_or_else(|| edge.to.clone());
589 if seen.insert(next.clone()) {
590 next_nodes.push(next);
591 }
592 }
593 }
594 }
595
596 if next_nodes.is_empty() {
597 tracing::warn!("No next nodes after parallel step - stopping");
598 break;
599 }
600 current_nodes = next_nodes;
601 continue;
602 }
603
604 for node_name in current_nodes {
605 if node_name == "END" {
606 tracing::debug!("Reached END node");
607 continue;
608 }
609
610 let node_span = tracing::debug_span!("node_execution", node = %node_name);
611 let _node_guard = node_span.enter();
612
613 // Execute the node
614 let node = self.graph.get_node(&node_name).ok_or_else(|| {
615 tracing::error!(node = %node_name, "Node not found in graph");
616 FlowgentraError::NodeNotFound(node_name.clone())
617 })?;
618
619 let _node_timer = TimerGuard::start(format!("node_{}", node_name));
620
621 // === MIDDLEWARE: before_node ===
622 // Note: state.clone() is necessary here because:
623 // 1. Middleware takes mutable reference and can modify state
624 // 2. Node handler takes state by value
625 // For large state objects (>1MB), consider using SharedState for zero-copy sharing
626 let start_time = Instant::now();
627 let mut middleware_ctx = MiddlewareContext::new(node_name.clone(), state.clone());
628
629 // Execute middleware before node
630 if let Err(e) = self
631 .middleware_pipeline
632 .execute_before_node(&mut middleware_ctx)
633 .await
634 {
635 tracing::error!(node = %node_name, error = ?e, "Middleware before_node failed");
636 return Err(e);
637 }
638
639 state = middleware_ctx.state;
640
641 // Planner nodes: inject _current_node and _reachable_nodes before execution
642 if node.is_planner {
643 state.set(
644 "_current_node",
645 serde_json::Value::String(node_name.clone()),
646 );
647 let reachable = self.graph.get_reachable_node_ids(&node_name);
648 state.set("_reachable_nodes", serde_json::json!(reachable));
649 }
650
651 // Inject per-node MCP names so handlers can use get_node_mcp_client()
652 if !node.mcps.is_empty() {
653 state.set("_node_mcps", serde_json::json!(node.mcps));
654 } else {
655 state.remove("_node_mcps");
656 }
657
658 // Execute node - clone is necessary as handler takes State by value
659 match node.execute(state.clone()).await {
660 Ok(new_state) => {
661 state = new_state;
662
663 // === MIDDLEWARE: after_node ===
664 // Clone here is needed because middleware requires mutable reference
665 let elapsed = start_time.elapsed();
666 let mut after_ctx =
667 MiddlewareContext::new(node_name.clone(), state.clone());
668 after_ctx
669 .metadata
670 .insert("elapsed_ms".to_string(), elapsed.as_millis().to_string());
671
672 if let Err(e) = self
673 .middleware_pipeline
674 .execute_after_node(&mut after_ctx)
675 .await
676 {
677 tracing::error!(node = %node_name, error = ?e, "Middleware after_node failed");
678 return Err(e);
679 }
680
681 state = after_ctx.state;
682
683 // === INTERNAL RETRY LOOP ===
684 // If evaluation middleware flagged this node for retry, re-execute
685 // up to max_retries times (from config.evaluation).
686 let retry_key = format!("__node_retry_needed__{}", node_name);
687 let max_retries = self
688 .config
689 .evaluation
690 .as_ref()
691 .filter(|e| e.enabled)
692 .map(|e| e.max_retries)
693 .unwrap_or(0);
694
695 let mut retry_count = 0u32;
696 while retry_count < max_retries {
697 if let Some(flag) = state.get(&retry_key) {
698 if flag.as_bool().unwrap_or(false) {
699 retry_count += 1;
700 tracing::info!(
701 node = %node_name,
702 attempt = retry_count,
703 max_retries = max_retries,
704 "Retrying node due to low confidence"
705 );
706
707 // Clear the retry flag before re-execution
708 state.set(&retry_key, serde_json::json!(false));
709
710 // Re-execute the node
711 match node.execute(state.clone()).await {
712 Ok(new_state) => {
713 state = new_state;
714
715 // Run after_node middleware again to re-evaluate
716 let mut retry_ctx = MiddlewareContext::new(
717 node_name.clone(),
718 state.clone(),
719 );
720 if let Err(e) = self
721 .middleware_pipeline
722 .execute_after_node(&mut retry_ctx)
723 .await
724 {
725 tracing::error!(
726 node = %node_name,
727 error = ?e,
728 "Middleware after_node failed during retry"
729 );
730 return Err(e);
731 }
732 state = retry_ctx.state;
733
734 // Check if retry is still needed
735 continue;
736 }
737 Err(e) => {
738 tracing::error!(
739 node = %node_name,
740 attempt = retry_count,
741 error = ?e,
742 "Node retry execution failed"
743 );
744 return Err(e.context(&format!(
745 "while retrying node '{}' (attempt {})",
746 node_name, retry_count
747 )));
748 }
749 }
750 }
751 }
752 // No retry flag set — break
753 break;
754 }
755
756 if retry_count >= max_retries && max_retries > 0 {
757 if let Some(flag) = state.get(&retry_key) {
758 if flag.as_bool().unwrap_or(false) {
759 tracing::warn!(
760 node = %node_name,
761 max_retries = max_retries,
762 "Maximum retries reached, proceeding with best output"
763 );
764 // Clear the flag so downstream code doesn't see a stale retry request
765 state.set(&retry_key, serde_json::json!(false));
766 }
767 }
768 }
769
770 execution_path.push(node_name.clone());
771 // Checkpoint after each node when thread_id and checkpointer are set
772 if let (Some(cp), Some(tid)) = (&self.checkpointer, thread_id) {
773 let meta = CheckpointMetadata {
774 last_node: Some(node_name.clone()),
775 execution_path: execution_path.clone(),
776 extra: HashMap::new(),
777 };
778 if let Err(e) = cp.save(tid, &state, &meta) {
779 tracing::warn!(error = ?e, "Checkpoint save failed");
780 }
781 }
782 tracing::debug!(node = %node_name, elapsed_ms = elapsed.as_millis(), "Node execution completed");
783
784 // Planner nodes: use _next_node from state instead of following edges
785 if node.is_planner {
786 plan_steps += 1;
787 if plan_steps >= max_plan_steps {
788 tracing::info!(plan_steps, "Max plan steps reached - stopping");
789 break;
790 }
791 let next_node = state
792 .get_string("_next_node")
793 .unwrap_or_else(|| "END".to_string());
794 if next_node == "END" {
795 tracing::info!(from = %node_name, "Planner chose END - execution complete");
796 if let (Some(cp), Some(tid)) = (&self.checkpointer, thread_id) {
797 let meta = CheckpointMetadata {
798 last_node: Some(node_name.clone()),
799 execution_path: execution_path.clone(),
800 extra: HashMap::new(),
801 };
802 let _ = cp.save(tid, &state, &meta);
803 }
804 let _ = self.middleware_pipeline.execute_on_complete(&state).await;
805 return Ok(state);
806 }
807 tracing::debug!(next_node = %next_node, "Planner chose next node");
808 next_nodes.push(next_node);
809 continue;
810 }
811 }
812 Err(e) => {
813 tracing::error!(node = %node_name, error = ?e, "Node execution failed");
814
815 // === MIDDLEWARE: on_error ===
816 let _elapsed = start_time.elapsed();
817 let error_ctx = MiddlewareContext::new(node_name.clone(), state.clone());
818
819 self.middleware_pipeline
820 .execute_on_error(&node_name, &e, &error_ctx)
821 .await;
822
823 return Err(e.context(&format!("while executing node '{}'", node_name)));
824 }
825 }
826
827 // Find next nodes
828 let outgoing = self.graph.get_next_nodes(&node_name);
829 tracing::trace!(node = %node_name, edge_count = outgoing.len(), "Evaluating outgoing edges");
830
831 for edge in outgoing {
832 if edge.to == "END" {
833 tracing::info!(from = %node_name, "Execution complete - reached END");
834 execution_path.push(node_name.clone());
835 if let (Some(cp), Some(tid)) = (&self.checkpointer, thread_id) {
836 let meta = CheckpointMetadata {
837 last_node: Some(node_name.clone()),
838 execution_path: execution_path.clone(),
839 extra: HashMap::new(),
840 };
841 if let Err(e) = cp.save(tid, &state, &meta) {
842 tracing::warn!(error = ?e, "Checkpoint save failed");
843 }
844 }
845 // === MIDDLEWARE: on_complete ===
846 let _complete_result =
847 self.middleware_pipeline.execute_on_complete(&state).await;
848
849 return Ok(state);
850 }
851
852 // Check if edge condition is satisfied
853 let edge_span = tracing::debug_span!(
854 "edge_evaluation",
855 from = %edge.from,
856 to = %edge.to
857 );
858 let _edge_guard = edge_span.enter();
859
860 match edge.should_take(&state).await {
861 Ok(should_take) => {
862 if should_take {
863 tracing::debug!(
864 from = %edge.from,
865 to = %edge.to,
866 "Edge condition satisfied"
867 );
868
869 match edge.get_next_node(&state).await {
870 Ok(Some(next_node)) => {
871 next_nodes.push(next_node);
872 }
873 Ok(None) => {
874 next_nodes.push(edge.to.clone());
875 }
876 Err(e) => {
877 tracing::error!(error = ?e, "Failed to get next node");
878 return Err(e);
879 }
880 }
881 } else {
882 tracing::trace!(
883 from = %edge.from,
884 to = %edge.to,
885 "Edge condition not satisfied"
886 );
887 }
888 }
889 Err(e) => {
890 tracing::error!(error = ?e, "Error evaluating edge condition");
891 return Err(e);
892 }
893 }
894 }
895 }
896
897 if next_nodes.is_empty() {
898 tracing::warn!("No next nodes found - execution stopping");
899 break;
900 }
901
902 current_nodes = next_nodes;
903 }
904
905 tracing::info!("Agent execution completed successfully");
906 Ok(state)
907 }
908
909 // =========================================================================
910 // Accessors
911 // =========================================================================
912
    /// Get a read-only reference to the graph for inspection.
    pub fn graph(&self) -> &Graph<T> {
        &self.graph
    }
917
    /// Get mutable access to the graph (e.g., to adjust nodes or edges after construction).
    pub fn graph_mut(&mut self) -> &mut Graph<T> {
        &mut self.graph
    }
922
    /// Get the configuration this runtime was built from.
    pub fn config(&self) -> &AgentConfig {
        &self.config
    }
927
928 /// Get the LLM client
929 pub fn llm_client(&self) -> Arc<dyn LLMClient> {
930 Arc::clone(&self.llm_client)
931 }
932
933 /// Get an MCP client by name
934 pub fn mcp_client(&self, name: &str) -> Option<Arc<dyn MCPClient>> {
935 self.mcp_clients.get(name).map(Arc::clone)
936 }
937
938 // =========================================================================
939 // Visualization
940 // =========================================================================
941
942 /// Visualize the agent graph as a text file
943 ///
944 /// Generates a text-based representation of the graph structure.
945 /// This is useful for debugging and documentation.
946 ///
947 /// # Arguments
948 /// - `output_path`: Path where to save the visualization
949 ///
950 /// # Example
951 /// ```ignore
952 /// # use flowgentra_ai::core::runtime::AgentRuntime;
953 /// # use flowgentra_ai::core::config::AgentConfig;
954 /// # let config = AgentConfig::from_file("config.yaml")?;
955 /// # let runtime: AgentRuntime<SharedState> = AgentRuntime::from_config(config)?;
956 /// #[cfg(feature = "visualization")]
957 /// runtime.visualize_graph("agent_graph.txt")?;
958 /// # Ok::<(), Box<dyn std::error::Error>>(())
959 /// ```
960 pub fn visualize_graph(&self, output_path: &str) -> Result<()> {
961 // Use pure-Rust visualization module with improved defaults
962 let config = crate::core::utils::visualization::VisualizationConfig::new(output_path);
963
964 crate::core::utils::visualization::visualize_graph(&self.graph, config)?;
965
966 tracing::info!(output_path = %output_path, "Graph visualization completed");
967 println!("✓ Graph visualization saved to: {}", output_path);
968 Ok(())
969 }
970}
971
972// Sub-modules for runtime organization
973pub mod context;
974pub mod optimization;
975pub mod parallel;
976
977pub use optimization::{strategies, CloneStats, OptimizedState};
978pub use parallel::*;