// flowgentra_ai/core/runtime/mod.rs
1//! ## Advanced Patterns
2//!
3//! - Support custom middleware for logging, metrics, etc.
4//! - Allow users to hook into state transitions, node execution, and errors.
5//! ## Observability & Debugging
6//!
7//! - Use structured logging (e.g., `tracing`) throughout runtime and handlers.
8//! - Integrate distributed tracing for workflow execution.
9//! - Provide health check endpoints for production deployments.
10
/// Liveness-probe stub for production deployments.
///
/// Currently always reports healthy; swap in real dependency checks
/// (database, LLM provider, etc.) before relying on it.
///
/// ```ignore
/// // Example: log a state transition
/// // tracing::info!("Transitioned from {:?} to {:?}", old_state, new_state);
/// ```
pub fn health_check() -> bool {
    // TODO: wire up real health logic (e.g. DB, LLM, etc.)
    true
}
21// # Runtime for typed state engine
22//
23// ## Node & Handler Patterns
24//
25// - Register all handlers with macros for auto-discovery.
26// - Each handler should only mutate the state fields it owns.
27// - Document handler contracts (fields read/written).
28// - Use clear error types and never panic in handlers or runtime.
29// - Compose handlers for complex logic, don't overload one function.
30use crate::core::reducer::ReducerConfig;
// Runtime for typed state engine
32use crate::core::state::State;
33
34/// Merge a partial state update into the current state using per-field reducers.
35///
36/// Works with any `State` type by converting to/from JSON values.
37/// The `ReducerConfig` maps field names to `JsonReducer` strategies.
38/// Fields without explicit reducers use `Overwrite` (replace) semantics.
39///
40/// # Example
41/// ```ignore
42/// let config = ReducerConfig::new()
43///     .field("messages", JsonReducer::Append)
44///     .field("count", JsonReducer::Sum);
45/// let merged = merge_state(&current, &partial_update, &config)?;
46/// ```
47pub fn merge_state<T: State>(
48    current: &T,
49    update: &T,
50    reducers: &ReducerConfig,
51) -> crate::core::error::Result<T> {
52    let current_value = current.to_value();
53    let update_value = update.to_value();
54    let merged_value = reducers.merge_values(&current_value, &update_value);
55    T::from_json(merged_value)
56}
57
58/// Merge multiple partial state updates into the current state, applying them in order.
59pub fn merge_state_many<T: State>(
60    current: &T,
61    updates: &[T],
62    reducers: &ReducerConfig,
63) -> crate::core::error::Result<T> {
64    let mut result = current.clone();
65    for update in updates {
66        result = merge_state(&result, update, reducers)?;
67    }
68    Ok(result)
69}
70
71use crate::core::config::AgentConfig;
72use crate::core::error::{FlowgentraError, Result};
73use crate::core::graph::Graph;
74use crate::core::llm::{create_llm_client, LLMClient};
75use crate::core::mcp::{DefaultMCPClient, MCPClient};
76use crate::core::memory::{
77    CheckpointMetadata, Checkpointer, GenericCheckpointer, MemoryCheckpointer,
78};
79use crate::core::middleware::ExecutionContext as MiddlewareContext;
80use crate::core::middleware::MiddlewarePipeline;
81use crate::core::node::{Edge, EdgeCondition, Node, NodeFunction};
82use crate::core::observability::ObservabilityMiddleware;
83use crate::core::tracing::TimerGuard;
84use futures::future::try_join_all;
85use std::collections::{HashMap, HashSet};
86use std::sync::Arc;
87use std::time::Instant;
88
89// =============================================================================
90// Execution Context
91// =============================================================================
92
/// Per-node execution context handed to node handlers.
///
/// Bundles the node's identity, the state snapshot it operates on, and the
/// shared service clients (LLM, MCP) it may call during execution.
pub struct ExecutionContext<T: State> {
    /// Name of the node being executed
    pub node_name: String,

    /// Current state being processed
    pub state: T,

    /// LLM client for AI operations
    pub llm_client: Arc<dyn LLMClient>,

    /// Available MCP clients for tool access, keyed by MCP server name
    pub mcp_clients: HashMap<String, Arc<dyn MCPClient>>,
}
106
107// =============================================================================
108// Runtime
109// =============================================================================
110
/// The core execution engine for agent graphs
///
/// Responsible for:
/// - Managing the graph structure
/// - Executing nodes in order
/// - Applying edge conditions
/// - Maintaining service clients
/// - Executing middleware hooks
pub struct AgentRuntime<T: State> {
    /// Agent configuration
    config: AgentConfig,

