Skip to main content

flowgentra_ai/core/agent/
mod.rs

1use tracing::info;
2// # Agent API - High-Level Interface
3//
4// The `Agent` struct provides a simple API for creating and running agents.
5// Most users interact with this module through the `from_config_path()` function
6// which uses automatic handler discovery.
7//
8// ## Quick Start
9//
10// 1. **Decorate your handlers** with `#[register_handler]`
11// 2. **Create a config.yaml** with your agent graph
12// 3. **Use `from_config_path()`** to create and run the agent
13//
14// ```ignore
15// use flowgentra_ai::prelude::*;
16// use serde_json::json;
17//
18// #[register_handler]
19// pub async fn my_handler(mut state: State) -> Result<State> {
20//     let input = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
21//     state.set("result", json!(input.to_uppercase()));
22//     Ok(state)
23// }
24//
25// #[tokio::main]
26// async fn main() -> Result<()> {
27//     // Auto-discovers all #[register_handler] functions
28//     let mut agent = from_config_path("config.yaml")?;
29//
//     // `run()` takes no arguments: seed the agent's own state instead
//     agent.state.set("input", json!("hello world"));
//
//     let result = agent.run().await?;
34//     println!("Done: {}", result.to_json_string()?);
35//     Ok(())
36// }
37// ```
38//
39// ## Handler Registration
40//
41// Handlers are automatically registered via the `#[register_handler]` attribute macro.
// Handler names must match the function name and be referenced by that name in your config.yaml.
43pub(crate) use crate::core::config::AgentConfig;
44pub(crate) use crate::core::error::{FlowgentraError, Result};
45pub(crate) use crate::core::llm::{create_llm_client, LLMClient};
46pub(crate) use crate::core::memory::{
47    ConversationMemory, InMemoryConversationMemory, MemoryCheckpointer,
48};
49pub(crate) use crate::core::runtime::AgentRuntime;
50pub(crate) use crate::core::state::SharedState;
51use std::collections::HashMap;
52use std::sync::Arc;
53// Use inventory for auto-registration of handlers - collected dynamically at runtime
54inventory::collect!(HandlerEntry);
55
56// =============================================================================
57// Auto-Registration via Inventory
58// =============================================================================
59
60/// Entry for a handler in the global inventory
61/// Handlers submit themselves to this list for auto-registration
62pub struct HandlerEntry {
63    /// Name of the handler (matches config node names)
64    pub name: String,
65    /// The handler function (always uses SharedState)
66    pub handler: ArcHandler<SharedState>,
67}
68
69impl HandlerEntry {
70    /// Create a new handler entry for auto-registration
71    pub fn new(name: impl Into<String>, handler: ArcHandler<SharedState>) -> HandlerEntry {
72        HandlerEntry {
73            name: name.into(),
74            handler,
75        }
76    }
77}
78
79// Type for Arc-wrapped handlers (used by inventory auto-registration)
80pub type ArcHandler<T> = Arc<
81    dyn Fn(T) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T>> + Send>>
82        + Send
83        + Sync,
84>;
85
86// =============================================================================
87// Type Aliases
88// =============================================================================
89
90/// Type signature for handler functions
91///
92/// A handler receives the current state and returns the updated state.
93/// Handlers are async and can perform I/O operations.
94///
95/// # Example
96/// ```no_run
97/// use flowgentra_ai::core::agent::Handler;
98/// use flowgentra_ai::core::state::SharedState;
99/// use serde_json::json;
100///
101/// let my_handler: Handler<SharedState> = Box::new(|state| {
102///     Box::pin(async move {
103///         let input = state.get("input");
104///         state.set("output", json!("processed"));
105///         Ok(state)
106///     })
107/// });
108/// ```
109pub type Handler<T> = Box<
110    dyn Fn(T) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T>> + Send>>
111        + Send
112        + Sync,
113>;
114
/// Type signature for condition functions
///
/// A condition function inspects the current state by shared reference and
/// returns a boolean. Used for branching logic in the graph: when an agent is
/// built, each condition is attached to the edges that reference it by name.
///
/// # Example
/// ```no_run
/// use flowgentra_ai::core::agent::Condition;
/// use flowgentra_ai::core::state::SharedState;
///
/// let is_complex: Condition<SharedState> = Box::new(|state: &SharedState| {
///     state.get("complexity_score")
///         .and_then(|v| v.as_i64())
///         .map(|score| score > 50)
///         .unwrap_or(false)
/// });
/// ```
pub type Condition<T> = Box<dyn Fn(&T) -> bool + Send + Sync>;
133
/// Registry mapping handler names to handler functions.
///
/// When an agent is built, each node is resolved against this map first by
/// node name and then by the node's configured handler name.
pub type HandlerRegistry<T> = HashMap<String, Handler<T>>;