    /// The execution graph (nodes, edges, start/end sets)
    graph: Graph<T>,

    /// LLM client shared across all nodes
    llm_client: Arc<dyn LLMClient>,

    /// MCP clients for tool access, keyed by MCP server name
    mcp_clients: HashMap<String, Arc<dyn MCPClient>>,

    /// Middleware pipeline for cross-cutting concerns (runs before/after each node)
    middleware_pipeline: MiddlewarePipeline<T>,

    /// Optional checkpointer for thread-scoped state persistence (resume, multi-turn).
    /// Only consulted when executing with a thread id.
    checkpointer: Option<Arc<MemoryCheckpointer>>,
}
138
139impl<T: State> AgentRuntime<T> {
140    // =========================================================================
141    // Initialization
142    // =========================================================================
143
144    /// Create a new runtime from a configuration
145    ///
146    /// This performs:
147    /// - Configuration validation
148    /// - LLM client initialization
149    /// - MCP client setup
150    /// - Graph construction
151    pub fn from_config(config: AgentConfig) -> Result<Self> {
152        // Validate config
153        config.validate()?;
154
155        // Create LLM client
156        let llm_client = create_llm_client(&config.llm)?;
157
158        // Create MCP clients
159        let mcp_clients: HashMap<String, Arc<dyn MCPClient>> = config
160            .graph
161            .mcps
162            .iter()
163            .map(|(name, mcp_config)| {
164                (
165                    name.clone(),
166                    Arc::new(DefaultMCPClient::new(mcp_config.clone())) as Arc<dyn MCPClient>,
167                )
168            })
169            .collect();
170
171        // Create graph
172        let mut graph = Graph::new();
173
174        // Collect names of nodes that are managed by a supervisor.
175        // These should NOT be added to the graph — the supervisor calls them internally,
176        // so they don't participate in the graph's own edge routing or termination checks.
177        let supervisor_children: std::collections::HashSet<String> = config
178            .graph
179            .nodes
180            .iter()
181            .filter(|n| {
182                matches!(
183                    n.node_type.as_deref(),
184                    Some("supervisor") | Some("orchestrator")
185                )
186            })
187            .flat_map(|n| {
188                n.config
189                    .get("children")
190                    .and_then(|v| v.as_array())
191                    .map(|arr| {
192                        arr.iter()
193                            .filter_map(|v| v.as_str().map(String::from))
194                            .collect::<Vec<_>>()
195                    })
196                    .unwrap_or_default()
197            })
198            .collect();
199
200        // Add nodes (these are placeholders; actual nodes are added via register_node)
201        // Skip supervisor-managed children — they are internal to the supervisor, not graph routing nodes.
202        for node_config in &config.graph.nodes {
203            if supervisor_children.contains(&node_config.name) {
204                continue;
205            }
206            let placeholder_fn: NodeFunction<T> =
207                Box::new(|state| Box::pin(async move { Ok(state) }));
208            let is_planner = node_config.handler == "builtin::planner"
209                || node_config.node_type.as_deref() == Some("planner");
210            let node = if is_planner {
211                Node::new_planner(
212                    node_config.name.clone(),
213                    placeholder_fn,
214                    node_config.mcps.clone(),
215                    node_config.config.clone(),
216                )
217            } else {
218                Node::new_placeholder(
219                    node_config.name.clone(),
220                    placeholder_fn,
221                    node_config.mcps.clone(),
222                    node_config.config.clone(),
223                )
224            };
225            graph.add_node(node);
226        }
227
228        // Add edges
229        let mut node_names: std::collections::HashSet<_> =
230            config.graph.nodes.iter().map(|n| n.name.clone()).collect();
231        node_names.insert("START".to_string());
232        node_names.insert("END".to_string());
233
234        let mut start_nodes = Vec::new();
235        let mut end_nodes = Vec::new();
236
237        for edge_config in &config.graph.edges {
238            for to in &edge_config.to {
239                let mut edge = Edge::new(
240                    edge_config.from.clone(),
241                    to.clone(),
242                    None, // Condition is set during registration
243                );
244
245                if let Some(cond_name) = &edge_config.condition {
246                    edge = edge.with_condition_name(cond_name.clone());
247                }
248
249                graph.add_edge(edge);
250
251                if edge_config.from == "START" {
252                    start_nodes.push(to.clone());
253                }
254                if *to == "END" {
255                    end_nodes.push(edge_config.from.clone());
256                }
257            }
258        }
259
260        graph.set_start_nodes(start_nodes);
261        graph.set_end_nodes(end_nodes);
262        graph.set_allow_cycles(config.graph.allow_cycles);
263        graph.set_max_loop_iterations(config.graph.recursion_limit);
264
265        // Validate graph
266        graph.validate()?;
267
268        Ok(AgentRuntime {
269            config,
270            graph,
271            llm_client,
272            mcp_clients,
273            middleware_pipeline: MiddlewarePipeline::new(),
274            checkpointer: None,
275        })
276    }
277
278    // =========================================================================
279    // Handler Registration
280    // =========================================================================
281
282    /// Register a handler function for a node
283    ///
284    /// Replaces the placeholder function with the actual handler.
285    /// Called during agent initialization.
286    pub fn register_node(&mut self, name: &str, function: NodeFunction<T>) -> Result<()> {
287        if let Some(node) = self.graph.nodes.get_mut(name) {
288            // Replace the placeholder function
289            let mcps = node.mcps.clone();
290            let config = node.config.clone();
291            let is_planner = node.is_planner;
292            let new_node = if is_planner {
293                Node::new_planner(name.to_string(), function, mcps, config)
294            } else {
295                Node::new(name.to_string(), function, mcps, config)
296            };
297            self.graph.nodes.insert(name.to_string(), new_node);
298            Ok(())
299        } else {
300            Err(FlowgentraError::NodeNotFound(name.to_string()))
301        }
302    }
303
304    /// Register a condition function for an edge
305    ///
306    /// Associates a condition function with conditional edges.
307    pub fn register_edge_condition(
308        &mut self,
309        from: &str,
310        condition_name: &str,
311        condition_fn: EdgeCondition<T>,
312    ) -> Result<()> {
313        // Find and update the edge(s)
314        for edge in self.graph.edges.iter_mut() {
315            if edge.from == from && edge.condition_name.as_deref() == Some(condition_name) {
316                edge.condition = Some(Arc::clone(&condition_fn));
317            }
318        }
319        Ok(())
320    }
321
322    /// Set the checkpointer for thread-scoped state persistence.
323    ///
324    /// When set, use `execute_with_thread(thread_id, state)` to load/save state per thread.
325    pub fn set_checkpointer(&mut self, checkpointer: Arc<MemoryCheckpointer>) -> &mut Self {
326        self.checkpointer = Some(checkpointer);
327        self
328    }
329
330    /// Builder-style: set the checkpointer.
331    pub fn with_checkpointer(mut self, checkpointer: Arc<MemoryCheckpointer>) -> Self {
332        self.checkpointer = Some(checkpointer);
333        self
334    }
335
336    /// Get the checkpointer if set.
337    pub fn checkpointer(&self) -> Option<Arc<MemoryCheckpointer>> {
338        self.checkpointer.clone()
339    }
340
341    /// Add middleware to the middleware pipeline
342    ///
343    /// Middleware will be executed in the order they are added during node execution.
344    /// Each middleware receives an ExecutionContext and can:
345    /// - Log/monitor execution
346    /// - Validate state
347    /// - Modify state before/after execution
348    /// - Skip nodes or abort execution
349    ///
350    /// # Arguments
351    /// - `middleware` - The middleware to add (must implement Middleware trait)
352    ///
353    /// # Example
354    /// ```ignore
355    /// # use flowgentra_ai::core::runtime::AgentRuntime;
356    /// # use flowgentra_ai::core::config::AgentConfig;
357    /// # use flowgentra_ai::core::middleware::LoggingMiddleware;
358    /// # use std::sync::Arc;
359    /// # let config = AgentConfig::from_file("config.yaml")?;
360    /// # let mut runtime = AgentRuntime::from_config(config)?;
361    /// runtime.add_middleware(Arc::new(LoggingMiddleware::new()));
362    /// # Ok::<(), Box<dyn std::error::Error>>(())
363    /// ```
364    pub fn add_middleware(
365        &mut self,
366        middleware: Arc<dyn crate::core::middleware::Middleware<T>>,
367    ) -> &mut Self {
368        self.middleware_pipeline.use_middleware(middleware);
369        self
370    }
371
372    /// Get a reference to the middleware pipeline
373    pub fn middleware_pipeline(&self) -> &MiddlewarePipeline<T> {
374        &self.middleware_pipeline
375    }
376
377    /// Get mutable reference to the middleware pipeline
378    pub fn middleware_pipeline_mut(&mut self) -> &mut MiddlewarePipeline<T> {
379        &mut self.middleware_pipeline
380    }
381
382    /// Add observability middleware (execution tracing, node timing, token usage, failure snapshots).
383    pub fn with_observability(&mut self) -> &mut Self {
384        let mw = ObservabilityMiddleware::new().with_agent_name(self.config.name.clone());
385        self.add_middleware(Arc::new(mw));
386        self
387    }
388
389    // =========================================================================
390    // Execution
391    // =========================================================================
392
393    /// Execute the agent graph from start to end.
394    ///
395    /// For thread-scoped persistence (resume, multi-turn), use `execute_with_thread` and
396    /// set a checkpointer via `set_checkpointer` or config.
397    pub async fn execute(&self, initial_state: T) -> Result<T> {
398        self.execute_impl(initial_state, None).await
399    }
400
401    /// Execute with a thread id for checkpointing. When a checkpointer is set, state is
402    /// loaded from the last checkpoint for this thread (if any) and saved after each node.
403    pub async fn execute_with_thread(&self, thread_id: &str, initial_state: T) -> Result<T> {
404        self.execute_impl(initial_state, Some(thread_id)).await
405    }
406
407    async fn execute_impl(&self, initial_state: T, thread_id: Option<&str>) -> Result<T> {
408        // == STATE CLONING NOTES ==
409        // This execution loop clones state at several points for correctness:
410        // 1. Parallel frontier execution: Arc-wrapped single clone for shared access
411        // 2. Middleware context: Clone for before/after hooks (they need mutable reference)
412        // 3. Node execution: Clone as handler takes state by value
413        //
414        // For typical workflows, these clones are acceptable and necessary.
415        // For large state objects (>1MB), consider:
416        // - SharedState: Zero-copy shared state via Arc<Mutex>
417        // - OptimizedState: Lazy cloning on mutation only
418        // See STATE_OPTIMIZATION_GUIDE.md for detailed strategies
419
420        let span = tracing::info_span!("agent_execution");
421        let _guard = span.enter();
422
423        tracing::info!("Starting agent execution");
424        let _timer = TimerGuard::start("total_execution");
425
426        // If checkpointer + thread_id: try load checkpoint; otherwise use initial_state
427        let mut state = if let (Some(cp), Some(tid)) = (&self.checkpointer, thread_id) {
428            match cp.load(tid)? {
429                Some(checkpoint) => {
430                    let loaded = checkpoint.state()?;
431                    tracing::info!(thread_id = %tid, "Resumed from checkpoint");
432                    loaded
433                }
434                None => initial_state,
435            }
436        } else {
437            initial_state
438        };
439
440        // Validate initial state if schema is configured
441        self.config.validate_state(&state)?;
442
443        // Track execution path for checkpoint metadata
444        let mut execution_path: Vec<String> = Vec::new();
445
446        // Planner: max iterations (re-plan after each step)
447        let max_plan_steps = self.config.graph.planner.max_plan_steps;
448        let mut plan_steps: usize = 0;
449
450        // Recursion / step limit — prevents infinite loops in cyclic graphs.
451        let recursion_limit = self.config.graph.recursion_limit;
452        let mut total_steps: usize = 0;
453
454        // Start from START node
455        let mut current_nodes = self.graph.start_nodes().to_vec();
456        let mut iteration = 0;
457
458        while !current_nodes.is_empty() {
459            iteration += 1;
460            total_steps += current_nodes.len();
461
462            if total_steps > recursion_limit {
463                return Err(
464                    crate::core::error::FlowgentraError::RecursionLimitExceeded {
465                        limit: recursion_limit,
466                    },
467                );
468            }
469
470            let node_count = current_nodes.len();
471
472            tracing::debug!(iteration, node_count, total_steps, nodes = ?current_nodes, "Processing iteration");
473
474            let mut next_nodes = Vec::new();
475
476            // DAG-based parallel: when multiple nodes in frontier (and none are planner), run concurrently with tokio
477            let run_parallel = node_count > 1
478                && !current_nodes.iter().any(|n| {
479                    self.graph
480                        .get_node(n)
481                        .map(|node| node.is_planner)
482                        .unwrap_or(false)
483                });
484
485            if run_parallel {
486                // Run all frontier nodes concurrently (futures::join_all)
487                // ╔════════════════════════════════════════════════════════════════════════╗
488                // ║ CRITICAL PERFORMANCE: State Cloning in Parallel Execution              ║
                // ╠════════════════════════════════════════════════════════════════════════╣
490                // ║ Issue: clone() called per node × iterations (3 nodes × 10 steps = 30×) ║
491                // ║ Impact: 50-200ms overhead for state > 500KB                            ║
492                // ║ Solution: Use SharedState, NOT PlainState!                             ║
493                // ║                                                                        ║
494                // ║ SharedState.clone() = Cheap (Arc pointer, ~1μs)                       ║
495                // ║ PlainState.clone() = Expensive (full data, 5-20ms)                    ║
496                // ║                                                                        ║
497                // ║ Recommendation: For production, ALWAYS use SharedState                 ║
498                // ║ Example at: flowgentra-ai/examples/state_graph_react_agent.rs            ║
499                // ╚════════════════════════════════════════════════════════════════════════╝
500                let state_shared = Arc::new(state); // No clone - wrap as-is
501                let futures_with_names: Vec<_> = current_nodes
502                    .iter()
503                    .filter(|n| *n != "END")
504                    .map(|node_name| {
505                        let node = self
506                            .graph
507                            .get_node(node_name)
508                            .ok_or_else(|| FlowgentraError::NodeNotFound(node_name.clone()))?;
509                        // ⚠️ SharedState.clone() is cheap (Arc pointer)
510                        // ⚠️ PlainState.clone() is expensive (full JSON data copy)
511                        let state_in = (*state_shared).clone();
512                        let name = node_name.clone();
513                        let fut = node.execute(state_in);
514                        Ok::<_, FlowgentraError>((name, fut))
515                    })
516                    .collect::<Result<Vec<_>>>()?;
517
518                let task_futures: Vec<_> = futures_with_names
519                    .into_iter()
520                    .map(|(name, f)| {
521                        Box::pin(async move {
522                            let out = f.await?;
523                            Ok::<_, FlowgentraError>((name, out))
524                        })
525                            as std::pin::Pin<
526                                Box<dyn std::future::Future<Output = Result<(String, T)>> + Send>,
527                            >
528                    })
529                    .collect();
530
531                // Use `try_join_all` to short-circuit immediately on the first error
532                let results: Vec<(String, T)> = match try_join_all(task_futures).await {
533                    Ok(res) => res,
534                    Err(e) => {
535                        tracing::error!(error = ?e, "Parallel node execution failed early");
536                        return Err(FlowgentraError::ParallelExecutionError(e.to_string()));
537                    }
538                };
539
540                // Recover the original state from the Arc, then merge branch results into it
541                state = Arc::try_unwrap(state_shared).unwrap_or_else(|arc| arc.as_ref().clone());
542                for (node_name, branch_state) in &results {
543                    execution_path.push(node_name.clone());
544                    for (k, v) in branch_state.as_map() {
545                        state.set(k, v);
546                    }
547                }
548
549                if let (Some(cp), Some(tid)) = (&self.checkpointer, thread_id) {
550                    let last = results.first().map(|(n, _)| n.clone());
551                    let meta = CheckpointMetadata {
552                        last_node: last,
553                        execution_path: execution_path.clone(),
554                        extra: HashMap::new(),
555                    };
556                    if let Err(e) = cp.save(tid, &state, &meta) {
557                        tracing::error!(error = %e, thread_id = %tid, "Failed to save checkpoint after parallel execution");
558                    }
559                }
560                tracing::debug!(nodes = ?current_nodes, "Parallel step completed");
561
562                // Collect next nodes from all branches (evaluate edges on merged state)
563                let mut seen = HashSet::new();
564                for node_name in &current_nodes {
565                    if node_name == "END" {
566                        continue;
567                    }
568                    for edge in self.graph.get_next_nodes(node_name) {
569                        if edge.to == "END" {
570                            tracing::info!(from = node_name, "Execution complete - reached END");
571                            if let (Some(cp), Some(tid)) = (&self.checkpointer, thread_id) {
572                                let meta = CheckpointMetadata {
573                                    last_node: Some(node_name.clone()),
574                                    execution_path: execution_path.clone(),
575                                    extra: HashMap::new(),
576                                };
577                                if let Err(e) = cp.save(tid, &state, &meta) {
578                                    tracing::error!(error = %e, thread_id = %tid, "Failed to save final checkpoint");
579                                }
580                            }
581                            let _ = self.middleware_pipeline.execute_on_complete(&state).await;
582                            return Ok(state);
583                        }
584                        if let Ok(true) = edge.should_take(&state).await {
585                            let next = edge
586                                .get_next_node(&state)
587                                .await?
588                                .unwrap_or_else(|| edge.to.clone());
589                            if seen.insert(next.clone()) {
590                                next_nodes.push(next);
591                            }
592                        }
593                    }
594                }
595
596                if next_nodes.is_empty() {
597                    tracing::warn!("No next nodes after parallel step - stopping");
598                    break;
599                }
600                current_nodes = next_nodes;
601                continue;
602            }
603
604            for node_name in current_nodes {
605                if node_name == "END" {
606                    tracing::debug!("Reached END node");
607                    continue;
608                }
609
610                let node_span = tracing::debug_span!("node_execution", node = %node_name);
611                let _node_guard = node_span.enter();
612
613                // Execute the node
614                let node = self.graph.get_node(&node_name).ok_or_else(|| {
615                    tracing::error!(node = %node_name, "Node not found in graph");
616                    FlowgentraError::NodeNotFound(node_name.clone())
617                })?;
618
619                let _node_timer = TimerGuard::start(format!("node_{}", node_name));
620
621                // === MIDDLEWARE: before_node ===
622                // Note: state.clone() is necessary here because:
623                // 1. Middleware takes mutable reference and can modify state
624                // 2. Node handler takes state by value
625                // For large state objects (>1MB), consider using SharedState for zero-copy sharing
626                let start_time = Instant::now();
627                let mut middleware_ctx = MiddlewareContext::new(node_name.clone(), state.clone());
628
629                // Execute middleware before node
630                if let Err(e) = self
631                    .middleware_pipeline
632                    .execute_before_node(&mut middleware_ctx)
633                    .await
634                {
635                    tracing::error!(node = %node_name, error = ?e, "Middleware before_node failed");
636                    return Err(e);
637                }
638
639                state = middleware_ctx.state;
640
641                // Planner nodes: inject _current_node and _reachable_nodes before execution
642                if node.is_planner {
643                    state.set(
644                        "_current_node",
645                        serde_json::Value::String(node_name.clone()),
646                    );
647                    let reachable = self.graph.get_reachable_node_ids(&node_name);
648                    state.set("_reachable_nodes", serde_json::json!(reachable));
649                }
650
651                // Inject per-node MCP names so handlers can use get_node_mcp_client()
652                if !node.mcps.is_empty() {
653                    state.set("_node_mcps", serde_json::json!(node.mcps));
654                } else {
655                    state.remove("_node_mcps");
656                }
657
658                // Execute node - clone is necessary as handler takes State by value
659                match node.execute(state.clone()).await {
660                    Ok(new_state) => {
661                        state = new_state;
662
663                        // === MIDDLEWARE: after_node ===
664                        // Clone here is needed because middleware requires mutable reference
665                        let elapsed = start_time.elapsed();
666                        let mut after_ctx =
667                            MiddlewareContext::new(node_name.clone(), state.clone());
668                        after_ctx
669                            .metadata
670                            .insert("elapsed_ms".to_string(), elapsed.as_millis().to_string());
671
672                        if let Err(e) = self
673                            .middleware_pipeline
674                            .execute_after_node(&mut after_ctx)
675                            .await
676                        {
677                            tracing::error!(node = %node_name, error = ?e, "Middleware after_node failed");
678                            return Err(e);
679                        }
680
681                        state = after_ctx.state;
682
683                        // === INTERNAL RETRY LOOP ===
684                        // If evaluation middleware flagged this node for retry, re-execute
685                        // up to max_retries times (from config.evaluation).
686                        let retry_key = format!("__node_retry_needed__{}", node_name);
687                        let max_retries = self
688                            .config
689                            .evaluation
690                            .as_ref()
691                            .filter(|e| e.enabled)
692                            .map(|e| e.max_retries)
693                            .unwrap_or(0);
694
695                        let mut retry_count = 0u32;
696                        while retry_count < max_retries {
697                            if let Some(flag) = state.get(&retry_key) {
698                                if flag.as_bool().unwrap_or(false) {
699                                    retry_count += 1;
700                                    tracing::info!(
701                                        node = %node_name,
702                                        attempt = retry_count,
703                                        max_retries = max_retries,
704                                        "Retrying node due to low confidence"
705                                    );
706
707                                    // Clear the retry flag before re-execution
708                                    state.set(&retry_key, serde_json::json!(false));
709
710                                    // Re-execute the node
711                                    match node.execute(state.clone()).await {
712                                        Ok(new_state) => {
713                                            state = new_state;
714
715                                            // Run after_node middleware again to re-evaluate
716                                            let mut retry_ctx = MiddlewareContext::new(
717                                                node_name.clone(),
718                                                state.clone(),
719                                            );
720                                            if let Err(e) = self
721                                                .middleware_pipeline
722                                                .execute_after_node(&mut retry_ctx)
723                                                .await
724                                            {
725                                                tracing::error!(
726                                                    node = %node_name,
727                                                    error = ?e,
728                                                    "Middleware after_node failed during retry"
729                                                );
730                                                return Err(e);
731                                            }
732                                            state = retry_ctx.state;
733
734                                            // Check if retry is still needed
735                                            continue;
736                                        }
737                                        Err(e) => {
738                                            tracing::error!(
739                                                node = %node_name,
740                                                attempt = retry_count,
741                                                error = ?e,
742                                                "Node retry execution failed"
743                                            );
744                                            return Err(e.context(&format!(
745                                                "while retrying node '{}' (attempt {})",
746                                                node_name, retry_count
747                                            )));
748                                        }
749                                    }
750                                }
751                            }
752                            // No retry flag set — break
753                            break;
754                        }
755
756                        if retry_count >= max_retries && max_retries > 0 {
757                            if let Some(flag) = state.get(&retry_key) {
758                                if flag.as_bool().unwrap_or(false) {
759                                    tracing::warn!(
760                                        node = %node_name,
761                                        max_retries = max_retries,
762                                        "Maximum retries reached, proceeding with best output"
763                                    );
764                                    // Clear the flag so downstream code doesn't see a stale retry request
765                                    state.set(&retry_key, serde_json::json!(false));
766                                }
767                            }
768                        }
769
770                        execution_path.push(node_name.clone());
771                        // Checkpoint after each node when thread_id and checkpointer are set
772                        if let (Some(cp), Some(tid)) = (&self.checkpointer, thread_id) {
773                            let meta = CheckpointMetadata {
774                                last_node: Some(node_name.clone()),
775                                execution_path: execution_path.clone(),
776                                extra: HashMap::new(),
777                            };
778                            if let Err(e) = cp.save(tid, &state, &meta) {
779                                tracing::warn!(error = ?e, "Checkpoint save failed");
780                            }
781                        }
782                        tracing::debug!(node = %node_name, elapsed_ms = elapsed.as_millis(), "Node execution completed");
783
784                        // Planner nodes: use _next_node from state instead of following edges
785                        if node.is_planner {
786                            plan_steps += 1;
787                            if plan_steps >= max_plan_steps {
788                                tracing::info!(plan_steps, "Max plan steps reached - stopping");
789                                break;
790                            }
791                            let next_node = state
792                                .get_string("_next_node")
793                                .unwrap_or_else(|| "END".to_string());
794                            if next_node == "END" {
795                                tracing::info!(from = %node_name, "Planner chose END - execution complete");
796                                if let (Some(cp), Some(tid)) = (&self.checkpointer, thread_id) {
797                                    let meta = CheckpointMetadata {
798                                        last_node: Some(node_name.clone()),
799                                        execution_path: execution_path.clone(),
800                                        extra: HashMap::new(),
801                                    };
802                                    let _ = cp.save(tid, &state, &meta);
803                                }
804                                let _ = self.middleware_pipeline.execute_on_complete(&state).await;
805                                return Ok(state);
806                            }
807                            tracing::debug!(next_node = %next_node, "Planner chose next node");
808                            next_nodes.push(next_node);
809                            continue;
810                        }
811                    }
812                    Err(e) => {
813                        tracing::error!(node = %node_name, error = ?e, "Node execution failed");
814
815                        // === MIDDLEWARE: on_error ===
816                        let _elapsed = start_time.elapsed();
817                        let error_ctx = MiddlewareContext::new(node_name.clone(), state.clone());
818
819                        self.middleware_pipeline
820                            .execute_on_error(&node_name, &e, &error_ctx)
821                            .await;
822
823                        return Err(e.context(&format!("while executing node '{}'", node_name)));
824                    }
825                }
826
827                // Find next nodes
828                let outgoing = self.graph.get_next_nodes(&node_name);
829                tracing::trace!(node = %node_name, edge_count = outgoing.len(), "Evaluating outgoing edges");
830
831                for edge in outgoing {
832                    if edge.to == "END" {
833                        tracing::info!(from = %node_name, "Execution complete - reached END");
834                        execution_path.push(node_name.clone());
835                        if let (Some(cp), Some(tid)) = (&self.checkpointer, thread_id) {
836                            let meta = CheckpointMetadata {
837                                last_node: Some(node_name.clone()),
838                                execution_path: execution_path.clone(),
839                                extra: HashMap::new(),
840                            };
841                            if let Err(e) = cp.save(tid, &state, &meta) {
842                                tracing::warn!(error = ?e, "Checkpoint save failed");
843                            }
844                        }
845                        // === MIDDLEWARE: on_complete ===
846                        let _complete_result =
847                            self.middleware_pipeline.execute_on_complete(&state).await;
848
849                        return Ok(state);
850                    }
851
852                    // Check if edge condition is satisfied
853                    let edge_span = tracing::debug_span!(
854                        "edge_evaluation",
855                        from = %edge.from,
856                        to = %edge.to
857                    );
858                    let _edge_guard = edge_span.enter();
859
860                    match edge.should_take(&state).await {
861                        Ok(should_take) => {
862                            if should_take {
863                                tracing::debug!(
864                                    from = %edge.from,
865                                    to = %edge.to,
866                                    "Edge condition satisfied"
867                                );
868
869                                match edge.get_next_node(&state).await {
870                                    Ok(Some(next_node)) => {
871                                        next_nodes.push(next_node);
872                                    }
873                                    Ok(None) => {
874                                        next_nodes.push(edge.to.clone());
875                                    }
876                                    Err(e) => {
877                                        tracing::error!(error = ?e, "Failed to get next node");
878                                        return Err(e);
879                                    }
880                                }
881                            } else {
882                                tracing::trace!(
883                                    from = %edge.from,
884                                    to = %edge.to,
885                                    "Edge condition not satisfied"
886                                );
887                            }
888                        }
889                        Err(e) => {
890                            tracing::error!(error = ?e, "Error evaluating edge condition");
891                            return Err(e);
892                        }
893                    }
894                }
895            }
896
897            if next_nodes.is_empty() {
898                tracing::warn!("No next nodes found - execution stopping");
899                break;
900            }
901
902            current_nodes = next_nodes;
903        }
904
905        tracing::info!("Agent execution completed successfully");
906        Ok(state)
907    }
908
909    // =========================================================================
910    // Accessors
911    // =========================================================================
912
    /// Get a reference to the graph for inspection.
    ///
    /// Read-only access to the workflow graph (nodes and edges) without
    /// taking ownership; use [`graph_mut`] for in-place modification.
    pub fn graph(&self) -> &Graph<T> {
        &self.graph
    }
917
    /// Get mutable access to the graph.
    ///
    /// Allows callers to modify the workflow graph in place (e.g. add or
    /// remove nodes/edges) before running the agent.
    pub fn graph_mut(&mut self) -> &mut Graph<T> {
        &mut self.graph
    }
922
    /// Get the configuration.
    ///
    /// Borrowed view of the `AgentConfig` this runtime was built with.
    pub fn config(&self) -> &AgentConfig {
        &self.config
    }
927
928    /// Get the LLM client
929    pub fn llm_client(&self) -> Arc<dyn LLMClient> {
930        Arc::clone(&self.llm_client)
931    }
932
933    /// Get an MCP client by name
934    pub fn mcp_client(&self, name: &str) -> Option<Arc<dyn MCPClient>> {
935        self.mcp_clients.get(name).map(Arc::clone)
936    }
937
938    // =========================================================================
939    // Visualization
940    // =========================================================================
941
942    /// Visualize the agent graph as a text file
943    ///
944    /// Generates a text-based representation of the graph structure.
945    /// This is useful for debugging and documentation.
946    ///
947    /// # Arguments
948    /// - `output_path`: Path where to save the visualization
949    ///
950    /// # Example
951    /// ```ignore
952    /// # use flowgentra_ai::core::runtime::AgentRuntime;
953    /// # use flowgentra_ai::core::config::AgentConfig;
954    /// # let config = AgentConfig::from_file("config.yaml")?;
955    /// # let runtime: AgentRuntime<SharedState> = AgentRuntime::from_config(config)?;
956    /// #[cfg(feature = "visualization")]
957    /// runtime.visualize_graph("agent_graph.txt")?;
958    /// # Ok::<(), Box<dyn std::error::Error>>(())
959    /// ```
960    pub fn visualize_graph(&self, output_path: &str) -> Result<()> {
961        // Use pure-Rust visualization module with improved defaults
962        let config = crate::core::utils::visualization::VisualizationConfig::new(output_path);
963
964        crate::core::utils::visualization::visualize_graph(&self.graph, config)?;
965
966        tracing::info!(output_path = %output_path, "Graph visualization completed");
967        println!("✓ Graph visualization saved to: {}", output_path);
968        Ok(())
969    }
970}
971
// Sub-modules for runtime organization
pub mod context;
pub mod optimization;
pub mod parallel;

// Re-export commonly used optimization and parallel-execution items at the
// runtime module root so callers don't need the full sub-module paths.
pub use optimization::{strategies, CloneStats, OptimizedState};
pub use parallel::*;