/// Registry mapping condition names to condition functions.
///
/// Keys are matched against the `condition_name` of the graph's edges.
pub type ConditionRegistry<T> = HashMap<String, Condition<T>>;
139
140// =============================================================================
141// Agent - Main API
142// =============================================================================
143
/// The Agent - your main interface to FlowgentraAI
///
/// Create an agent from a YAML config and handler implementations,
/// then run it with a state to get results.
///
/// The Agent handles:
/// - Loading and validating configuration
/// - Registering handlers and conditions
/// - Managing the execution runtime
/// - Orchestrating node execution
/// - Optional checkpointer and conversation memory (from config or programmatic)
pub struct Agent {
    /// Execution runtime holding the graph, registered node handlers and edge conditions.
    runtime: AgentRuntime<SharedState>,
    /// LLM client built from `config.llm`; handed out to callers via `llm_client()`.
    llm_client: Arc<dyn LLMClient>,
    /// The configuration this agent was built from.
    config: AgentConfig,
    /// Current state of the agent (initialized from state_schema)
    pub state: SharedState,
    /// Optional conversation memory (message history per thread). Set via config or with_conversation_memory().
    conversation_memory: Option<Arc<dyn ConversationMemory>>,
}
164
165impl Agent {
166    /// Log agent startup and configuration for observability
167    pub fn log_startup(&self) {
168        info!(
169            "Agent '{}' starting with config: {:?}",
170            self.config.name, self.config
171        );
172    }
173    /// Create an agent from a YAML config file
174    ///
175    /// This is the main entry point for users. It:
176    /// 1. Loads the YAML config file
177    /// 2. Validates the graph structure
178    /// 3. Creates the runtime with your handlers and conditions
179    /// 4. Sets up the LLM client
180    ///
181    /// # Arguments
182    /// - `config_path`: Path to your `config.yaml` file
183    /// - `handlers`: Registry of handler functions
184    /// - `conditions`: Registry of condition functions
185    ///
186    /// # Example
187    /// ```no_run
188    /// use flowgentra_ai::prelude::*;
189    /// use std::collections::HashMap;
190    ///
191    /// #[tokio::main]
192    /// async fn main() -> Result<()> {
193    ///     let handlers = HashMap::new();
194    ///     let conditions = HashMap::new();
195    ///     let mut agent = Agent::from_config("config.yaml", handlers, conditions)?;
196    ///     Ok(())
197    /// }
198    /// ```
199    pub fn from_config(
200        config_path: &str,
201        handlers: HandlerRegistry<SharedState>,
202        conditions: ConditionRegistry<SharedState>,
203    ) -> Result<Self> {
204        let config = AgentConfig::from_file(config_path)?;
205        config.validate()?;
206        Self::from_config_inner(config, handlers, conditions)
207    }
208
209    /// Create an agent from an already-loaded [`AgentConfig`].
210    ///
211    /// Use this when you have a config object in hand (e.g. from `from_config_path`
212    /// which already loaded the file) to avoid reading the file a second time.
213    pub fn from_config_inner(
214        config: AgentConfig,
215        handlers: HandlerRegistry<SharedState>,
216        conditions: ConditionRegistry<SharedState>,
217    ) -> Result<Self> {
218        // Create runtime
219        let mut runtime = AgentRuntime::<SharedState>::from_config(config.clone())?;
220
221        // Create LLM client
222        let llm_client = create_llm_client(&config.llm)?;
223
224        // Convert all handlers to Arc (cloneable) so multiple nodes can share a handler
225        let arc_handlers: HashMap<String, ArcHandler<SharedState>> = handlers
226            .into_iter()
227            .map(|(name, handler)| {
228                let arc: ArcHandler<SharedState> = Arc::new(move |state| handler(state));
229                (name, arc)
230            })
231            .collect();
232
233        // Register handlers by mapping each node config to its handler function.
234        // Looks up by node.name first (from from_config_path, which pre-resolves by node name),
235        // then by node.handler (from manual Agent::from_config, which uses handler names).
236        let mut missing_handlers = Vec::new();
237
238        // Built-in node types that don't require a user-supplied handler
239        const BUILTIN_TYPES: &[&str] = &[
240            "evaluation",
241            "retry",
242            "timeout",
243            "loop",
244            "planner",
245            "human_in_the_loop",
246            "memory",
247            // supervisor (+ backwards-compat alias)
248            "supervisor",
249            "orchestrator",
250            // subgraph (+ backwards-compat aliases)
251            "subgraph",
252            "agent",
253            "agent_or_graph",
254        ];
255
256        // Supervisor-managed children are not in the runtime graph — skip registration.
257        let supervisor_children: std::collections::HashSet<String> = config
258            .graph
259            .nodes
260            .iter()
261            .filter(|n| {
262                matches!(
263                    n.node_type.as_deref(),
264                    Some("supervisor") | Some("orchestrator")
265                )
266            })
267            .flat_map(|n| {
268                n.config
269                    .get("children")
270                    .and_then(|v| v.as_array())
271                    .map(|arr| {
272                        arr.iter()
273                            .filter_map(|v| v.as_str().map(String::from))
274                            .collect::<Vec<_>>()
275                    })
276                    .unwrap_or_default()
277            })
278            .collect();
279
280        for node in &config.graph.nodes {
281            if node.name == "START" || node.name == "END" {
282                continue;
283            }
284            if supervisor_children.contains(&node.name) {
285                continue;
286            }
287            if node.handler.starts_with("builtin::") {
288                continue;
289            }
290            let arc_handler = arc_handlers
291                .get(&node.name)
292                .or_else(|| arc_handlers.get(&node.handler))
293                .cloned();
294
295            // Standalone built-in type with no user-supplied handler — only skip when
296            // from_config_path did NOT pre-build a handler for this node.
297            // If a handler IS present (e.g. the planner built by from_config_path), always
298            // register it so the placeholder function is replaced.
299            let is_builtin_standalone = node.handler.is_empty()
300                && node
301                    .node_type
302                    .as_deref()
303                    .is_some_and(|t| BUILTIN_TYPES.contains(&t))
304                && arc_handler.is_none();
305            if is_builtin_standalone {
306                continue;
307            }
308
309            match arc_handler {
310                Some(h) => {
311                    runtime
312                        .register_node(&node.name, Box::new(move |state| h(state)))
313                        .map_err(|e| {
314                            if matches!(e, FlowgentraError::NodeNotFound(_)) {
315                                FlowgentraError::NodeNotFound(format!(
316                                    "Node '{}' not found in graph. Check your config.",
317                                    node.name
318                                ))
319                            } else {
320                                e
321                            }
322                        })?;
323                }
324                None => {
325                    missing_handlers.push(node.handler.clone());
326                }
327            }
328        }
329
330        if !missing_handlers.is_empty() {
331            let available_list = if arc_handlers.is_empty() {
332                "(none registered)".to_string()
333            } else {
334                arc_handlers
335                    .keys()
336                    .map(|k| format!("'{}'", k))
337                    .collect::<Vec<_>>()
338                    .join(", ")
339            };
340
341            return Err(FlowgentraError::ConfigError(format!(
342                "Configuration references unknown handler(s): {}.\nAvailable handlers: {}\nMake sure to #[register_handler] these functions in your code.",
343                missing_handlers.iter()
344                    .map(|h| format!("'{}'", h))
345                    .collect::<Vec<_>>()
346                    .join(", "),
347                available_list
348            )));
349        }
350
351        // Register all conditions
352        type EdgeConditionFn = std::sync::Arc<
353            dyn Fn(
354                    &SharedState,
355                )
356                    -> std::result::Result<Option<String>, crate::core::error::FlowgentraError>
357                + Send
358                + Sync,
359        >;
360        for (condition_name, condition_fn) in conditions {
361            let cond_name_clone = condition_name.clone();
362            let edge_condition: EdgeConditionFn =
363                std::sync::Arc::new(move |state: &SharedState| {
364                    if condition_fn(state) {
365                        Ok(Some(cond_name_clone.clone()))
366                    } else {
367                        Ok(None)
368                    }
369                });
370
371            let edges_to_register: Vec<String> = runtime
372                .graph()
373                .edges
374                .iter()
375                .filter(|e| e.condition_name.as_deref() == Some(condition_name.as_str()))
376                .map(|e| e.from.clone())
377                .collect();
378
379            for from_node in edges_to_register {
380                runtime.register_edge_condition(
381                    &from_node,
382                    &condition_name,
383                    edge_condition.clone(),
384                )?;
385            }
386        }
387
388        let initial_state = config.create_initial_state();
389        Ok(Agent {
390            runtime,
391            llm_client,
392            config,
393            state: initial_state,
394            conversation_memory: None,
395        })
396    }
397
    /// Set the checkpointer (e.g. for thread-scoped state persistence). Can also be set via config.yaml `memory.checkpointer`.
    ///
    /// The checkpointer is forwarded to the underlying runtime. Returns `&mut Self`
    /// so further setter calls can be chained.
    pub fn set_checkpointer(&mut self, checkpointer: Arc<MemoryCheckpointer>) -> &mut Self {
        self.runtime.set_checkpointer(checkpointer);
        self
    }
403
404    /// Builder-style: set the checkpointer.
405    pub fn with_checkpointer(mut self, checkpointer: Arc<MemoryCheckpointer>) -> Self {
406        self.runtime.set_checkpointer(checkpointer);
407        self
408    }
409
    /// Set conversation memory (message history per thread). Can also be set via config.yaml `memory.conversation`.
    ///
    /// Replaces any previously configured memory. Returns `&mut Self` so further
    /// setter calls can be chained.
    pub fn set_conversation_memory(&mut self, memory: Arc<dyn ConversationMemory>) -> &mut Self {
        self.conversation_memory = Some(memory);
        self
    }
415
416    /// Builder-style: set conversation memory.
417    pub fn with_conversation_memory(mut self, memory: Arc<dyn ConversationMemory>) -> Self {
418        self.conversation_memory = Some(memory);
419        self
420    }
421
422    /// Get conversation memory if set (for use in handlers to add/get messages).
423    pub fn conversation_memory(&self) -> Option<Arc<dyn ConversationMemory>> {
424        self.conversation_memory.clone()
425    }
426
427    /// Execute the agent with its current state
428    ///
429    /// Runs the agent through all nodes following the edges until completion.
430    /// Uses the agent's built-in state (initialized from state_schema).
431    /// Automatically injects the LLM configuration into state so handlers can access it.
432    ///
433    /// Set state values before calling:
434    /// ```no_run
435    /// use flowgentra_ai::prelude::*;
436    /// use serde_json::json;
437    ///
438    /// #[tokio::main]
439    /// async fn main() -> Result<()> {
440    ///     let mut agent = from_config_path("config.yaml")?;
441    ///     
442    ///     // Set initial values on agent.state
443    ///     agent.state.set("input", json!("Say hello"));
444    ///     
445    ///     let result_state = agent.run().await?;
446    ///     println!("Done! Result: {}", result_state.to_json_string()?);
447    ///     
448    ///     Ok(())
449    /// }
450    /// ```
451    pub async fn run(&mut self) -> Result<SharedState> {
452        // Automatically inject LLM config into state for handler access
453        let llm_config_json =
454            serde_json::to_value(&self.config.llm).unwrap_or_else(|_| serde_json::json!({}));
455        self.state.set("_llm_config", llm_config_json); // Ensure set() is defined in State
456
457        // Automatically inject MCP configs into state for handler access
458        if !self.config.graph.mcps.is_empty() {
459            let mcp_configs_json = serde_json::to_value(&self.config.graph.mcps)
460                .unwrap_or_else(|_| serde_json::json!({}));
461            self.state.set("_mcp_configs", mcp_configs_json);
462        }
463
464        // Automatically inject RAG config into state for handler access
465        if let Some(ref rag_config) = self.config.graph.rag {
466            let mut resolved = rag_config.clone();
467            resolved.resolve_env_vars();
468            let rag_config_json =
469                serde_json::to_value(&resolved).unwrap_or_else(|_| serde_json::json!({}));
470            self.state.set("_rag_config", rag_config_json);
471        }
472
473        self.runtime.execute(self.state.clone()).await
474    }
475
476    /// Run with a thread id for checkpointing and conversation memory. When a checkpointer is set,
477    /// state is loaded from the last checkpoint for this thread (if any) and saved after each node.
478    /// Use the same thread_id with conversation_memory to get/add messages for this conversation.
479    /// Automatically injects the LLM configuration into state so handlers can access it.
480    pub async fn run_with_thread(&mut self, thread_id: &str) -> Result<SharedState> {
481        // Automatically inject LLM config into state for handler access
482        let llm_config_json =
483            serde_json::to_value(&self.config.llm).unwrap_or_else(|_| serde_json::json!({}));
484        self.state.set("_llm_config", llm_config_json);
485
486        // Automatically inject MCP configs into state for handler access
487        if !self.config.graph.mcps.is_empty() {
488            let mcp_configs_json = serde_json::to_value(&self.config.graph.mcps)
489                .unwrap_or_else(|_| serde_json::json!({}));
490            self.state.set("_mcp_configs", mcp_configs_json);
491        }
492
493        // Automatically inject RAG config into state for handler access
494        if let Some(ref rag_config) = self.config.graph.rag {
495            let mut resolved = rag_config.clone();
496            resolved.resolve_env_vars();
497            let rag_config_json =
498                serde_json::to_value(&resolved).unwrap_or_else(|_| serde_json::json!({}));
499            self.state.set("_rag_config", rag_config_json);
500        }
501
502        self.runtime
503            .execute_with_thread(thread_id, self.state.clone())
504            .await
505    }
506
507    /// Get the LLM client for use in handlers
508    ///
509    /// Handlers can use this to access the configured LLM provider.
510    pub fn llm_client(&self) -> Arc<dyn LLMClient> {
511        Arc::clone(&self.llm_client)
512    }
513
    /// Get a reference to the agent's configuration
    ///
    /// This is the configuration the agent was constructed from; it is borrowed,
    /// not cloned.
    pub fn config(&self) -> &AgentConfig {
        &self.config
    }
518
519    // Visualize the agent's execution graph
520    //
521    // Generates a text-based or graphical representation of your agent's workflow.
522    // Useful for debugging and documentation.
523    //
524    // visualize_graph requires type parameter T to be known, which is not available in Agent context
525    // pub async fn visualize_graph(&self, output_path: &str) -> Result<()> {
526    //     self.runtime.visualize_graph(output_path)
527    // }
528
    /// Get mutable access to the underlying runtime
    ///
    /// For advanced users who need direct runtime access. Changes made through
    /// the returned reference affect this agent's subsequent runs.
    pub fn runtime_mut(&mut self) -> &mut AgentRuntime<SharedState> {
        &mut self.runtime
    }
535
536    // ==============================================
537    // Memory Management API
538    // ==============================================
539
540    /// Initialize message history in state for memory
541    ///
542    /// Creates an empty message history that handlers can append to.
543    ///
544    /// # Example (Code API)
545    /// ```ignore
546    /// let mut agent = from_config_path("config.yaml")?;
547    /// agent.enable_message_history()?;  // Just add this line!
548    /// ```
549    ///
550    /// # Example (Config approach - just add to state_schema)
551    /// ```yaml
552    /// state_schema:
553    ///   messages:
554    ///     type: array
555    ///     description: "Message history"
556    /// ```
557    pub fn enable_message_history(&mut self) -> Result<()> {
558        use crate::core::state::MessageHistory;
559        let history = MessageHistory::new();
560        history.save_to_state(&self.state)?;
561        Ok(())
562    }
563
564    /// Add a memory handler node to the graph
565    ///
566    /// Allows programmatic addition of memory handlers without editing config.
567    /// Handlers: "memory::append_message", "memory::compress_history", "memory::clear_history"
568    ///
569    /// # Example
570    /// ```ignore
571    /// agent.add_memory_handler("append", "memory::append_message")?;
572    /// agent.add_memory_handler("compress", "memory::compress_history")?;
573    /// ```
574    pub fn add_memory_handler(&mut self, node_name: &str, handler_type: &str) -> Result<()> {
575        // Register the memory handler in the runtime
576        let handler: Handler<SharedState> = match handler_type {
577            "memory::append_message" => Box::new(|state| {
578                Box::pin(crate::core::node::memory_handlers::append_message_handler(
579                    state,
580                ))
581            }),
582            "memory::compress_history" => Box::new(|state| {
583                Box::pin(crate::core::node::memory_handlers::compress_history_handler(state))
584            }),
585            "memory::clear_history" => Box::new(|state| {
586                Box::pin(crate::core::node::memory_handlers::clear_history_handler(
587                    state,
588                ))
589            }),
590            "memory::get_message_count" => Box::new(|state| {
591                Box::pin(crate::core::node::memory_handlers::get_message_count_handler(state))
592            }),
593            "memory::format_history_for_context" => Box::new(|state| {
594                Box::pin(
595                    crate::core::node::memory_handlers::format_history_for_context_handler(state),
596                )
597            }),
598            _ => {
599                return Err(FlowgentraError::ValidationError(format!(
600                    "Unknown memory handler: {}",
601                    handler_type
602                )))
603            }
604        };
605
606        self.runtime.register_node(node_name, handler)?;
607
608        Ok(())
609    }
610
611    /// Get message history helper
612    ///
613    /// Convenience method to work with message history
614    ///
615    /// # Example
616    /// ```ignore
617    /// let mut history = agent.get_message_history()?;
618    /// history.add_user_message("Hello!");
619    /// history.save_to_state(&agent.state)?;
620    /// ```
621    pub fn get_message_history(&self) -> Result<crate::core::state::MessageHistory> {
622        crate::core::state::MessageHistory::from_state(&self.state)
623    }
624
625    /// Set message history from user messages
626    ///
627    /// Useful for loading existing conversation or initializing with context
628    ///
629    /// # Example
630    /// ```ignore
631    /// let messages = vec![
632    ///     ("user", "What is Rust?"),
633    ///     ("assistant", "Rust is a systems programming language..."),
634    /// ];
635    /// for (role, content) in &messages {
636    ///     agent.add_message(role, content)?;
637    /// }
638    /// ```
639    pub fn add_message(&mut self, role: &str, content: &str) -> Result<()> {
640        let mut history = self.get_message_history()?;
641        match role {
642            "user" => history.add_user_message(content),
643            "assistant" => history.add_assistant_message(content),
644            "system" => history.add_system_message(content),
645            _ => {
646                return Err(FlowgentraError::ValidationError(format!(
647                    "Invalid role: {}",
648                    role
649                )))
650            }
651        }
652        history.save_to_state(&self.state)?;
653        Ok(())
654    }
655
656    /// Clear all messages from history
657    ///
658    /// Useful for resetting conversation state
659    pub fn clear_messages(&mut self) -> Result<()> {
660        let history = crate::core::state::MessageHistory::new();
661        history.save_to_state(&self.state)?;
662        Ok(())
663    }
664
665    /// Get custom state field helper
666    ///
667    /// For Pattern 4: Custom State Fields
668    pub fn custom_state(&self) -> Result<crate::core::state::CustomState> {
669        crate::core::state::CustomState::from_state(&self.state)
670    }
671
672    /// Set a custom state field
673    pub fn set_custom_field(&mut self, key: &str, value: serde_json::Value) -> Result<()> {
674        let mut custom = self.custom_state()?;
675        custom.set(key, value);
676        custom.save_to_state(&self.state)?;
677        Ok(())
678    }
679}
680
681/// Create an agent from config path only - handlers are auto-discovered!
682///
683/// Handlers registered via `#[register_handler]` attribute are automatically collected
684/// and available to the agent builder. No manual registration needed!
685///
686/// # Arguments
687/// - `config_path`: Path to your `config.yaml` file
688///
689/// # Returns
690/// Result with Agent if successful, error if any handlers are missing
691///
692/// # Error Messages
693/// The function will provide helpful error messages if handlers are missing,
694/// showing which handlers are required by the config but not registered.
695///
696/// # Example
697/// ```ignore
698/// use flowgentra_ai::prelude::*;
699///
700/// #[register_handler]
701/// pub async fn my_handler(state: State) -> Result<State> {
702///     state.set("output", json!("done"));
703///     Ok(state)
704/// }
705///
706/// #[tokio::main]
707/// async fn main() -> Result<()> {
708///     // That's it! Just create the agent from config
709///     let mut agent = from_config_path("config.yaml")?;
710///     
///     // `run()` takes no arguments: set inputs on the agent's own state
///     agent.state.set("input", json!("hello"));
///     let result = agent.run().await?;
713///     Ok(())
714/// }
715/// ```
716pub fn from_config_path(config_path: &str) -> Result<Agent> {
717    // Load config to get required node names
718    let mut config = AgentConfig::from_file(config_path)?;
719    config.validate()?;
720
721    // Resolve relative paths in subgraph configs against the parent config file's directory.
722    // This ensures `path: agents/researcher.yaml` in config.yaml correctly resolves to
723    // `<config_dir>/agents/researcher.yaml` regardless of the process CWD.
724    let config_dir = std::path::Path::new(config_path)
725        .parent()
726        .filter(|p| !p.as_os_str().is_empty())
727        .unwrap_or(std::path::Path::new("."))
728        .to_path_buf();
729
730    // For stdio MCPs: resolve working_dir and command path.
731    // - Default working_dir to config file's directory so relative script paths work
732    // - Resolve command via PATH if it's not an absolute path
733    for mcp in config.graph.mcps.values_mut() {
734        if mcp.connection_type == crate::core::mcp::MCPConnectionType::Stdio {
735            if mcp.connection_settings.working_dir.is_none() {
736                mcp.connection_settings.working_dir =
737                    Some(config_dir.to_string_lossy().to_string());
738            }
739            // Resolve command to absolute path if needed (e.g. "python" → "/usr/bin/python")
740            let cmd = mcp.stdio_command().to_string();
741            if !std::path::Path::new(&cmd).is_absolute() {
742                let mut resolved = String::new();
743
744                // Try system lookup (where on Windows, which on Unix)
745                let lookup_cmd = if cfg!(windows) { "where" } else { "which" };
746                if let Ok(output) = std::process::Command::new(lookup_cmd).arg(&cmd).output() {
747                    if output.status.success() {
748                        resolved = String::from_utf8_lossy(&output.stdout)
749                            .lines()
750                            .next()
751                            .unwrap_or("")
752                            .trim()
753                            .to_string();
754                    }
755                }
756
757                // On Windows, also search common installation paths
758                #[cfg(windows)]
759                if resolved.is_empty() && (cmd == "python" || cmd == "python3") {
760                    let candidates = [
761                        // Standard Python installer paths
762                        "C:\\Python312\\python.exe".to_string(),
763                        "C:\\Python311\\python.exe".to_string(),
764                        "C:\\Python310\\python.exe".to_string(),
765                        "C:\\Python39\\python.exe".to_string(),
766                        // Program Files
767                        "C:\\Program Files\\Python312\\python.exe".to_string(),
768                        "C:\\Program Files\\Python311\\python.exe".to_string(),
769                        "C:\\Program Files\\Python310\\python.exe".to_string(),
770                        "C:\\Program Files\\Python39\\python.exe".to_string(),
771                    ];
772                    if let Ok(home) = std::env::var("USERPROFILE") {
773                        let user_candidates = [
774                            format!(
775                                "{}\\AppData\\Local\\Programs\\Python\\Python312\\python.exe",
776                                home
777                            ),
778                            format!(
779                                "{}\\AppData\\Local\\Programs\\Python\\Python311\\python.exe",
780                                home
781                            ),
782                            format!(
783                                "{}\\AppData\\Local\\Programs\\Python\\Python310\\python.exe",
784                                home
785                            ),
786                        ];
787                        for c in user_candidates.iter().chain(candidates.iter()) {
788                            if std::path::Path::new(c).exists() {
789                                resolved = c.clone();
790                                break;
791                            }
792                        }
793                    } else {
794                        for c in &candidates {
795                            if std::path::Path::new(c).exists() {
796                                resolved = c.clone();
797                                break;
798                            }
799                        }
800                    }
801                }
802
803                if !resolved.is_empty() {
804                    tracing::info!(command = %cmd, resolved = %resolved, "Resolved stdio command path");
805                    if mcp.command.is_some() {
806                        mcp.command = Some(resolved);
807                    } else {
808                        mcp.uri = resolved;
809                    }
810                }
811            }
812        }
813    }
814
815    // Collect all registered handlers from inventory (ArcHandler = cloneable)
816    let mut handlers_map: HashMap<String, ArcHandler<SharedState>> = HashMap::new();
817    for entry in inventory::iter::<HandlerEntry> {
818        handlers_map.insert(entry.name.clone(), entry.handler.clone());
819    }
820
821    // Inject builtin::planner into handlers_map for backward compatibility
822    // (when a node uses handler: "builtin::planner" instead of type: "planner")
823    let has_legacy_planner = config
824        .graph
825        .nodes
826        .iter()
827        .any(|n| n.handler == "builtin::planner");
828    let has_planner_type = config
829        .graph
830        .nodes
831        .iter()
832        .any(|n| n.node_type.as_deref() == Some("planner"));
833    if has_legacy_planner || has_planner_type {
834        let llm_client = config.create_llm_client()?;
835        let prompt_template = config.graph.planner.prompt_template.clone();
836        let planner_fn = Arc::new(crate::core::node::planner::create_planner_handler(
837            llm_client,
838            prompt_template,
839        ));
840        let arc_handler: ArcHandler<SharedState> =
841            Arc::new(move |state| planner_fn.as_ref()(state));
842        handlers_map.insert("__builtin_planner__".to_string(), arc_handler);
843    }
844
845    // Build handler registry keyed by NODE NAME.
846    // Dispatch based on node type — every built-in type is detected here.
847    let mut node_handlers: HandlerRegistry<SharedState> = HashMap::new();
848    let mut missing_handlers: Vec<String> = Vec::new();
849
850    // Helper: look up a handler by name, or record it as missing
851    macro_rules! require_handler {
852        ($name:expr) => {
853            match handlers_map.get($name) {
854                Some(h) => h.clone(),
855                None => {
856                    missing_handlers.push($name.to_string());
857                    continue;
858                }
859            }
860        };
861    }
862
863    for node_config in &config.graph.nodes {
864        if node_config.name == "START" || node_config.name == "END" {
865            continue;
866        }
867        // Supervisor nodes are built in a second pass (need child handlers to exist first).
868        // Subgraph nodes are self-contained and built in the second pass too.
869        match node_config.node_type.as_deref() {
870            Some("supervisor") | Some("orchestrator") => continue,
871            Some("subgraph") | Some("agent") | Some("agent_or_graph") => continue,
872            _ => {}
873        }
874
875        let node_name = node_config.name.clone();
876        let handler = match node_config.node_type.as_deref() {
877            // ── Evaluation: loop until confident ───────────────────────────
878            // Standalone (no handler): scores the current state field and writes
879            // evaluation metadata. Designed to be used as a graph node with back-edges.
880            // Wrapping mode (handler provided): calls the handler repeatedly until
881            // min_confidence is reached or max_retries is exhausted.
882            Some("evaluation") => {
883                use crate::core::node::evaluation_node::EvaluationNodeConfig;
884                let cfg = EvaluationNodeConfig::from_node_config(node_config)?;
885                if node_config.handler.is_empty() {
886                    create_evaluation_standalone_handler(cfg)
887                } else {
888                    let arc = require_handler!(&node_config.handler);
889                    wrap_handler_with_evaluation(arc, cfg)
890                }
891            }
892
893            // ── Retry: exponential-backoff retry management ─────────────────
894            // Standalone: manages __retry_count__ and __retry_should_retry__ in
895            // state. Pair with a back-edge and condition on __retry_should_retry__.
896            // Wrapping mode: re-calls the handler on error automatically.
897            Some("retry") => {
898                use crate::core::node::nodes_trait::RetryNodeConfig;
899                let cfg = RetryNodeConfig::from_node_config(node_config)?;
900                if node_config.handler.is_empty() {
901                    create_retry_standalone_handler(cfg)
902                } else {
903                    let arc = require_handler!(&node_config.handler);
904                    wrap_handler_with_retry(arc, cfg)
905                }
906            }
907
908            // ── Timeout: wall-clock deadline tracking ───────────────────────
909            // Standalone: sets __timeout_deadline__ on first visit, checks it on
910            // subsequent visits. Sets __timeout_timed_out__ for routing.
911            // Wrapping mode: aborts the handler if it exceeds the duration.
912            Some("timeout") => {
913                use crate::core::node::nodes_trait::TimeoutNodeConfig;
914                let cfg = TimeoutNodeConfig::from_node_config(node_config)?;
915                if node_config.handler.is_empty() {
916                    create_timeout_standalone_handler(cfg)
917                } else {
918                    let arc = require_handler!(&node_config.handler);
919                    wrap_handler_with_timeout(arc, cfg)
920                }
921            }
922
923            // ── Loop: iteration counter management ──────────────────────────
924            // Standalone: tracks __loop_iteration__ and __loop_continue__ in state.
925            // Pair with a back-edge conditioned on __loop_continue__.
926            // Wrapping mode: runs the handler up to max_iterations times inline.
927            Some("loop") => {
928                use crate::core::node::advanced_nodes::LoopNodeConfig;
929                let cfg = LoopNodeConfig::from_node_config(node_config)?;
930                if node_config.handler.is_empty() {
931                    create_loop_standalone_handler(cfg)
932                } else {
933                    let arc = require_handler!(&node_config.handler);
934                    wrap_handler_with_loop(arc, cfg)
935                }
936            }
937
938            // ── Planner: LLM-driven next-node selection (no user handler) ───
939            // `type: planner` always uses the builtin planner regardless of the handler field.
940            // `handler: "builtin::planner"` on a typeless node is the legacy spelling.
941            // NOTE: the guard `if` on a `|` pattern applies to ALL alternatives, so these
942            // two cases must be separate arms to avoid the guard blocking `Some("planner")`.
943            Some("planner") => {
944                let arc = handlers_map
945                    .get("__builtin_planner__")
946                    .cloned()
947                    .expect("planner was pre-injected");
948                Box::new(move |state| arc(state))
949            }
950            None if node_config.handler == "builtin::planner" => {
951                let arc = handlers_map
952                    .get("__builtin_planner__")
953                    .cloned()
954                    .expect("planner was pre-injected");
955                Box::new(move |state| arc(state))
956            }
957
958            // ── Human-in-the-loop: pause for human approval/edit ───────────
959            Some("human_in_the_loop") => {
960                use crate::core::node::nodes_trait::HumanInTheLoopConfig;
961                let cfg = HumanInTheLoopConfig::from_node_config(node_config)?;
962                create_human_in_loop_handler(cfg)
963            }
964
965            // ── Memory: built-in memory operations ─────────────────────────
966            Some("memory") => {
967                let op = node_config
968                    .config
969                    .get("operation")
970                    .and_then(|v| v.as_str())
971                    .unwrap_or("");
972                match create_memory_handler(op) {
973                    Some(h) => h,
974                    None => {
975                        return Err(FlowgentraError::ConfigError(format!(
976                            "Unknown memory operation '{}' for node '{}'. \
977                         Valid operations: append_message, compress_history, \
978                         clear_history, get_message_count, format_history_for_context",
979                            op, node_config.name
980                        )))
981                    }
982                }
983            }
984
985            // ── Plain handler (or unrecognized type: falls back to handler) ─
986            _ => {
987                let arc = require_handler!(&node_config.handler);
988                Box::new(move |state| arc(state))
989            }
990        };
991
992        node_handlers.insert(node_name, handler);
993    }
994
995    // ── Second pass: supervisor + subgraph nodes ────────────────────────────────
996    // Convert first-pass handlers to Arc so supervisor children can be shared
997    // without being consumed (each child must remain a standalone graph node too).
998    let node_handlers_arc: HashMap<String, ArcHandler<SharedState>> = node_handlers
999        .into_iter()
1000        .map(|(name, h)| {
1001            let arc: ArcHandler<SharedState> = Arc::new(move |state| h(state));
1002            (name, arc)
1003        })
1004        .collect();
1005
1006    let mut second_pass_handlers: HandlerRegistry<SharedState> = HashMap::new();
1007
1008    // ── Sub-pass A: index all subgraph configs by node name ───────────────────
1009    // create_subgraph_handler is cheap (YAML loads only at execution time),
1010    // so we build on demand in sub-pass B rather than pre-building.
1011    use crate::core::node::agent_or_graph_node::SubgraphNodeConfig;
1012    let subgraph_configs: HashMap<String, SubgraphNodeConfig> = config
1013        .graph
1014        .nodes
1015        .iter()
1016        .filter(|n| {
1017            matches!(
1018                n.node_type.as_deref(),
1019                Some("subgraph") | Some("agent") | Some("agent_or_graph")
1020            )
1021        })
1022        .map(|n| {
1023            let mut cfg = SubgraphNodeConfig::from_node_config(n)?;
1024            // Resolve subgraph path relative to the parent config file's directory
1025            if !std::path::Path::new(&cfg.path).is_absolute() {
1026                cfg.path = config_dir.join(&cfg.path).to_string_lossy().to_string();
1027            }
1028            Ok((n.name.clone(), cfg))
1029        })
1030        .collect::<Result<_>>()?;
1031
1032    // Register all subgraph nodes as standalone graph nodes too
1033    for (name, cfg) in &subgraph_configs {
1034        second_pass_handlers.insert(name.clone(), create_subgraph_handler(cfg.clone()));
1035    }
1036
1037    // ── Sub-pass B: build supervisor handlers ─────────────────────────────────
1038    // Child resolution order:
1039    //   1. first-pass plain handlers  (node_handlers_arc)
1040    //   2. subgraph nodes             (subgraph_configs — built fresh per supervisor)
1041    //   3. already-built supervisor handlers (built_supervisor_arcs — enables nesting)
1042    //   4. inventory handlers         (handlers_map)
1043    //
1044    // Supervisors are built in multiple passes so that parent supervisors can
1045    // reference child supervisors that were built in an earlier pass.
1046    let supervisor_nodes: Vec<_> = config
1047        .graph
1048        .nodes
1049        .iter()
1050        .filter(|n| {
1051            matches!(
1052                n.node_type.as_deref(),
1053                Some("supervisor") | Some("orchestrator")
1054            )
1055        })
1056        .collect();
1057
1058    let mut built_supervisor_arcs: HashMap<String, ArcHandler<SharedState>> = HashMap::new();
1059    let mut remaining: Vec<_> = supervisor_nodes.iter().map(|n| n.name.clone()).collect();
1060    let max_passes = remaining.len() + 1; // guard against infinite loops
1061
1062    for _pass in 0..max_passes {
1063        if remaining.is_empty() {
1064            break;
1065        }
1066        let mut still_remaining = Vec::new();
1067
1068        for sup_name in &remaining {
1069            let node_config = supervisor_nodes
1070                .iter()
1071                .find(|n| &n.name == sup_name)
1072                .unwrap();
1073            use crate::core::node::orchestrator_node::SupervisorNodeConfig;
1074            let cfg = SupervisorNodeConfig::from_node_config(node_config)?;
1075
1076            let mut child_arcs: Vec<(String, ArcHandler<SharedState>)> = Vec::new();
1077            let mut all_resolved = true;
1078
1079            for child_name in &cfg.children {
1080                if let Some(arc) = node_handlers_arc.get(child_name).cloned() {
1081                    child_arcs.push((child_name.clone(), arc));
1082                } else if let Some(sub_cfg) = subgraph_configs.get(child_name) {
1083                    let handler = create_subgraph_handler(sub_cfg.clone());
1084                    child_arcs.push((child_name.clone(), Arc::new(move |state| handler(state))));
1085                } else if let Some(arc) = built_supervisor_arcs.get(child_name).cloned() {
1086                    // child is a supervisor built in a previous pass
1087                    child_arcs.push((child_name.clone(), arc));
1088                } else if let Some(arc) = handlers_map.get(child_name).cloned() {
1089                    child_arcs.push((child_name.clone(), arc));
1090                } else {
1091                    // child not yet available — might be built in a later pass
1092                    all_resolved = false;
1093                    break;
1094                }
1095            }
1096
1097            if all_resolved {
1098                // Build per-child MCP map from node configs
1099                let child_mcps: HashMap<String, Vec<String>> = cfg
1100                    .children
1101                    .iter()
1102                    .filter_map(|name| {
1103                        let node = config.graph.nodes.iter().find(|n| &n.name == name)?;
1104                        if node.mcps.is_empty() {
1105                            None
1106                        } else {
1107                            Some((name.clone(), node.mcps.clone()))
1108                        }
1109                    })
1110                    .collect();
1111
1112                let handler = if matches!(
1113                    cfg.strategy,
1114                    crate::core::node::orchestrator_node::OrchestrationStrategy::Dynamic
1115                ) {
1116                    let llm = config
1117                        .create_llm_client()
1118                        .ok()
1119                        .map(|c| c as Arc<dyn LLMClient>);
1120                    create_supervisor_handler_with_llm(cfg, child_arcs, llm, child_mcps)
1121                } else {
1122                    create_supervisor_handler(cfg, child_arcs, child_mcps)
1123                };
1124                let arc: ArcHandler<SharedState> = Arc::new(move |state| handler(state));
1125                built_supervisor_arcs.insert(sup_name.clone(), arc);
1126                second_pass_handlers.insert(sup_name.clone(), {
1127                    let arc = built_supervisor_arcs.get(sup_name).unwrap().clone();
1128                    Box::new(move |state| arc(state))
1129                });
1130            } else {
1131                still_remaining.push(sup_name.clone());
1132            }
1133        }
1134
1135        if still_remaining.len() == remaining.len() {
1136            // No progress — remaining supervisors have unresolvable children
1137            for sup_name in &still_remaining {
1138                let node_config = supervisor_nodes
1139                    .iter()
1140                    .find(|n| &n.name == sup_name)
1141                    .unwrap();
1142                use crate::core::node::orchestrator_node::SupervisorNodeConfig;
1143                let cfg = SupervisorNodeConfig::from_node_config(node_config)?;
1144                for child_name in &cfg.children {
1145                    if !node_handlers_arc.contains_key(child_name)
1146                        && !subgraph_configs.contains_key(child_name)
1147                        && !built_supervisor_arcs.contains_key(child_name)
1148                        && !handlers_map.contains_key(child_name)
1149                    {
1150                        missing_handlers.push(child_name.clone());
1151                    }
1152                }
1153            }
1154            break;
1155        }
1156
1157        remaining = still_remaining;
1158    }
1159
1160    // Merge both passes back into a single HandlerRegistry
1161    let mut node_handlers: HandlerRegistry<SharedState> = node_handlers_arc
1162        .into_iter()
1163        .map(|(name, arc)| {
1164            let h: Handler<SharedState> = Box::new(move |state| arc(state));
1165            (name, h)
1166        })
1167        .collect();
1168    node_handlers.extend(second_pass_handlers);
1169
1170    if !missing_handlers.is_empty() {
1171        missing_handlers.dedup();
1172        let missing = missing_handlers.join(", ");
1173        let registered = handlers_map
1174            .keys()
1175            .filter(|k| !k.starts_with("__builtin"))
1176            .map(|k: &String| k.as_str())
1177            .collect::<Vec<_>>()
1178            .join(", ");
1179
1180        let msg = if registered.is_empty() {
1181            format!(
1182                "Missing handlers: {}. No handlers registered yet.\n\
1183                 Use #[register_handler] attribute on your handler functions to register them.\n\
1184                 Example:\n  #[register_handler]\n  pub async fn {} (state: State) -> Result<State> {{ ... }}",
1185                missing,
1186                missing_handlers[0]
1187            )
1188        } else {
1189            format!(
1190                "Missing handlers: {}\n\
1191                 Registered handlers: {}\n\
1192                 Use #[register_handler] attribute to register missing handlers.",
1193                missing, registered
1194            )
1195        };
1196
1197        return Err(crate::core::error::FlowgentraError::ConfigError(msg));
1198    }
1199
1200    // Pass handlers keyed by node name — from_config_inner will match by node.name
1201    let mut agent = Agent::from_config_inner(config.clone(), node_handlers, HashMap::new())?;
1202
1203    // Apply memory from config (checkpointer, conversation memory, buffer/window)
1204    if config
1205        .memory
1206        .checkpointer
1207        .type_
1208        .eq_ignore_ascii_case("memory")
1209    {
1210        agent.set_checkpointer(Arc::new(MemoryCheckpointer::new()));
1211    }
1212    if config.memory.conversation.enabled {
1213        let conv = InMemoryConversationMemory::with_config(&config.memory.buffer);
1214        agent.set_conversation_memory(Arc::new(conv));
1215    }
1216
1217    // Apply evaluation from config (auto-add AutoEvaluationMiddleware)
1218    if let Some(eval_config) = &config.evaluation {
1219        if eval_config.enabled {
1220            use crate::core::evaluation::{
1221                AutoEvaluationMiddleware, ConfidenceConfig, LegacyEvaluationPolicy, RetryConfig,
1222                ScoringCriteria,
1223            };
1224
1225            let policy = LegacyEvaluationPolicy {
1226                enable_scoring: true,
1227                enable_grading: eval_config
1228                    .grading
1229                    .as_ref()
1230                    .map(|g| g.enabled)
1231                    .unwrap_or(false),
1232                enable_confidence_scoring: true,
1233                confidence_threshold: eval_config.min_confidence,
1234                max_retries: eval_config.max_retries,
1235                enable_self_correction: true,
1236                store_evaluation_history: true,
1237            };
1238
1239            let scoring_criteria = ScoringCriteria::default();
1240
1241            let confidence_config = ConfidenceConfig {
1242                low_threshold: eval_config.min_confidence * 0.6,
1243                high_threshold: eval_config.min_confidence,
1244                ..Default::default()
1245            };
1246
1247            let retry_config = RetryConfig {
1248                max_retries: eval_config.max_retries,
1249                confidence_threshold: eval_config.min_confidence,
1250                ..Default::default()
1251            };
1252
1253            let middleware = AutoEvaluationMiddleware::new()
1254                .with_policy(policy)
1255                .with_scoring_criteria(scoring_criteria)
1256                .with_confidence_config(confidence_config)
1257                .with_retry_config(retry_config);
1258
1259            agent.runtime_mut().add_middleware(Arc::new(middleware));
1260        }
1261    }
1262
1263    Ok(agent)
1264}
1265
1266// Memory-aware agent wrapper for simplified memory handling
1267mod memory_aware;
1268pub use memory_aware::{MemoryAwareAgent, MemoryStats};
1269
1270// =============================================================================
1271// Built-in Node Handler Wrappers
1272//
1273// Each function converts a built-in node config into a Handler<SharedState>
1274// so every node type is uniformly represented as a NodeFunction in the runtime.
1275// =============================================================================
1276
/// Wraps a handler in an evaluation retry loop.
/// Delegates to `EvaluationNodeConfig::into_wrapping_node_fn` — single source of truth.
///
/// This is the "wrapping mode" entry point for `type: evaluation` nodes: the
/// config loader calls it when the node names a handler, and falls back to
/// `create_evaluation_standalone_handler` when the handler field is empty.
///
/// # Arguments
/// * `handler` - the registered user handler to be wrapped.
/// * `eval_config` - evaluation settings for the node; consumed by the call.
///
/// # Returns
/// The `Handler<SharedState>` produced by `into_wrapping_node_fn`. All retry and
/// scoring behaviour lives in that method; nothing is added here.
fn wrap_handler_with_evaluation(
    handler: ArcHandler<SharedState>,
    eval_config: crate::core::node::evaluation_node::EvaluationNodeConfig,
) -> Handler<SharedState> {
    // Thin adapter so every built-in node type is built through a uniform free function.
    eval_config.into_wrapping_node_fn(handler)
}
1285
1286// ── Retry ──────────────────────────────────────────────────────────────────
1287/// Wraps a handler in an exponential-backoff retry loop.
1288/// Re-calls the handler on error up to `max_retries` times.
1289///
1290/// YAML:
1291/// ```yaml
1292/// - name: call_api
1293///   type: retry
1294///   handler: call_api_handler
1295///   config:
1296///     max_retries: 3
1297///     backoff_ms: 1000
1298///     backoff_multiplier: 2.0
1299///     max_backoff_ms: 30000
1300/// ```
1301fn wrap_handler_with_retry(
1302    handler: ArcHandler<SharedState>,
1303    config: crate::core::node::nodes_trait::RetryNodeConfig,
1304) -> Handler<SharedState> {
1305    Box::new(move |state| {
1306        let handler = handler.clone();
1307        let config = config.clone();
1308        Box::pin(async move {
1309            let mut last_err: Option<FlowgentraError> = None;
1310
1311            for attempt in 0..=config.max_retries {
1312                match handler(state.clone()).await {
1313                    Ok(new_state) => return Ok(new_state),
1314                    Err(e) => {
1315                        tracing::warn!(
1316                            "Retry '{}' attempt {}/{} failed: {}",
1317                            config.name,
1318                            attempt + 1,
1319                            config.max_retries,
1320                            e
1321                        );
1322                        last_err = Some(e);
1323
1324                        if attempt < config.max_retries {
1325                            let backoff = if attempt == 0 {
1326                                config.backoff_ms
1327                            } else {
1328                                let exp = config.backoff_ms as f64
1329                                    * (config.backoff_multiplier as f64).powi(attempt as i32);
1330                                (exp as u64).min(config.max_backoff_ms)
1331                            };
1332                            tokio::time::sleep(std::time::Duration::from_millis(backoff)).await;
1333                        }
1334                    }
1335                }
1336            }
1337
1338            Err(last_err.unwrap_or_else(|| {
1339                FlowgentraError::RuntimeError(format!(
1340                    "Retry '{}' exhausted {} attempts",
1341                    config.name, config.max_retries
1342                ))
1343            }))
1344        })
1345    })
1346}
1347
1348// ── Timeout ────────────────────────────────────────────────────────────────
1349/// Wraps a handler with a wall-clock timeout.
1350/// On timeout, behaviour is controlled by `on_timeout`:
1351/// - `"error"` (default) → return an error
1352/// - `"skip"` → return state unchanged
1353/// - `"default_value"` → set `_timeout_default` in state and return ok
1354///
1355/// YAML:
1356/// ```yaml
1357/// - name: slow_op
1358///   type: timeout
1359///   handler: slow_handler
1360///   config:
1361///     timeout_ms: 5000
1362///     on_timeout: "error"   # or "skip" or "default_value"
1363/// ```
1364fn wrap_handler_with_timeout(
1365    handler: ArcHandler<SharedState>,
1366    config: crate::core::node::nodes_trait::TimeoutNodeConfig,
1367) -> Handler<SharedState> {
1368    Box::new(move |state| {
1369        let handler = handler.clone();
1370        let config = config.clone();
1371        Box::pin(async move {
1372            let duration = std::time::Duration::from_millis(config.timeout_ms);
1373            match tokio::time::timeout(duration, handler(state.clone())).await {
1374                Ok(Ok(new_state)) => Ok(new_state),
1375                Ok(Err(e)) => Err(e),
1376                Err(_elapsed) => match config.on_timeout.as_str() {
1377                    "skip" => Ok(state),
1378                    "default_value" => {
1379                        if let Some(default) = &config.default_value {
1380                            state.set("_timeout_default", default.clone());
1381                        }
1382                        Ok(state)
1383                    }
1384                    _ => Err(FlowgentraError::ExecutionTimeout(format!(
1385                        "Node '{}' timed out after {}ms",
1386                        config.name, config.timeout_ms
1387                    ))),
1388                },
1389            }
1390        })
1391    })
1392}
1393
1394// ── Loop ───────────────────────────────────────────────────────────────────
1395/// Wraps a handler in a fixed-iteration loop.
1396/// Exits early when the state field named `break_condition` is true.
1397///
1398/// YAML:
1399/// ```yaml
1400/// - name: iterate_process
1401///   type: loop
1402///   handler: process_step
1403///   config:
1404///     max_iterations: 5
1405///     break_condition: "is_done"   # name of a bool state field
1406/// ```
1407fn wrap_handler_with_loop(
1408    handler: ArcHandler<SharedState>,
1409    config: crate::core::node::advanced_nodes::LoopNodeConfig,
1410) -> Handler<SharedState> {
1411    Box::new(move |state| {
1412        let handler = handler.clone();
1413        let config = config.clone();
1414        Box::pin(async move {
1415            let mut state = state;
1416
1417            for iteration in 0..config.max_iterations {
1418                tracing::info!(
1419                    "Loop '{}' iteration {}/{}",
1420                    config.handler,
1421                    iteration + 1,
1422                    config.max_iterations
1423                );
1424                state = handler(state).await?;
1425
1426                if let Some(ref cond) = config.break_condition {
1427                    if state.get(cond).and_then(|v| v.as_bool()).unwrap_or(false) {
1428                        tracing::info!(
1429                            "Loop '{}' break condition '{}' met at iteration {}",
1430                            config.handler,
1431                            cond,
1432                            iteration + 1
1433                        );
1434                        break;
1435                    }
1436                }
1437            }
1438
1439            Ok(state)
1440        })
1441    })
1442}
1443
1444// ── Human-in-the-Loop ─────────────────────────────────────────────────────
1445/// Simulates a human approval checkpoint.
1446/// Sets `_human_approved = true` and `_human_node` in state.
1447/// (In production, replace with a real interactive or webhook mechanism.)
1448///
1449/// YAML:
1450/// ```yaml
1451/// - name: approval
1452///   type: human_in_the_loop
1453///   config:
1454///     prompt: "Please approve this action"
1455///     require_approval: true
1456///     editable_fields: ["amount", "recipient"]
1457/// ```
1458fn create_human_in_loop_handler(
1459    config: crate::core::node::nodes_trait::HumanInTheLoopConfig,
1460) -> Handler<SharedState> {
1461    Box::new(move |state| {
1462        let config = config.clone();
1463        Box::pin(async move {
1464            tracing::info!("Human-in-the-loop '{}': {}", config.name, config.prompt);
1465            state.set("_human_approved", serde_json::json!(true));
1466            state.set("_human_node", serde_json::json!(config.name));
1467            if !config.editable_fields.is_empty() {
1468                state.set(
1469                    "_human_editable_fields",
1470                    serde_json::json!(config.editable_fields),
1471                );
1472            }
1473            Ok(state)
1474        })
1475    })
1476}
1477
1478// ── Memory ─────────────────────────────────────────────────────────────────
1479/// Creates a memory operation handler by operation name.
1480///
1481/// YAML:
1482/// ```yaml
1483/// - name: append_msg
1484///   type: memory
1485///   config:
1486///     operation: append_message   # or compress_history, clear_history,
1487///                                 #    get_message_count, format_history_for_context
1488/// ```
1489fn create_memory_handler(operation: &str) -> Option<Handler<SharedState>> {
1490    use crate::core::node::memory_handlers;
1491    match operation {
1492        "append_message" => Some(Box::new(|state| {
1493            Box::pin(memory_handlers::append_message_handler(state))
1494        })),
1495        "compress_history" => Some(Box::new(|state| {
1496            Box::pin(memory_handlers::compress_history_handler(state))
1497        })),
1498        "clear_history" => Some(Box::new(|state| {
1499            Box::pin(memory_handlers::clear_history_handler(state))
1500        })),
1501        "get_message_count" => Some(Box::new(|state| {
1502            Box::pin(memory_handlers::get_message_count_handler(state))
1503        })),
1504        "format_history_for_context" => Some(Box::new(|state| {
1505            Box::pin(memory_handlers::format_history_for_context_handler(state))
1506        })),
1507        _ => None,
1508    }
1509}
1510
1511// =============================================================================
1512// Standalone Built-in Node Handlers
1513//
1514// These create self-contained handlers that manage state flags so the caller
1515// can route the graph based on those flags (e.g. with conditional back-edges).
1516// No user handler is required — they are fully implemented by the library.
1517// =============================================================================
1518
1519// ── Standalone Evaluation ─────────────────────────────────────────────────────
1520/// Reads the configured `field_state` from the current state, scores it with
1521/// the built-in heuristic scorer, and writes evaluation metadata to state.
1522///
1523/// State keys written:
1524/// - `__eval_score__<name>`        – current numeric score (0.0–1.0)
1525/// - `__eval_feedback__<name>`     – textual feedback for the next handler
1526/// - `__eval_needs_retry__<name>`  – bool, true when score < min_confidence
1527///   AND attempt < max_retries
1528/// - `__eval_attempt__<name>`      – current attempt counter (starts at 1)
1529/// - `__eval_meta__<name>`         – full JSON object with all metadata
1530///
1531/// Pair with a conditional back-edge on `__eval_needs_retry__<name>` to retry.
1532///
1533/// YAML:
1534/// ```yaml
1535/// - name: score_output
1536///   type: evaluation
1537///   config:
1538///     field_state: llm_output
1539///     min_confidence: 0.80
1540///     max_retries: 3
1541///     rubric: "Is the output clear and accurate?"
1542/// ```
1543/// Delegates to `EvaluationNodeConfig::into_standalone_node_fn` — single source of truth.
1544fn create_evaluation_standalone_handler(
1545    config: crate::core::node::evaluation_node::EvaluationNodeConfig,
1546) -> Handler<SharedState> {
1547    config.into_standalone_node_fn()
1548}
1549
1550// ── Standalone Retry ──────────────────────────────────────────────────────────
1551/// Manages a retry counter and computes exponential backoff in state.
1552/// Does NOT call any user handler — use this as a gate node in your graph.
1553///
1554/// State keys written:
1555/// - `__retry_count__<name>`        – current attempt number (starts at 1)
1556/// - `__retry_should_retry__<name>` – bool, true while count < max_retries
1557/// - `__retry_meta__<name>`         – JSON with attempt / backoff / should_retry
1558///
1559/// Pair with a conditional back-edge on `__retry_should_retry__<name>`.
1560///
1561/// YAML:
1562/// ```yaml
1563/// - name: retry_gate
1564///   type: retry
1565///   config:
1566///     max_retries: 3
1567///     backoff_ms: 500
1568///     backoff_multiplier: 2.0
1569///     max_backoff_ms: 10000
1570/// ```
1571fn create_retry_standalone_handler(
1572    config: crate::core::node::nodes_trait::RetryNodeConfig,
1573) -> Handler<SharedState> {
1574    use serde_json::json;
1575
1576    Box::new(move |state| {
1577        let config = config.clone();
1578        Box::pin(async move {
1579            let count_key = format!("__retry_count__{}", config.name);
1580            let should_retry_key = format!("__retry_should_retry__{}", config.name);
1581
1582            let current = state.get(&count_key).and_then(|v| v.as_u64()).unwrap_or(0) as usize;
1583
1584            let next = current + 1;
1585            let should_retry = current < config.max_retries;
1586
1587            // Exponential backoff for the current attempt
1588            let backoff_ms = if current == 0 {
1589                config.backoff_ms
1590            } else {
1591                let exp = config.backoff_ms as f64
1592                    * (config.backoff_multiplier as f64).powi(current as i32);
1593                (exp as u64).min(config.max_backoff_ms)
1594            };
1595
1596            state.set(&count_key, json!(next));
1597            state.set(&should_retry_key, json!(should_retry));
1598            state.set(
1599                format!("__retry_meta__{}", config.name),
1600                json!({
1601                    "attempt": next,
1602                    "max_retries": config.max_retries,
1603                    "should_retry": should_retry,
1604                    "backoff_ms": backoff_ms,
1605                }),
1606            );
1607
1608            if should_retry {
1609                tracing::info!(
1610                    "Standalone retry '{}' attempt {}/{}, sleeping {}ms",
1611                    config.name,
1612                    next,
1613                    config.max_retries,
1614                    backoff_ms
1615                );
1616                tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
1617            } else {
1618                tracing::info!(
1619                    "Standalone retry '{}' exhausted {} attempts — no more retries",
1620                    config.name,
1621                    config.max_retries
1622                );
1623            }
1624
1625            Ok(state)
1626        })
1627    })
1628}
1629
1630// ── Standalone Timeout ────────────────────────────────────────────────────────
1631/// Tracks a wall-clock deadline in state.
1632/// On first visit, records `now + timeout_ms` as the deadline.
1633/// On each subsequent visit, checks whether the deadline has passed.
1634///
1635/// State keys written:
1636/// - `__timeout_deadline__<name>` – UNIX-ms deadline (set once on first visit)
1637/// - `__timeout_timed_out__<name>` – bool, true once deadline passes
1638/// - `__timeout_meta__<name>`      – JSON with deadline / elapsed info
1639///
1640/// On timeout, behaviour mirrors the wrapping variant:
1641/// - `on_timeout: "error"` → returns an error
1642/// - `on_timeout: "skip"`  → continues silently
1643/// - `on_timeout: "default_value"` → writes `_timeout_default` to state
1644///
1645/// YAML:
1646/// ```yaml
1647/// - name: check_deadline
1648///   type: timeout
1649///   config:
1650///     timeout_ms: 30000
1651///     on_timeout: "skip"
1652/// ```
1653fn create_timeout_standalone_handler(
1654    config: crate::core::node::nodes_trait::TimeoutNodeConfig,
1655) -> Handler<SharedState> {
1656    use serde_json::json;
1657
1658    Box::new(move |state| {
1659        let config = config.clone();
1660        Box::pin(async move {
1661            let deadline_key = format!("__timeout_deadline__{}", config.name);
1662            let timed_out_key = format!("__timeout_timed_out__{}", config.name);
1663
1664            let now_ms = std::time::SystemTime::now()
1665                .duration_since(std::time::UNIX_EPOCH)
1666                .unwrap_or_default()
1667                .as_millis() as u64;
1668
1669            let deadline_ms = match state.get(&deadline_key).and_then(|v| v.as_u64()) {
1670                Some(d) => d,
1671                None => {
1672                    // First visit: record the deadline, not yet timed out
1673                    let d = now_ms + config.timeout_ms;
1674                    state.set(&deadline_key, json!(d));
1675                    state.set(&timed_out_key, json!(false));
1676                    state.set(
1677                        format!("__timeout_meta__{}", config.name),
1678                        json!({ "timed_out": false, "deadline_ms": d, "current_ms": now_ms }),
1679                    );
1680                    return Ok(state);
1681                }
1682            };
1683
1684            let timed_out = now_ms > deadline_ms;
1685            state.set(&timed_out_key, json!(timed_out));
1686            state.set(
1687                format!("__timeout_meta__{}", config.name),
1688                json!({
1689                    "timed_out": timed_out,
1690                    "timeout_ms": config.timeout_ms,
1691                    "deadline_ms": deadline_ms,
1692                    "current_ms": now_ms,
1693                    "elapsed_ms": now_ms.saturating_sub(deadline_ms - config.timeout_ms),
1694                }),
1695            );
1696
1697            if timed_out {
1698                tracing::warn!(
1699                    "Standalone timeout '{}' expired (deadline={}ms, now={}ms)",
1700                    config.name,
1701                    deadline_ms,
1702                    now_ms
1703                );
1704                match config.on_timeout.as_str() {
1705                    "error" => {
1706                        return Err(FlowgentraError::ExecutionTimeout(format!(
1707                            "Node '{}' timed out after {}ms",
1708                            config.name, config.timeout_ms
1709                        )))
1710                    }
1711                    "default_value" => {
1712                        if let Some(default) = &config.default_value {
1713                            state.set("_timeout_default", default.clone());
1714                        }
1715                    }
1716                    _ => {} // "skip" — continue silently
1717                }
1718            }
1719
1720            Ok(state)
1721        })
1722    })
1723}
1724
1725// ── Standalone Loop ───────────────────────────────────────────────────────────
// NOTE: no loop handler definition follows this description. It is kept as a
// plain comment so rustdoc does not attach it to the next item below.
//
// Manages an iteration counter in state.
// Use this as a loop-gate node with a conditional back-edge on
// `__loop_continue__<name>` to build explicit loop subgraphs.
//
// State keys written:
// - `__loop_iteration__<name>` – current iteration (starts at 1)
// - `__loop_continue__<name>`  – bool, true while within max_iterations
//   AND break_condition is not set in state
// - `__loop_meta__<name>`      – JSON with full iteration metadata
//
// YAML:
// ```yaml
// - name: loop_gate
//   type: loop
//   config:
//     max_iterations: 5
//     break_condition: "is_done"   # optional bool state field
// ```
1744// ── Supervisor ────────────────────────────────────────────────────────────────
1745/// Coordinates multiple child handlers (or subgraphs) as a single node.
1746///
1747/// This is the canonical multi-agent orchestration pattern:
1748/// a Supervisor delegates to sub-agents, manages their execution, and aggregates results.
1749///
1750/// Sequential: calls children one after another; state flows through each.
1751/// Parallel: calls all children concurrently; merges results per `merge_strategy`.
1752///
1753/// State written:
1754/// - `__supervisor_meta__<name>` — per-child results, errors, timing
1755///
1756/// YAML:
1757/// ```yaml
1758/// - name: research_coordinator
1759///   type: supervisor           # alias: orchestrator
1760///   config:
1761///     strategy: sequential     # or "parallel"
1762///     children: [research_agent, writer_agent, critic_agent]
1763///     fail_fast: true
1764///     child_timeout_ms: 30000
1765///     collect_stats: true
1766///     merge_strategy: latest   # parallel only — or "first_success", "deep_merge"
1767///     parallel_aggregation: all  # or "majority", "first_success"
1768/// ```
1769fn create_supervisor_handler(
1770    config: crate::core::node::orchestrator_node::SupervisorNodeConfig,
1771    children: Vec<(String, ArcHandler<SharedState>)>,
1772    child_mcps: HashMap<String, Vec<String>>,
1773) -> Handler<SharedState> {
1774    create_supervisor_handler_with_llm(config, children, None, child_mcps)
1775}
1776
1777fn create_supervisor_handler_with_llm(
1778    config: crate::core::node::orchestrator_node::SupervisorNodeConfig,
1779    children: Vec<(String, ArcHandler<SharedState>)>,
1780    llm_client: Option<Arc<dyn LLMClient>>,
1781    child_mcps: HashMap<String, Vec<String>>,
1782) -> Handler<SharedState> {
1783    use crate::core::node::orchestrator_node::{OrchestrationStrategy, ParallelMergeStrategy};
1784    use serde_json::json;
1785
1786    // Evaluate a skip condition expression against state.
1787    // Supported forms:
1788    //   "key"            — skip if state[key] is non-null and non-false
1789    //   "key != null"    — skip if state[key] is non-null
1790    //   "key == null"    — skip if state[key] is null/absent
1791    //   "key == value"   — skip if state[key].to_string() == value
1792    fn should_skip(condition: &str, state: &SharedState) -> bool {
1793        let condition = condition.trim();
1794        if condition.contains("!=") {
1795            let parts: Vec<&str> = condition.splitn(2, "!=").collect();
1796            let key = parts[0].trim();
1797            let rhs = parts[1].trim();
1798            let val = state.get(key);
1799            if rhs == "null" {
1800                return val.is_some() && !val.unwrap().is_null();
1801            }
1802            return val
1803                .map(|v| v.to_string().trim_matches('"') != rhs)
1804                .unwrap_or(false);
1805        }
1806        if condition.contains("==") {
1807            let parts: Vec<&str> = condition.splitn(2, "==").collect();
1808            let key = parts[0].trim();
1809            let rhs = parts[1].trim();
1810            let val = state.get(key);
1811            if rhs == "null" {
1812                return val.is_none() || val.unwrap().is_null();
1813            }
1814            return val
1815                .map(|v| v.to_string().trim_matches('"') == rhs)
1816                .unwrap_or(false);
1817        }
1818        // bare key: skip if truthy
1819        state
1820            .get(condition)
1821            .map(|v| !v.is_null() && v.as_bool() != Some(false))
1822            .unwrap_or(false)
1823    }
1824
1825    // Run a single child with optional timeout, returning (result, duration_ms).
1826    async fn run_child(
1827        name: &str,
1828        handler: &ArcHandler<SharedState>,
1829        state: SharedState,
1830        timeout_ms: Option<u64>,
1831    ) -> (crate::core::error::Result<SharedState>, u128) {
1832        let child_start = std::time::Instant::now();
1833        let result = if let Some(ms) = timeout_ms {
1834            tokio::time::timeout(std::time::Duration::from_millis(ms), handler(state))
1835                .await
1836                .unwrap_or_else(|_| {
1837                    Err(FlowgentraError::ExecutionTimeout(format!(
1838                        "Child '{}' timed out after {}ms",
1839                        name, ms
1840                    )))
1841                })
1842        } else {
1843            handler(state).await
1844        };
1845        (result, child_start.elapsed().as_millis())
1846    }
1847
1848    Box::new(move |state| {
1849        let config = config.clone();
1850        let children = children.clone();
1851        let llm_client = llm_client.clone();
1852        let child_mcps = child_mcps.clone();
1853        Box::pin(async move {
1854            let start = std::time::Instant::now();
1855
1856            // Inject per-node MCP assignments into state before running a child
1857            let inject_mcps = |child_name: &str, state: &SharedState| {
1858                if let Some(mcps) = child_mcps.get(child_name) {
1859                    state.set("_node_mcps", serde_json::json!(mcps));
1860                } else {
1861                    state.remove("_node_mcps");
1862                }
1863            };
1864
1865            match &config.strategy {
1866                // ── Sequential ──────────────────────────────────────────────
1867                OrchestrationStrategy::Sequential => {
1868                    let mut current = state;
1869                    let mut child_results = Vec::new();
1870                    let mut errors: Vec<String> = Vec::new();
1871
1872                    for (name, handler) in &children {
1873                        // ── Skip condition ──────────────────────────────────
1874                        if let Some(cond) = config.skip_conditions.get(name.as_str()) {
1875                            if should_skip(cond, &current) {
1876                                tracing::info!(
1877                                    "Supervisor '{}': skipping '{}' (condition: {cond})",
1878                                    config.name,
1879                                    name
1880                                );
1881                                child_results.push(json!({
1882                                    "name": name, "skipped": true, "condition": cond
1883                                }));
1884                                continue;
1885                            }
1886                        }
1887
1888                        // ── Run with retry ──────────────────────────────────
1889                        let max_attempts = config.max_retries_per_child + 1;
1890                        let mut last_err = String::new();
1891                        let mut succeeded = false;
1892                        let mut total_ms = 0u128;
1893
1894                        inject_mcps(name, &current);
1895                        for attempt in 1..=max_attempts {
1896                            let (result, ms) =
1897                                run_child(name, handler, current.clone(), config.child_timeout_ms)
1898                                    .await;
1899                            total_ms += ms;
1900                            match result {
1901                                Ok(new_state) => {
1902                                    current = new_state;
1903                                    succeeded = true;
1904                                    if attempt > 1 {
1905                                        tracing::info!(
1906                                            "Supervisor '{}': child '{}' succeeded on attempt {attempt}",
1907                                            config.name, name
1908                                        );
1909                                    }
1910                                    break;
1911                                }
1912                                Err(e) => {
1913                                    last_err = e.to_string();
1914                                    if attempt < max_attempts {
1915                                        tracing::warn!(
1916                                            "Supervisor '{}': child '{}' failed (attempt {attempt}/{max_attempts}): {e}",
1917                                            config.name, name
1918                                        );
1919                                    }
1920                                }
1921                            }
1922                        }
1923
1924                        if succeeded {
1925                            child_results.push(json!({
1926                                "name": name,
1927                                "success": true,
1928                                "duration_ms": total_ms,
1929                                "attempts": (total_ms > 0) as u8,  // always 1+ if here
1930                            }));
1931                        } else {
1932                            child_results.push(json!({
1933                                "name": name,
1934                                "success": false,
1935                                "error": last_err,
1936                                "duration_ms": total_ms,
1937                                "attempts": max_attempts,
1938                            }));
1939                            errors.push(format!("'{}': {}", name, last_err));
1940                            if config.fail_fast {
1941                                tracing::warn!(
1942                                    "Supervisor '{}' fail_fast: stopped after '{}' failed after {max_attempts} attempt(s)",
1943                                    config.name, name
1944                                );
1945                                break;
1946                            }
1947                        }
1948                    }
1949
1950                    current.set(
1951                        format!("__supervisor_meta__{}", config.name),
1952                        json!({
1953                            "strategy": "sequential",
1954                            "children": child_results,
1955                            "errors": errors,
1956                            "success": errors.is_empty(),
1957                            "duration_ms": start.elapsed().as_millis(),
1958                        }),
1959                    );
1960                    tracing::info!(
1961                        "Supervisor '{}' sequential done in {}ms, errors={}",
1962                        config.name,
1963                        start.elapsed().as_millis(),
1964                        errors.len()
1965                    );
1966                    Ok(current)
1967                }
1968
1969                // ── Parallel ────────────────────────────────────────────────
1970                OrchestrationStrategy::Parallel => {
1971                    let base_state = Arc::new(state);
1972                    // Semaphore limits how many children run at the same time.
1973                    // All futures are spawned immediately; only `max_concurrent` can hold a permit.
1974                    let semaphore = config
1975                        .max_concurrent
1976                        .map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
1977
1978                    // Filter children by skip conditions, cloning so futures own their data
1979                    let active_children: Vec<(String, ArcHandler<SharedState>)> = children
1980                        .iter()
1981                        .filter(|(name, _)| {
1982                            if let Some(cond) = config.skip_conditions.get(name.as_str()) {
1983                                if should_skip(cond, &base_state) {
1984                                    tracing::info!(
1985                                        "Supervisor '{}': skipping '{}' in parallel (condition: {cond})",
1986                                        config.name, name
1987                                    );
1988                                    return false;
1989                                }
1990                            }
1991                            true
1992                        })
1993                        .map(|(n, h)| (n.clone(), h.clone()))
1994                        .collect();
1995
1996                    let futures: Vec<_> = active_children
1997                        .into_iter()
1998                        .map(|(name, handler)| {
1999                            let state_copy = base_state.deep_clone();
2000                            inject_mcps(&name, &state_copy);
2001                            let timeout_ms = config.child_timeout_ms;
2002                            let max_attempts = config.max_retries_per_child + 1;
2003                            let sem = semaphore.clone();
2004                            async move {
2005                                // Acquire semaphore permit — blocks until a slot is free
2006                                let _permit = if let Some(ref sem) = sem {
2007                                    Some(sem.acquire().await.expect("semaphore closed"))
2008                                } else {
2009                                    None
2010                                };
2011
2012                                let mut total_ms = 0u128;
2013                                let mut last_err = String::new();
2014                                for attempt in 1..=max_attempts {
2015                                    let (result, ms) = run_child(&name, &handler, state_copy.clone(), timeout_ms).await;
2016                                    total_ms += ms;
2017                                    match result {
2018                                        Ok(s) => {
2019                                            if attempt > 1 {
2020                                                tracing::info!(
2021                                                    "Supervisor parallel: '{}' succeeded on attempt {attempt}",
2022                                                    name
2023                                                );
2024                                            }
2025                                            return (name, Ok(s), total_ms, attempt);
2026                                        }
2027                                        Err(e) => {
2028                                            last_err = e.to_string();
2029                                            if attempt < max_attempts {
2030                                                tracing::warn!(
2031                                                    "Supervisor parallel: '{}' failed attempt {attempt}/{max_attempts}: {e}",
2032                                                    name
2033                                                );
2034                                            }
2035                                        }
2036                                    }
2037                                }
2038                                (name, Err(FlowgentraError::ExecutionError(last_err)), total_ms, max_attempts)
2039                            }
2040                        })
2041                        .collect();
2042
2043                    let results = futures::future::join_all(futures).await;
2044
2045                    let mut successes: Vec<SharedState> = Vec::new();
2046                    let mut child_results = Vec::new();
2047                    let mut errors: Vec<String> = Vec::new();
2048
2049                    for (name, result, duration_ms, attempts) in results {
2050                        match result {
2051                            Ok(s) => {
2052                                child_results.push(json!({
2053                                    "name": name, "success": true,
2054                                    "duration_ms": duration_ms, "attempts": attempts,
2055                                }));
2056                                successes.push(s);
2057                            }
2058                            Err(e) => {
2059                                child_results.push(json!({
2060                                    "name": name, "success": false,
2061                                    "error": e.to_string(),
2062                                    "duration_ms": duration_ms, "attempts": attempts,
2063                                }));
2064                                errors.push(format!("'{}': {}", name, e));
2065                            }
2066                        }
2067                    }
2068
2069                    // ── Merge ───────────────────────────────────────────────
2070                    let final_state = match &config.merge_strategy {
2071                        ParallelMergeStrategy::Latest => successes
2072                            .into_iter()
2073                            .last()
2074                            .unwrap_or_else(|| (*base_state).clone()),
2075
2076                        ParallelMergeStrategy::FirstSuccess => successes
2077                            .into_iter()
2078                            .next()
2079                            .unwrap_or_else(|| (*base_state).clone()),
2080
2081                        ParallelMergeStrategy::DeepMerge => {
2082                            // Start from a fresh clone of base state, then overlay
2083                            // only keys that each child actually changed.
2084                            // Each child starts with a deep_clone of base_state, so we
2085                            // compare each child's values against the base to detect
2086                            // real modifications — avoiding null overwrites from
2087                            // schema-initialized keys the child never touched.
2088                            let base_snapshot: Vec<(String, serde_json::Value)> =
2089                                base_state.iter_map();
2090                            let merged = (*base_state).clone();
2091                            for child_state in successes {
2092                                for (key, value) in child_state.iter_map() {
2093                                    // Skip internal supervisor/planner metadata keys
2094                                    if key.starts_with("__supervisor_meta__")
2095                                        || key.starts_with("__eval_")
2096                                        || key.starts_with("_next_node")
2097                                    {
2098                                        continue;
2099                                    }
2100                                    // Only merge if the child actually changed this key
2101                                    let changed =
2102                                        match base_snapshot.iter().find(|(k, _)| k == &key) {
2103                                            Some((_, base_val)) => value != *base_val,
2104                                            None => true, // new key not in base — always merge
2105                                        };
2106                                    if changed {
2107                                        merged.set(key, value);
2108                                    }
2109                                }
2110                            }
2111                            merged
2112                        }
2113
2114                        ParallelMergeStrategy::Custom(_) => successes
2115                            .into_iter()
2116                            .last()
2117                            .unwrap_or_else(|| (*base_state).clone()),
2118                    };
2119
2120                    final_state.set(
2121                        format!("__supervisor_meta__{}", config.name),
2122                        json!({
2123                            "strategy": "parallel",
2124                            "children": child_results,
2125                            "errors": errors,
2126                            "success": errors.is_empty(),
2127                            "duration_ms": start.elapsed().as_millis(),
2128                        }),
2129                    );
2130                    tracing::info!(
2131                        "Supervisor '{}' parallel done in {}ms, errors={}",
2132                        config.name,
2133                        start.elapsed().as_millis(),
2134                        errors.len()
2135                    );
2136                    Ok(final_state)
2137                }
2138
2139                // ── Autonomous ──────────────────────────────────────────────
2140                // The supervisor loops, calling whichever agent owns each missing
2141                // required output, until all are present or max_iterations is hit.
2142                OrchestrationStrategy::Autonomous => {
2143                    let mut current = state;
2144
2145                    if let Some(goal) = &config.goal {
2146                        tracing::info!(
2147                            "Supervisor '{}' autonomous start, goal: {}",
2148                            config.name,
2149                            goal
2150                        );
2151                    }
2152
2153                    // Build O(1) lookup: child name → handler
2154                    let child_map: HashMap<String, ArcHandler<SharedState>> = children
2155                        .iter()
2156                        .map(|(n, h)| (n.clone(), h.clone()))
2157                        .collect();
2158
2159                    let max_iter = if config.max_iterations == 0 {
2160                        10
2161                    } else {
2162                        config.max_iterations
2163                    };
2164                    let mut iteration_log = Vec::new();
2165
2166                    'outer: for iteration in 1..=max_iter {
2167                        // Which required outputs are still missing?
2168                        let missing: Vec<String> = config
2169                            .required_outputs
2170                            .iter()
2171                            .filter(|key| {
2172                                current
2173                                    .get(key.as_str())
2174                                    .map(|v| v.is_null())
2175                                    .unwrap_or(true)
2176                            })
2177                            .cloned()
2178                            .collect();
2179
2180                        if missing.is_empty() {
2181                            tracing::info!(
2182                                "Supervisor '{}' autonomous: all outputs satisfied after {} iterations",
2183                                config.name, iteration - 1
2184                            );
2185                            break;
2186                        }
2187
2188                        tracing::info!(
2189                            "Supervisor '{}' autonomous iteration {}/{}: missing {:?}",
2190                            config.name,
2191                            iteration,
2192                            max_iter,
2193                            missing
2194                        );
2195
2196                        // Collect agents responsible for the missing outputs (deduplicated)
2197                        let mut agents_to_call: Vec<String> = Vec::new();
2198                        for key in &missing {
2199                            if let Some(owner) = config.output_owners.get(key.as_str()) {
2200                                if !agents_to_call.contains(owner) {
2201                                    agents_to_call.push(owner.clone());
2202                                }
2203                            }
2204                        }
2205
2206                        if agents_to_call.is_empty() {
2207                            tracing::warn!(
2208                                "Supervisor '{}' autonomous: no owners for missing {:?}, stopping",
2209                                config.name,
2210                                missing
2211                            );
2212                            break;
2213                        }
2214
2215                        let mut iter_child_results = Vec::new();
2216                        for agent_name in &agents_to_call {
2217                            if let Some(handler) = child_map.get(agent_name.as_str()) {
2218                                inject_mcps(agent_name, &current);
2219                                let max_attempts = config.max_retries_per_child + 1;
2220                                let mut succeeded = false;
2221                                let mut total_ms = 0u128;
2222                                let mut last_err = String::new();
2223
2224                                for attempt in 1..=max_attempts {
2225                                    let (result, ms) = run_child(
2226                                        agent_name,
2227                                        handler,
2228                                        current.clone(),
2229                                        config.child_timeout_ms,
2230                                    )
2231                                    .await;
2232                                    total_ms += ms;
2233                                    match result {
2234                                        Ok(new_state) => {
2235                                            current = new_state;
2236                                            succeeded = true;
2237                                            break;
2238                                        }
2239                                        Err(e) => {
2240                                            last_err = e.to_string();
2241                                            if attempt < max_attempts {
2242                                                tracing::warn!(
2243                                                    "Supervisor '{}' autonomous: '{}' failed attempt {}/{}: {}",
2244                                                    config.name, agent_name, attempt, max_attempts, e
2245                                                );
2246                                            }
2247                                        }
2248                                    }
2249                                }
2250
2251                                iter_child_results.push(json!({
2252                                    "name": agent_name,
2253                                    "success": succeeded,
2254                                    "duration_ms": total_ms,
2255                                    "error": if succeeded { serde_json::Value::Null } else { json!(last_err) },
2256                                }));
2257
2258                                if !succeeded && config.fail_fast {
2259                                    break 'outer;
2260                                }
2261                            } else {
2262                                tracing::warn!(
2263                                    "Supervisor '{}' autonomous: agent '{}' not found",
2264                                    config.name,
2265                                    agent_name
2266                                );
2267                            }
2268                        }
2269
2270                        iteration_log.push(json!({
2271                            "iteration": iteration,
2272                            "missing_before": missing,
2273                            "agents_called": agents_to_call,
2274                            "results": iter_child_results,
2275                        }));
2276                    }
2277
2278                    // Final completeness check
2279                    let final_missing: Vec<String> = config
2280                        .required_outputs
2281                        .iter()
2282                        .filter(|key| {
2283                            current
2284                                .get(key.as_str())
2285                                .map(|v| v.is_null())
2286                                .unwrap_or(true)
2287                        })
2288                        .cloned()
2289                        .collect();
2290
2291                    let success = final_missing.is_empty();
2292                    current.set(
2293                        format!("__supervisor_meta__{}", config.name),
2294                        json!({
2295                            "strategy": "autonomous",
2296                            "goal": config.goal,
2297                            "required_outputs": config.required_outputs,
2298                            "iterations": iteration_log,
2299                            "missing_outputs": final_missing,
2300                            "success": success,
2301                            "duration_ms": start.elapsed().as_millis(),
2302                        }),
2303                    );
2304                    tracing::info!(
2305                        "Supervisor '{}' autonomous done in {}ms, success={}",
2306                        config.name,
2307                        start.elapsed().as_millis(),
2308                        success
2309                    );
2310                    Ok(current)
2311                }
2312
2313                // ── Dynamic (LLM-driven) ────────────────────────────────────
2314                // The supervisor asks an LLM which agents to call, in what order.
2315                OrchestrationStrategy::Dynamic => {
2316                    let mut current = state;
2317
2318                    let child_map: HashMap<String, ArcHandler<SharedState>> = children
2319                        .iter()
2320                        .map(|(n, h)| (n.clone(), h.clone()))
2321                        .collect();
2322
2323                    let child_names: Vec<String> =
2324                        children.iter().map(|(n, _)| n.clone()).collect();
2325                    let max_iter = if config.max_iterations == 0 {
2326                        10
2327                    } else {
2328                        config.max_iterations
2329                    };
2330                    let mut iteration_log = Vec::new();
2331
2332                    let llm = llm_client.clone();
2333
2334                    'dynamic_outer: for iteration in 1..=max_iter {
2335                        println!("\n  ── Dynamic iteration {iteration}/{max_iter} ──");
2336
2337                        // Build a filtered view: separate populated vs null keys, hide internals
2338                        let all_keys: Vec<String> = current.keys().collect();
2339                        let populated_keys: Vec<&String> = all_keys
2340                            .iter()
2341                            .filter(|k| !k.starts_with('_'))
2342                            .filter(|k| {
2343                                current
2344                                    .get(k.as_str())
2345                                    .map(|v| !v.is_null())
2346                                    .unwrap_or(false)
2347                            })
2348                            .collect();
2349                        let null_keys: Vec<&String> = all_keys
2350                            .iter()
2351                            .filter(|k| !k.starts_with('_'))
2352                            .filter(|k| {
2353                                current.get(k.as_str()).map(|v| v.is_null()).unwrap_or(true)
2354                            })
2355                            .collect();
2356
2357                        println!("  [dynamic]  Completed: {populated_keys:?}");
2358                        println!("  [dynamic]  Missing:   {null_keys:?}");
2359
2360                        // Ask LLM which agents to call
2361                        let agents_to_call = if let Some(ref llm) = llm {
2362                            let system_prompt = config.selector_prompt.as_deref().unwrap_or(
2363                                "You pick which agents to run. Reply with ONLY a JSON array of agent name strings. \
2364                                 Example: [\"agent_a\", \"agent_b\"]. If no agents needed, reply []."
2365                            );
2366                            let user_prompt = if null_keys.is_empty() {
2367                                "All outputs are complete. Reply with [].".to_string()
2368                            } else {
2369                                format!(
2370                                    "Goal: {}\n\
2371                                     Agents: {:?}\n\
2372                                     Done: {:?}\n\
2373                                     Still needed: {:?}\n\
2374                                     Pick the agents to run next. Reply with ONLY a JSON array.",
2375                                    config.goal.as_deref().unwrap_or("Complete the task"),
2376                                    child_names,
2377                                    populated_keys,
2378                                    null_keys,
2379                                )
2380                            };
2381
2382                            println!("  [dynamic]  Asking LLM...");
2383
2384                            let messages = vec![
2385                                crate::core::llm::Message::system(system_prompt),
2386                                crate::core::llm::Message::user(user_prompt),
2387                            ];
2388
2389                            match llm.chat(messages).await {
2390                                Ok(response) => {
2391                                    println!(
2392                                        "  [dynamic]  LLM raw response: {:?}",
2393                                        response.content
2394                                    );
2395                                    let content = response.content.trim();
2396                                    // Try to extract JSON array from the response
2397                                    let json_str = if let Some(start) = content.find('[') {
2398                                        if let Some(end) = content.rfind(']') {
2399                                            &content[start..=end]
2400                                        } else {
2401                                            content
2402                                        }
2403                                    } else {
2404                                        content
2405                                    };
2406                                    let parsed = serde_json::from_str::<Vec<String>>(json_str)
2407                                        .unwrap_or_default();
2408                                    if parsed.is_empty() && !content.contains("[]") {
2409                                        println!("  [dynamic]  WARNING: could not parse LLM response as JSON array");
2410                                    }
2411                                    parsed
2412                                }
2413                                Err(e) => {
2414                                    println!("  [dynamic]  LLM error: {e}");
2415                                    child_names.clone()
2416                                }
2417                            }
2418                        } else {
2419                            // No LLM — fall back to output_owners
2420                            let missing: Vec<String> = config
2421                                .required_outputs
2422                                .iter()
2423                                .filter(|key| {
2424                                    current
2425                                        .get(key.as_str())
2426                                        .map(|v| v.is_null())
2427                                        .unwrap_or(true)
2428                                })
2429                                .cloned()
2430                                .collect();
2431
2432                            if missing.is_empty() {
2433                                break;
2434                            }
2435
2436                            let mut agents = Vec::new();
2437                            for key in &missing {
2438                                if let Some(owner) = config.output_owners.get(key.as_str()) {
2439                                    if !agents.contains(owner) {
2440                                        agents.push(owner.clone());
2441                                    }
2442                                }
2443                            }
2444                            if agents.is_empty() {
2445                                break;
2446                            }
2447                            agents
2448                        };
2449
2450                        // If LLM returned empty but outputs still missing, use fallback
2451                        if agents_to_call.is_empty() {
2452                            let still_missing: Vec<String> = config
2453                                .required_outputs
2454                                .iter()
2455                                .filter(|key| {
2456                                    current
2457                                        .get(key.as_str())
2458                                        .map(|v| v.is_null())
2459                                        .unwrap_or(true)
2460                                })
2461                                .cloned()
2462                                .collect();
2463
2464                            if still_missing.is_empty() {
2465                                println!("  [dynamic]  All required outputs present → done");
2466                                break;
2467                            }
2468
2469                            println!("  [dynamic]  LLM returned [] but still missing: {still_missing:?}, using fallback");
2470                            let mut fallback = Vec::new();
2471                            for key in &still_missing {
2472                                if let Some(owner) = config.output_owners.get(key.as_str()) {
2473                                    if !fallback.contains(owner)
2474                                        && child_map.contains_key(owner.as_str())
2475                                    {
2476                                        fallback.push(owner.clone());
2477                                    }
2478                                }
2479                            }
2480                            if fallback.is_empty() {
2481                                println!("  [dynamic]  No fallback agents, stopping");
2482                                break;
2483                            }
2484
2485                            // Run fallback agents
2486                            let mut iter_results = Vec::new();
2487                            for agent_name in &fallback {
2488                                if let Some(handler) = child_map.get(agent_name.as_str()) {
2489                                    inject_mcps(agent_name, &current);
2490                                    println!("  [dynamic]  Running {agent_name}...");
2491                                    let (result, ms) = run_child(
2492                                        agent_name,
2493                                        handler,
2494                                        current.clone(),
2495                                        config.child_timeout_ms,
2496                                    )
2497                                    .await;
2498                                    match result {
2499                                        Ok(new_state) => {
2500                                            current = new_state;
2501                                            println!("  [dynamic]  {agent_name} ✓ ({ms}ms)");
2502                                            iter_results.push(json!({
2503                                                "name": agent_name, "success": true, "duration_ms": ms,
2504                                            }));
2505                                        }
2506                                        Err(e) => {
2507                                            println!("  [dynamic]  {agent_name} ✗ ({ms}ms): {e}");
2508                                            iter_results.push(json!({
2509                                                "name": agent_name, "success": false,
2510                                                "error": e.to_string(), "duration_ms": ms,
2511                                            }));
2512                                            if config.fail_fast {
2513                                                break 'dynamic_outer;
2514                                            }
2515                                        }
2516                                    }
2517                                }
2518                            }
2519                            iteration_log.push(json!({
2520                                "iteration": iteration,
2521                                "agents_called": fallback,
2522                                "results": iter_results,
2523                            }));
2524                            continue;
2525                        }
2526
2527                        println!("  [dynamic]  Calling: {agents_to_call:?}");
2528
2529                        let mut iter_results = Vec::new();
2530                        for agent_name in &agents_to_call {
2531                            if let Some(handler) = child_map.get(agent_name.as_str()) {
2532                                inject_mcps(agent_name, &current);
2533                                println!("  [dynamic]  Running {agent_name}...");
2534                                let (result, ms) = run_child(
2535                                    agent_name,
2536                                    handler,
2537                                    current.clone(),
2538                                    config.child_timeout_ms,
2539                                )
2540                                .await;
2541                                match result {
2542                                    Ok(new_state) => {
2543                                        current = new_state;
2544                                        println!("  [dynamic]  {agent_name} ✓ ({ms}ms)");
2545                                        iter_results.push(json!({
2546                                            "name": agent_name, "success": true, "duration_ms": ms,
2547                                        }));
2548                                    }
2549                                    Err(e) => {
2550                                        println!("  [dynamic]  {agent_name} ✗ ({ms}ms): {e}");
2551                                        iter_results.push(json!({
2552                                            "name": agent_name, "success": false,
2553                                            "error": e.to_string(), "duration_ms": ms,
2554                                        }));
2555                                        if config.fail_fast {
2556                                            break 'dynamic_outer;
2557                                        }
2558                                    }
2559                                }
2560                            } else {
2561                                println!("  [dynamic]  WARNING: agent '{agent_name}' not found");
2562                            }
2563                        }
2564
2565                        iteration_log.push(json!({
2566                            "iteration": iteration,
2567                            "agents_called": agents_to_call,
2568                            "results": iter_results,
2569                        }));
2570                    }
2571
2572                    current.set(
2573                        format!("__supervisor_meta__{}", config.name),
2574                        json!({
2575                            "strategy": "dynamic",
2576                            "goal": config.goal,
2577                            "iterations": iteration_log,
2578                            "duration_ms": start.elapsed().as_millis(),
2579                        }),
2580                    );
2581                    Ok(current)
2582                }
2583
2584                // ── RoundRobin ─────────────────────────────────────────────────
2585                // Tasks from a state array are distributed across agents in rotation.
2586                OrchestrationStrategy::RoundRobin => {
2587                    let tasks_key = config.tasks_key.as_deref().unwrap_or("tasks");
2588                    let tasks: Vec<serde_json::Value> = state
2589                        .get(tasks_key)
2590                        .and_then(|v| v.as_array().cloned())
2591                        .unwrap_or_default();
2592
2593                    if tasks.is_empty() {
2594                        tracing::warn!(
2595                            "Supervisor '{}' round_robin: no tasks at key '{}'",
2596                            config.name,
2597                            tasks_key
2598                        );
2599                        state.set(
2600                            format!("__supervisor_meta__{}", config.name),
2601                            json!({"strategy": "round_robin", "tasks": 0, "success": true}),
2602                        );
2603                        return Ok(state);
2604                    }
2605
2606                    let mut current = state;
2607                    let num_children = children.len();
2608                    let mut all_results: Vec<serde_json::Value> = Vec::new();
2609                    let mut child_results = Vec::new();
2610                    let mut errors: Vec<String> = Vec::new();
2611
2612                    for (i, task) in tasks.iter().enumerate() {
2613                        let child_idx = i % num_children;
2614                        let (name, handler) = &children[child_idx];
2615
2616                        current.set("__current_task__".to_string(), task.clone());
2617                        current.set("__task_index__".to_string(), json!(i));
2618                        inject_mcps(name, &current);
2619
2620                        let (result, ms) =
2621                            run_child(name, handler, current.clone(), config.child_timeout_ms)
2622                                .await;
2623                        match result {
2624                            Ok(new_state) => {
2625                                if let Some(r) = new_state.get("__task_result__") {
2626                                    all_results.push(r);
2627                                }
2628                                current = new_state;
2629                                child_results.push(json!({
2630                                    "task_index": i, "agent": name, "success": true, "duration_ms": ms,
2631                                }));
2632                            }
2633                            Err(e) => {
2634                                errors.push(format!("Task {}: {}", i, e));
2635                                child_results.push(json!({
2636                                    "task_index": i, "agent": name, "success": false,
2637                                    "error": e.to_string(), "duration_ms": ms,
2638                                }));
2639                                if config.fail_fast {
2640                                    break;
2641                                }
2642                            }
2643                        }
2644                    }
2645
2646                    current.set("__round_robin_results__".to_string(), json!(all_results));
2647                    current.set(
2648                        format!("__supervisor_meta__{}", config.name),
2649                        json!({
2650                            "strategy": "round_robin",
2651                            "tasks": tasks.len(),
2652                            "children": child_results,
2653                            "errors": errors,
2654                            "success": errors.is_empty(),
2655                            "duration_ms": start.elapsed().as_millis(),
2656                        }),
2657                    );
2658                    Ok(current)
2659                }
2660
2661                // ── Hierarchical ───────────────────────────────────────────────
2662                // Delegates to sub-supervisors. Each child is expected to be a
2663                // supervisor or subgraph managing its own agent group.
2664                OrchestrationStrategy::Hierarchical => {
2665                    let mut current = state;
2666                    let mut child_results = Vec::new();
2667                    let mut errors: Vec<String> = Vec::new();
2668
2669                    for (name, handler) in &children {
2670                        tracing::info!(
2671                            "Supervisor '{}' hierarchical: delegating to sub-supervisor '{}'",
2672                            config.name,
2673                            name
2674                        );
2675
2676                        // Skip condition
2677                        if let Some(cond) = config.skip_conditions.get(name.as_str()) {
2678                            if should_skip(cond, &current) {
2679                                child_results.push(json!({"name": name, "skipped": true}));
2680                                continue;
2681                            }
2682                        }
2683
2684                        inject_mcps(name, &current);
2685                        let max_attempts = config.max_retries_per_child + 1;
2686                        let mut succeeded = false;
2687                        let mut total_ms = 0u128;
2688                        let mut last_err = String::new();
2689
2690                        for attempt in 1..=max_attempts {
2691                            let (result, ms) =
2692                                run_child(name, handler, current.clone(), config.child_timeout_ms)
2693                                    .await;
2694                            total_ms += ms;
2695                            match result {
2696                                Ok(new_state) => {
2697                                    current = new_state;
2698                                    succeeded = true;
2699                                    break;
2700                                }
2701                                Err(e) => {
2702                                    last_err = e.to_string();
2703                                    if attempt < max_attempts {
2704                                        tracing::warn!(
2705                                            "Supervisor '{}' hierarchical: sub-supervisor '{}' failed attempt {}/{}",
2706                                            config.name, name, attempt, max_attempts
2707                                        );
2708                                    }
2709                                }
2710                            }
2711                        }
2712
2713                        child_results.push(json!({
2714                            "name": name, "success": succeeded,
2715                            "duration_ms": total_ms,
2716                            "error": if succeeded { serde_json::Value::Null } else { json!(last_err) },
2717                        }));
2718
2719                        if !succeeded {
2720                            errors.push(format!("Sub-supervisor '{}': {}", name, last_err));
2721                            if config.fail_fast {
2722                                break;
2723                            }
2724                        }
2725                    }
2726
2727                    current.set(
2728                        format!("__supervisor_meta__{}", config.name),
2729                        json!({
2730                            "strategy": "hierarchical",
2731                            "children": child_results,
2732                            "errors": errors,
2733                            "success": errors.is_empty(),
2734                            "duration_ms": start.elapsed().as_millis(),
2735                        }),
2736                    );
2737                    Ok(current)
2738                }
2739
2740                // ── Broadcast (Fan-out) ────────────────────────────────────────
2741                // Same task sent to all agents; best result selected.
2742                // Each child gets a deep_clone so writes are isolated.
2743                OrchestrationStrategy::Broadcast => {
2744                    let base_state = Arc::new(state);
2745                    let semaphore = config
2746                        .max_concurrent
2747                        .map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
2748
2749                    let futures: Vec<_> = children
2750                        .iter()
2751                        .map(|(name, handler)| {
2752                            let state_copy = base_state.deep_clone();
2753                            inject_mcps(name, &state_copy);
2754                            let timeout_ms = config.child_timeout_ms;
2755                            let sem = semaphore.clone();
2756                            let name = name.clone();
2757                            let handler = handler.clone();
2758                            async move {
2759                                let _permit = if let Some(ref sem) = sem {
2760                                    Some(sem.acquire().await.expect("semaphore closed"))
2761                                } else {
2762                                    None
2763                                };
2764                                let (result, ms) =
2765                                    run_child(&name, &handler, state_copy, timeout_ms).await;
2766                                (name, result, ms)
2767                            }
2768                        })
2769                        .collect();
2770
2771                    let results = futures::future::join_all(futures).await;
2772
2773                    let score_key = config.score_key.as_deref().unwrap_or("__score__");
2774                    let criteria = config
2775                        .selection_criteria
2776                        .as_deref()
2777                        .unwrap_or("first_success");
2778
2779                    let mut successes: Vec<(String, SharedState, u128)> = Vec::new();
2780                    let mut child_results = Vec::new();
2781                    let mut errors: Vec<String> = Vec::new();
2782
2783                    for (name, result, ms) in results {
2784                        match result {
2785                            Ok(s) => {
2786                                child_results.push(json!({
2787                                    "name": name, "success": true, "duration_ms": ms,
2788                                }));
2789                                successes.push((name, s, ms));
2790                            }
2791                            Err(e) => {
2792                                child_results.push(json!({
2793                                    "name": name, "success": false,
2794                                    "error": e.to_string(), "duration_ms": ms,
2795                                }));
2796                                errors.push(format!("'{}': {}", name, e));
2797                            }
2798                        }
2799                    }
2800
2801                    if successes.is_empty() {
2802                        let final_state = (*base_state).clone();
2803                        final_state.set(
2804                            format!("__supervisor_meta__{}", config.name),
2805                            json!({
2806                                "strategy": "broadcast",
2807                                "children": child_results,
2808                                "errors": errors,
2809                                "success": false,
2810                                "duration_ms": start.elapsed().as_millis(),
2811                            }),
2812                        );
2813                        return Ok(final_state);
2814                    }
2815
2816                    let (winner_name, winner_state) = match criteria {
2817                        "highest_score" => {
2818                            let best = successes.into_iter().max_by(|(_, a, _), (_, b, _)| {
2819                                let sa = a.get(score_key).and_then(|v| v.as_f64()).unwrap_or(0.0);
2820                                let sb = b.get(score_key).and_then(|v| v.as_f64()).unwrap_or(0.0);
2821                                sa.partial_cmp(&sb).unwrap_or(std::cmp::Ordering::Equal)
2822                            });
2823                            best.map(|(n, s, _)| (n, s))
2824                                .unwrap_or_else(|| ("unknown".to_string(), (*base_state).clone()))
2825                        }
2826                        _ => {
2827                            // first_success
2828                            successes
2829                                .into_iter()
2830                                .next()
2831                                .map(|(n, s, _)| (n, s))
2832                                .unwrap_or_else(|| ("unknown".to_string(), (*base_state).clone()))
2833                        }
2834                    };
2835
2836                    winner_state.set(
2837                        format!("__supervisor_meta__{}", config.name),
2838                        json!({
2839                            "strategy": "broadcast",
2840                            "winner": winner_name,
2841                            "selection_criteria": criteria,
2842                            "children": child_results,
2843                            "success": true,
2844                            "duration_ms": start.elapsed().as_millis(),
2845                        }),
2846                    );
2847                    Ok(winner_state)
2848                }
2849
2850                // ── MapReduce ──────────────────────────────────────────────────
2851                // Input split into chunks, each processed by a child in parallel, then merged.
2852                OrchestrationStrategy::MapReduce => {
2853                    let map_key = config.map_key.as_deref().unwrap_or("input_chunks");
2854                    let reduce_key = config.reduce_key.as_deref().unwrap_or("reduced_output");
2855
2856                    let chunks: Vec<serde_json::Value> = state
2857                        .get(map_key)
2858                        .and_then(|v| v.as_array().cloned())
2859                        .unwrap_or_default();
2860
2861                    if chunks.is_empty() {
2862                        tracing::warn!(
2863                            "Supervisor '{}' map_reduce: no data at key '{}'",
2864                            config.name,
2865                            map_key
2866                        );
2867                        state.set(
2868                            format!("__supervisor_meta__{}", config.name),
2869                            json!({"strategy": "map_reduce", "chunks": 0, "success": true}),
2870                        );
2871                        state.set(reduce_key.to_string(), json!([]));
2872                        return Ok(state);
2873                    }
2874
2875                    let base_state = Arc::new(state);
2876                    let num_children = children.len();
2877                    let semaphore = config
2878                        .max_concurrent
2879                        .map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
2880
2881                    // Map phase: distribute chunks across children
2882                    let futures: Vec<_> = chunks
2883                        .iter()
2884                        .enumerate()
2885                        .map(|(i, chunk)| {
2886                            let child_idx = i % num_children;
2887                            let (name, handler) = children[child_idx].clone();
2888                            let chunk_state = base_state.deep_clone();
2889                            chunk_state.set("__map_chunk__".to_string(), chunk.clone());
2890                            chunk_state.set("__chunk_index__".to_string(), json!(i));
2891                            let timeout_ms = config.child_timeout_ms;
2892                            let sem = semaphore.clone();
2893                            async move {
2894                                let _permit = if let Some(ref sem) = sem {
2895                                    Some(sem.acquire().await.expect("semaphore closed"))
2896                                } else {
2897                                    None
2898                                };
2899                                let (result, ms) =
2900                                    run_child(&name, &handler, chunk_state, timeout_ms).await;
2901                                (i, name, result, ms)
2902                            }
2903                        })
2904                        .collect();
2905
2906                    let results = futures::future::join_all(futures).await;
2907
2908                    // Reduce phase: collect results in order
2909                    let mut reduced: Vec<serde_json::Value> = vec![json!(null); chunks.len()];
2910                    let mut child_results = Vec::new();
2911                    let mut errors: Vec<String> = Vec::new();
2912
2913                    for (i, name, result, ms) in results {
2914                        match result {
2915                            Ok(s) => {
2916                                let chunk_result = s.get("__map_result__").unwrap_or(json!(null));
2917                                reduced[i] = chunk_result;
2918                                child_results.push(json!({
2919                                    "chunk_index": i, "agent": name, "success": true, "duration_ms": ms,
2920                                }));
2921                            }
2922                            Err(e) => {
2923                                errors.push(format!("Chunk {}: {}", i, e));
2924                                child_results.push(json!({
2925                                    "chunk_index": i, "agent": name, "success": false,
2926                                    "error": e.to_string(), "duration_ms": ms,
2927                                }));
2928                            }
2929                        }
2930                    }
2931
2932                    let final_state = (*base_state).clone();
2933                    final_state.set(reduce_key.to_string(), json!(reduced));
2934                    final_state.set(
2935                        format!("__supervisor_meta__{}", config.name),
2936                        json!({
2937                            "strategy": "map_reduce",
2938                            "chunks": chunks.len(),
2939                            "children": child_results,
2940                            "errors": errors,
2941                            "success": errors.is_empty(),
2942                            "duration_ms": start.elapsed().as_millis(),
2943                        }),
2944                    );
2945                    Ok(final_state)
2946                }
2947
2948                // ── ConditionalRouting ─────────────────────────────────────────
2949                // Routes to the most appropriate agent based on state-driven rules.
2950                OrchestrationStrategy::ConditionalRouting => {
2951                    let child_map: HashMap<String, ArcHandler<SharedState>> = children
2952                        .iter()
2953                        .map(|(n, h)| (n.clone(), h.clone()))
2954                        .collect();
2955
2956                    let mut selected_name: Option<String> = None;
2957
2958                    for (condition, child_name) in &config.routing_rules {
2959                        if should_skip(condition, &state) {
2960                            // Condition is truthy → route to this agent
2961                            if child_map.contains_key(child_name.as_str()) {
2962                                selected_name = Some(child_name.clone());
2963                                break;
2964                            }
2965                        }
2966                    }
2967
2968                    // Default to first child
2969                    let agent_name = selected_name.unwrap_or_else(|| {
2970                        let default = children.first().map(|(n, _)| n.clone()).unwrap_or_default();
2971                        tracing::info!(
2972                            "Supervisor '{}' conditional_routing: no rule matched, using default '{}'",
2973                            config.name, default
2974                        );
2975                        default
2976                    });
2977
2978                    if let Some(handler) = child_map.get(&agent_name) {
2979                        let (result, ms) =
2980                            run_child(&agent_name, handler, state, config.child_timeout_ms).await;
2981                        match result {
2982                            Ok(final_state) => {
2983                                final_state.set(
2984                                    format!("__supervisor_meta__{}", config.name),
2985                                    json!({
2986                                        "strategy": "conditional_routing",
2987                                        "routed_to": agent_name,
2988                                        "duration_ms": ms,
2989                                        "success": true,
2990                                    }),
2991                                );
2992                                Ok(final_state)
2993                            }
2994                            Err(e) => Err(e),
2995                        }
2996                    } else {
2997                        Err(FlowgentraError::ConfigError(format!(
2998                            "Supervisor '{}': conditional routing target '{}' not found",
2999                            config.name, agent_name
3000                        )))
3001                    }
3002                }
3003
3004                // ── RetryFallback ──────────────────────────────────────────────
3005                // Agents tried in order until one succeeds.
3006                OrchestrationStrategy::RetryFallback => {
3007                    let order: Vec<String> = if !config.fallback_order.is_empty() {
3008                        config.fallback_order.clone()
3009                    } else {
3010                        children.iter().map(|(n, _)| n.clone()).collect()
3011                    };
3012
3013                    let child_map: HashMap<String, ArcHandler<SharedState>> = children
3014                        .iter()
3015                        .map(|(n, h)| (n.clone(), h.clone()))
3016                        .collect();
3017
3018                    let mut child_results = Vec::new();
3019                    let mut last_error = String::new();
3020
3021                    for agent_name in &order {
3022                        if let Some(handler) = child_map.get(agent_name.as_str()) {
3023                            let max_attempts = config.max_retries_per_child + 1;
3024                            let mut total_ms = 0u128;
3025
3026                            for attempt in 1..=max_attempts {
3027                                let (result, ms) = run_child(
3028                                    agent_name,
3029                                    handler,
3030                                    state.clone(),
3031                                    config.child_timeout_ms,
3032                                )
3033                                .await;
3034                                total_ms += ms;
3035
3036                                match result {
3037                                    Ok(final_state) => {
3038                                        child_results.push(json!({
3039                                            "name": agent_name, "success": true,
3040                                            "duration_ms": total_ms, "attempts": attempt,
3041                                        }));
3042                                        final_state.set(
3043                                            format!("__supervisor_meta__{}", config.name),
3044                                            json!({
3045                                                "strategy": "retry_fallback",
3046                                                "succeeded_agent": agent_name,
3047                                                "children": child_results,
3048                                                "success": true,
3049                                                "duration_ms": start.elapsed().as_millis(),
3050                                            }),
3051                                        );
3052                                        tracing::info!(
3053                                            "Supervisor '{}' retry_fallback: '{}' succeeded",
3054                                            config.name,
3055                                            agent_name
3056                                        );
3057                                        return Ok(final_state);
3058                                    }
3059                                    Err(e) => {
3060                                        last_error = e.to_string();
3061                                        if attempt < max_attempts {
3062                                            tracing::warn!(
3063                                                "Supervisor '{}' retry_fallback: '{}' failed attempt {}/{}",
3064                                                config.name, agent_name, attempt, max_attempts
3065                                            );
3066                                        }
3067                                    }
3068                                }
3069                            }
3070
3071                            child_results.push(json!({
3072                                "name": agent_name, "success": false,
3073                                "error": last_error, "duration_ms": total_ms,
3074                                "attempts": max_attempts,
3075                            }));
3076                            tracing::info!(
3077                                "Supervisor '{}' retry_fallback: '{}' exhausted, trying next fallback",
3078                                config.name, agent_name
3079                            );
3080                        }
3081                    }
3082
3083                    // All agents failed
3084                    state.set(
3085                        format!("__supervisor_meta__{}", config.name),
3086                        json!({
3087                            "strategy": "retry_fallback",
3088                            "children": child_results,
3089                            "success": false,
3090                            "error": format!("All fallback agents failed. Last: {}", last_error),
3091                            "duration_ms": start.elapsed().as_millis(),
3092                        }),
3093                    );
3094                    Err(FlowgentraError::ExecutionError(format!(
3095                        "Supervisor '{}': all fallback agents failed. Last error: {}",
3096                        config.name, last_error
3097                    )))
3098                }
3099
3100                // ── Debate / Critique ──────────────────────────────────────────
3101                // Agents generate responses and critique each other across rounds.
3102                OrchestrationStrategy::Debate => {
3103                    let mut current = state;
3104                    let rounds = if config.debate_rounds == 0 {
3105                        2
3106                    } else {
3107                        config.debate_rounds
3108                    };
3109                    let mut debate_log: Vec<serde_json::Value> = Vec::new();
3110
3111                    let child_map: HashMap<String, ArcHandler<SharedState>> = children
3112                        .iter()
3113                        .map(|(n, h)| (n.clone(), h.clone()))
3114                        .collect();
3115                    let child_names: Vec<String> =
3116                        children.iter().map(|(n, _)| n.clone()).collect();
3117
3118                    for round in 0..rounds {
3119                        tracing::info!(
3120                            "Supervisor '{}' debate: round {}/{}",
3121                            config.name,
3122                            round + 1,
3123                            rounds
3124                        );
3125                        let mut round_responses: Vec<serde_json::Value> = Vec::new();
3126
3127                        // Provide debate context to each agent
3128                        if !debate_log.is_empty() {
3129                            current.set("__debate_history__".to_string(), json!(debate_log));
3130                        }
3131                        current.set("__debate_round__".to_string(), json!(round));
3132
3133                        for agent_name in &child_names {
3134                            if let Some(handler) = child_map.get(agent_name.as_str()) {
3135                                // Set previous round's responses so agent can critique
3136                                if !round_responses.is_empty() {
3137                                    current.set(
3138                                        "__debate_current_responses__".to_string(),
3139                                        json!(round_responses),
3140                                    );
3141                                }
3142
3143                                let (result, ms) = run_child(
3144                                    agent_name,
3145                                    handler,
3146                                    current.clone(),
3147                                    config.child_timeout_ms,
3148                                )
3149                                .await;
3150
3151                                match result {
3152                                    Ok(new_state) => {
3153                                        let response = new_state
3154                                            .get("__debate_response__")
3155                                            .unwrap_or(json!(null));
3156                                        round_responses.push(json!({
3157                                            "agent": agent_name,
3158                                            "response": response,
3159                                            "duration_ms": ms,
3160                                        }));
3161                                        current = new_state;
3162                                    }
3163                                    Err(e) => {
3164                                        round_responses.push(json!({
3165                                            "agent": agent_name,
3166                                            "error": e.to_string(),
3167                                            "duration_ms": ms,
3168                                        }));
3169                                    }
3170                                }
3171                            }
3172                        }
3173
3174                        current.set(
3175                            "__debate_current_responses__".to_string(),
3176                            json!(round_responses.clone()),
3177                        );
3178                        debate_log.push(json!({
3179                            "round": round + 1,
3180                            "responses": round_responses,
3181                        }));
3182                    }
3183
3184                    current.set("__debate_log__".to_string(), json!(debate_log));
3185                    current.set(
3186                        format!("__supervisor_meta__{}", config.name),
3187                        json!({
3188                            "strategy": "debate",
3189                            "rounds": rounds,
3190                            "participants": child_names,
3191                            "success": true,
3192                            "duration_ms": start.elapsed().as_millis(),
3193                        }),
3194                    );
3195                    Ok(current)
3196                }
3197
3198                OrchestrationStrategy::Custom(strategy_name) => {
3199                    Err(FlowgentraError::ConfigError(format!(
3200                        "Orchestrator '{}': custom strategy '{}' is not implemented.",
3201                        config.name, strategy_name
3202                    )))
3203                }
3204            }
3205        })
3206    })
3207}
3208
3209fn create_loop_standalone_handler(
3210    config: crate::core::node::advanced_nodes::LoopNodeConfig,
3211) -> Handler<SharedState> {
3212    use serde_json::json;
3213
3214    Box::new(move |state| {
3215        let config = config.clone();
3216        Box::pin(async move {
3217            // Use handler name as the key base (may be empty — fall back to "loop")
3218            let key_base = if config.handler.is_empty() {
3219                "loop".to_string()
3220            } else {
3221                config.handler.clone()
3222            };
3223            let iteration_key = format!("__loop_iteration__{}", key_base);
3224            let continue_key = format!("__loop_continue__{}", key_base);
3225
3226            let iteration = state
3227                .get(&iteration_key)
3228                .and_then(|v| v.as_u64())
3229                .unwrap_or(0) as usize
3230                + 1;
3231
3232            let break_now = config
3233                .break_condition
3234                .as_ref()
3235                .map(|cond| state.get(cond).and_then(|v| v.as_bool()).unwrap_or(false))
3236                .unwrap_or(false);
3237
3238            let should_continue = iteration < config.max_iterations && !break_now;
3239
3240            state.set(&iteration_key, json!(iteration));
3241            state.set(&continue_key, json!(should_continue));
3242            state.set(
3243                format!("__loop_meta__{}", key_base),
3244                json!({
3245                    "iteration": iteration,
3246                    "max_iterations": config.max_iterations,
3247                    "should_continue": should_continue,
3248                    "break_condition_met": break_now,
3249                }),
3250            );
3251
3252            tracing::info!(
3253                "Standalone loop '{}' iteration={}/{}, continue={}",
3254                key_base,
3255                iteration,
3256                config.max_iterations,
3257                should_continue
3258            );
3259
3260            Ok(state)
3261        })
3262    })
3263}
3264
3265// ── Subgraph ──────────────────────────────────────────────────────────────────
3266/// Creates a handler that loads and runs a nested agent from a YAML config file.
3267///
3268/// The subgraph receives the parent's current state, executes its own full graph,
3269/// and returns the final state back to the parent graph. This enables true
3270/// hierarchical multi-agent systems where each subgraph is an independent agent.
3271///
3272/// Handlers are auto-discovered from the shared inventory (#[register_handler] pool).
3273///
3274/// YAML:
3275/// ```yaml
3276/// - name: research_agent
3277///   type: subgraph           # aliases: agent, agent_or_graph
3278///   config:
3279///     path: agents/research_agent.yaml
3280///
3281/// - name: coordinator
3282///   type: supervisor
3283///   config:
3284///     strategy: sequential
3285///     children: [research_agent, writer_agent]
3286/// ```
3287///
3288/// State: parent state flows in → subgraph executes fully → result flows back to parent
3289fn create_subgraph_handler(
3290    config: crate::core::node::agent_or_graph_node::SubgraphNodeConfig,
3291) -> Handler<SharedState> {
3292    Box::new(move |state| {
3293        let path = config.path.clone();
3294        let name = config.name.clone();
3295        Box::pin(async move {
3296            tracing::info!("SubgraphNode '{}': loading agent from '{}'", name, path);
3297
3298            // Compile the subgraph from its YAML using the shared handler inventory.
3299            // This is synchronous (YAML parse + hashmap build) — safe inside async.
3300            let mut sub_agent = from_config_path(&path).map_err(|e| {
3301                FlowgentraError::ConfigError(format!(
3302                    "SubgraphNode '{}': failed to load '{}': {}",
3303                    name, path, e
3304                ))
3305            })?;
3306
3307            // Inject parent state into the subgraph
3308            sub_agent.state = state;
3309
3310            // Execute the subgraph and return its final state to the parent graph
3311            let result = sub_agent.run().await?;
3312            tracing::info!("SubgraphNode '{}': completed", name);
3313            Ok(result)
3314        })
3315    })
3316}