// flowgentra_ai/core/agent/mod.rs
1use tracing::info;
2// # Agent API - High-Level Interface
3//
4// The `Agent` struct provides a simple API for creating and running agents.
5// Most users interact with this module through the `from_config_path()` function
6// which uses automatic handler discovery.
7//
8// ## Quick Start
9//
10// 1. **Decorate your handlers** with `#[register_handler]`
11// 2. **Create a config.yaml** with your agent graph
12// 3. **Use `from_config_path()`** to create and run the agent
13//
14// ```ignore
15// use flowgentra_ai::prelude::*;
16// use serde_json::json;
17//
18// #[register_handler]
19// pub async fn my_handler(mut state: State) -> Result<State> {
20// let input = state.get("input").and_then(|v| v.as_str()).unwrap_or("");
21// state.set("result", json!(input.to_uppercase()));
22// Ok(state)
23// }
24//
25// #[tokio::main]
26// async fn main() -> Result<()> {
27// // Auto-discovers all #[register_handler] functions
28// let mut agent = from_config_path("config.yaml")?;
29//
30// let mut state = State::new(Default::default());
31// state.set("input", json!("hello world"));
32//
33// let result = agent.run(state).await?;
34// println!("Done: {}", result.to_json_string()?);
35// Ok(())
36// }
37// ```
38//
39// ## Handler Registration
40//
41// Handlers are automatically registered via the `#[register_handler]` attribute macro.
// Handler names must match the function name and be referenced by that name in your config.yaml.
43pub(crate) use crate::core::config::AgentConfig;
44pub(crate) use crate::core::error::{FlowgentraError, Result};
45pub(crate) use crate::core::llm::{create_llm_client, LLMClient};
46pub(crate) use crate::core::memory::{
47 ConversationMemory, InMemoryConversationMemory, MemoryCheckpointer,
48};
49pub(crate) use crate::core::runtime::AgentRuntime;
50pub(crate) use crate::core::state::SharedState;
51use std::collections::HashMap;
52use std::sync::Arc;
53// Use inventory for auto-registration of handlers - collected dynamically at runtime
54inventory::collect!(HandlerEntry);
55
56// =============================================================================
57// Auto-Registration via Inventory
58// =============================================================================
59
/// Entry for a handler in the global `inventory` registry.
///
/// Handlers submit themselves to this list for auto-registration (see the
/// `inventory::collect!` call above); `from_config_path` iterates the
/// collected entries to build its handler map.
pub struct HandlerEntry {
    /// Name of the handler (matches config node names)
    pub name: String,
    /// The handler function (always uses SharedState)
    pub handler: ArcHandler<SharedState>,
}
68
69impl HandlerEntry {
70 /// Create a new handler entry for auto-registration
71 pub fn new(name: impl Into<String>, handler: ArcHandler<SharedState>) -> HandlerEntry {
72 HandlerEntry {
73 name: name.into(),
74 handler,
75 }
76 }
77}
78
/// Arc-wrapped async handler type (used by inventory auto-registration).
///
/// Uses `Arc` (rather than `Box`, as in [`Handler`]) so the handler is
/// cloneable and multiple graph nodes can share one function.
pub type ArcHandler<T> = Arc<
    dyn Fn(T) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T>> + Send>>
        + Send
        + Sync,
>;
85
86// =============================================================================
87// Type Aliases
88// =============================================================================
89
/// Type signature for handler functions
///
/// A handler receives the current state and returns the updated state.
/// Handlers are async and can perform I/O operations.
///
/// The returned future must be `Send` so execution can move between threads
/// on a multi-threaded runtime. This boxed form is uniquely owned; see
/// [`ArcHandler`] for the cloneable variant used by auto-registration.
///
/// # Example
/// ```no_run
/// use flowgentra_ai::core::agent::Handler;
/// use flowgentra_ai::core::state::SharedState;
/// use serde_json::json;
///
/// let my_handler: Handler<SharedState> = Box::new(|state| {
///     Box::pin(async move {
///         let input = state.get("input");
///         state.set("output", json!("processed"));
///         Ok(state)
///     })
/// });
/// ```
pub type Handler<T> = Box<
    dyn Fn(T) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<T>> + Send>>
        + Send
        + Sync,
>;
114
/// Type signature for condition functions
///
/// A condition function evaluates the current state and returns a boolean.
/// Used for branching logic in the graph. Unlike handlers, conditions are
/// synchronous and take the state by shared reference.
///
/// # Example
/// ```no_run
/// use flowgentra_ai::core::agent::Condition;
/// use flowgentra_ai::core::state::SharedState;
///
/// let is_complex: Condition<SharedState> = Box::new(|state: &SharedState| {
///     state.get("complexity_score")
///         .and_then(|v| v.as_i64())
///         .map(|score| score > 50)
///         .unwrap_or(false)
/// });
/// ```
pub type Condition<T> = Box<dyn Fn(&T) -> bool + Send + Sync>;
133
/// Registry mapping handler names to handler functions.
/// During registration, handlers are looked up by node name first, then by
/// the node's `handler` field (see `from_config_inner`).
pub type HandlerRegistry<T> = HashMap<String, Handler<T>>;

/// Registry mapping condition names to condition functions.
/// Keys must match the `condition_name` referenced by edges in the graph.
pub type ConditionRegistry<T> = HashMap<String, Condition<T>>;
139
140// =============================================================================
141// Agent - Main API
142// =============================================================================
143
/// The Agent - your main interface to FlowgentraAI
///
/// Create an agent from a YAML config and handler implementations,
/// then run it with a state to get results.
///
/// The Agent handles:
/// - Loading and validating configuration
/// - Registering handlers and conditions
/// - Managing the execution runtime
/// - Orchestrating node execution
/// - Optional checkpointer and conversation memory (from config or programmatic)
pub struct Agent {
    /// Graph-execution engine that drives node handlers and edge conditions.
    runtime: AgentRuntime<SharedState>,
    /// LLM client built from the config's `llm` section (exposed via `llm_client()`).
    llm_client: Arc<dyn LLMClient>,
    /// The configuration this agent was built from.
    config: AgentConfig,
    /// Current state of the agent (initialized from state_schema)
    pub state: SharedState,
    /// Optional conversation memory (message history per thread). Set via config or with_conversation_memory().
    conversation_memory: Option<Arc<dyn ConversationMemory>>,
}
164
165impl Agent {
166 /// Log agent startup and configuration for observability
167 pub fn log_startup(&self) {
168 info!(
169 "Agent '{}' starting with config: {:?}",
170 self.config.name, self.config
171 );
172 }
173 /// Create an agent from a YAML config file
174 ///
175 /// This is the main entry point for users. It:
176 /// 1. Loads the YAML config file
177 /// 2. Validates the graph structure
178 /// 3. Creates the runtime with your handlers and conditions
179 /// 4. Sets up the LLM client
180 ///
181 /// # Arguments
182 /// - `config_path`: Path to your `config.yaml` file
183 /// - `handlers`: Registry of handler functions
184 /// - `conditions`: Registry of condition functions
185 ///
186 /// # Example
187 /// ```no_run
188 /// use flowgentra_ai::prelude::*;
189 /// use std::collections::HashMap;
190 ///
191 /// #[tokio::main]
192 /// async fn main() -> Result<()> {
193 /// let handlers = HashMap::new();
194 /// let conditions = HashMap::new();
195 /// let mut agent = Agent::from_config("config.yaml", handlers, conditions)?;
196 /// Ok(())
197 /// }
198 /// ```
199 pub fn from_config(
200 config_path: &str,
201 handlers: HandlerRegistry<SharedState>,
202 conditions: ConditionRegistry<SharedState>,
203 ) -> Result<Self> {
204 let config = AgentConfig::from_file(config_path)?;
205 config.validate()?;
206 Self::from_config_inner(config, handlers, conditions)
207 }
208
    /// Create an agent from an already-loaded [`AgentConfig`].
    ///
    /// Use this when you have a config object in hand (e.g. from `from_config_path`
    /// which already loaded the file) to avoid reading the file a second time.
    ///
    /// # Errors
    /// Fails when the runtime cannot be built from the graph, the LLM client
    /// cannot be created, the config references handlers that were not supplied,
    /// or an edge condition cannot be registered.
    pub fn from_config_inner(
        config: AgentConfig,
        handlers: HandlerRegistry<SharedState>,
        conditions: ConditionRegistry<SharedState>,
    ) -> Result<Self> {
        // Create runtime
        let mut runtime = AgentRuntime::<SharedState>::from_config(config.clone())?;

        // Create LLM client
        let llm_client = create_llm_client(&config.llm)?;

        // Convert all handlers to Arc (cloneable) so multiple nodes can share a handler
        let arc_handlers: HashMap<String, ArcHandler<SharedState>> = handlers
            .into_iter()
            .map(|(name, handler)| {
                let arc: ArcHandler<SharedState> = Arc::new(move |state| handler(state));
                (name, arc)
            })
            .collect();

        // Register handlers by mapping each node config to its handler function.
        // Looks up by node.name first (from from_config_path, which pre-resolves by node name),
        // then by node.handler (from manual Agent::from_config, which uses handler names).
        let mut missing_handlers = Vec::new();

        // Built-in node types that don't require a user-supplied handler
        const BUILTIN_TYPES: &[&str] = &[
            "evaluation",
            "retry",
            "timeout",
            "loop",
            "planner",
            "human_in_the_loop",
            "memory",
            // supervisor (+ backwards-compat alias)
            "supervisor",
            "orchestrator",
            // subgraph (+ backwards-compat aliases)
            "subgraph",
            "agent",
            "agent_or_graph",
        ];

        // Supervisor-managed children are not in the runtime graph — skip registration.
        // Children are listed by name under each supervisor node's `config.children` array.
        let supervisor_children: std::collections::HashSet<String> = config
            .graph
            .nodes
            .iter()
            .filter(|n| {
                matches!(
                    n.node_type.as_deref(),
                    Some("supervisor") | Some("orchestrator")
                )
            })
            .flat_map(|n| {
                n.config
                    .get("children")
                    .and_then(|v| v.as_array())
                    .map(|arr| {
                        arr.iter()
                            .filter_map(|v| v.as_str().map(String::from))
                            .collect::<Vec<_>>()
                    })
                    .unwrap_or_default()
            })
            .collect();

        for node in &config.graph.nodes {
            // START/END are structural sentinels, not executable nodes.
            if node.name == "START" || node.name == "END" {
                continue;
            }
            if supervisor_children.contains(&node.name) {
                continue;
            }
            // NOTE(review): "builtin::" handlers are skipped from user registration
            // here — presumably resolved by the runtime or by from_config_path; confirm.
            if node.handler.starts_with("builtin::") {
                continue;
            }
            let arc_handler = arc_handlers
                .get(&node.name)
                .or_else(|| arc_handlers.get(&node.handler))
                .cloned();

            // Standalone built-in type with no user-supplied handler — only skip when
            // from_config_path did NOT pre-build a handler for this node.
            // If a handler IS present (e.g. the planner built by from_config_path), always
            // register it so the placeholder function is replaced.
            let is_builtin_standalone = node.handler.is_empty()
                && node
                    .node_type
                    .as_deref()
                    .is_some_and(|t| BUILTIN_TYPES.contains(&t))
                && arc_handler.is_none();
            if is_builtin_standalone {
                continue;
            }

            match arc_handler {
                Some(h) => {
                    runtime
                        .register_node(&node.name, Box::new(move |state| h(state)))
                        // Rewrap NodeNotFound with a message pointing at the config;
                        // every other error is passed through untouched.
                        .map_err(|e| {
                            if matches!(e, FlowgentraError::NodeNotFound(_)) {
                                FlowgentraError::NodeNotFound(format!(
                                    "Node '{}' not found in graph. Check your config.",
                                    node.name
                                ))
                            } else {
                                e
                            }
                        })?;
                }
                None => {
                    missing_handlers.push(node.handler.clone());
                }
            }
        }

        // Collect all missing handlers before failing so the error lists everything at once.
        if !missing_handlers.is_empty() {
            let available_list = if arc_handlers.is_empty() {
                "(none registered)".to_string()
            } else {
                arc_handlers
                    .keys()
                    .map(|k| format!("'{}'", k))
                    .collect::<Vec<_>>()
                    .join(", ")
            };

            return Err(FlowgentraError::ConfigError(format!(
                "Configuration references unknown handler(s): {}.\nAvailable handlers: {}\nMake sure to #[register_handler] these functions in your code.",
                missing_handlers.iter()
                    .map(|h| format!("'{}'", h))
                    .collect::<Vec<_>>()
                    .join(", "),
                available_list
            )));
        }

        // Register all conditions: each user Condition (bool) is adapted into the
        // runtime's edge-condition form (Ok(Some(name)) when true, Ok(None) when false).
        type EdgeConditionFn = std::sync::Arc<
            dyn Fn(
                    &SharedState,
                )
                    -> std::result::Result<Option<String>, crate::core::error::FlowgentraError>
                + Send
                + Sync,
        >;
        for (condition_name, condition_fn) in conditions {
            let cond_name_clone = condition_name.clone();
            let edge_condition: EdgeConditionFn =
                std::sync::Arc::new(move |state: &SharedState| {
                    if condition_fn(state) {
                        Ok(Some(cond_name_clone.clone()))
                    } else {
                        Ok(None)
                    }
                });

            // Attach the condition to every edge that references it by name.
            let edges_to_register: Vec<String> = runtime
                .graph()
                .edges
                .iter()
                .filter(|e| e.condition_name.as_deref() == Some(condition_name.as_str()))
                .map(|e| e.from.clone())
                .collect();

            for from_node in edges_to_register {
                runtime.register_edge_condition(
                    &from_node,
                    &condition_name,
                    edge_condition.clone(),
                )?;
            }
        }

        let initial_state = config.create_initial_state();
        Ok(Agent {
            runtime,
            llm_client,
            config,
            state: initial_state,
            conversation_memory: None,
        })
    }
397
398 /// Set the checkpointer (e.g. for thread-scoped state persistence). Can also be set via config.yaml `memory.checkpointer`.
399 pub fn set_checkpointer(&mut self, checkpointer: Arc<MemoryCheckpointer>) -> &mut Self {
400 self.runtime.set_checkpointer(checkpointer);
401 self
402 }
403
404 /// Builder-style: set the checkpointer.
405 pub fn with_checkpointer(mut self, checkpointer: Arc<MemoryCheckpointer>) -> Self {
406 self.runtime.set_checkpointer(checkpointer);
407 self
408 }
409
410 /// Set conversation memory (message history per thread). Can also be set via config.yaml `memory.conversation`.
411 pub fn set_conversation_memory(&mut self, memory: Arc<dyn ConversationMemory>) -> &mut Self {
412 self.conversation_memory = Some(memory);
413 self
414 }
415
416 /// Builder-style: set conversation memory.
417 pub fn with_conversation_memory(mut self, memory: Arc<dyn ConversationMemory>) -> Self {
418 self.conversation_memory = Some(memory);
419 self
420 }
421
422 /// Get conversation memory if set (for use in handlers to add/get messages).
423 pub fn conversation_memory(&self) -> Option<Arc<dyn ConversationMemory>> {
424 self.conversation_memory.clone()
425 }
426
427 /// Execute the agent with its current state
428 ///
429 /// Runs the agent through all nodes following the edges until completion.
430 /// Uses the agent's built-in state (initialized from state_schema).
431 /// Automatically injects the LLM configuration into state so handlers can access it.
432 ///
433 /// Set state values before calling:
434 /// ```no_run
435 /// use flowgentra_ai::prelude::*;
436 /// use serde_json::json;
437 ///
438 /// #[tokio::main]
439 /// async fn main() -> Result<()> {
440 /// let mut agent = from_config_path("config.yaml")?;
441 ///
442 /// // Set initial values on agent.state
443 /// agent.state.set("input", json!("Say hello"));
444 ///
445 /// let result_state = agent.run().await?;
446 /// println!("Done! Result: {}", result_state.to_json_string()?);
447 ///
448 /// Ok(())
449 /// }
450 /// ```
451 pub async fn run(&mut self) -> Result<SharedState> {
452 // Automatically inject LLM config into state for handler access
453 let llm_config_json =
454 serde_json::to_value(&self.config.llm).unwrap_or_else(|_| serde_json::json!({}));
455 self.state.set("_llm_config", llm_config_json); // Ensure set() is defined in State
456
457 // Automatically inject MCP configs into state for handler access
458 if !self.config.graph.mcps.is_empty() {
459 let mcp_configs_json = serde_json::to_value(&self.config.graph.mcps)
460 .unwrap_or_else(|_| serde_json::json!({}));
461 self.state.set("_mcp_configs", mcp_configs_json);
462 }
463
464 // Automatically inject RAG config into state for handler access
465 if let Some(ref rag_config) = self.config.graph.rag {
466 let mut resolved = rag_config.clone();
467 resolved.resolve_env_vars();
468 let rag_config_json =
469 serde_json::to_value(&resolved).unwrap_or_else(|_| serde_json::json!({}));
470 self.state.set("_rag_config", rag_config_json);
471 }
472
473 self.runtime.execute(self.state.clone()).await
474 }
475
476 /// Run with a thread id for checkpointing and conversation memory. When a checkpointer is set,
477 /// state is loaded from the last checkpoint for this thread (if any) and saved after each node.
478 /// Use the same thread_id with conversation_memory to get/add messages for this conversation.
479 /// Automatically injects the LLM configuration into state so handlers can access it.
480 pub async fn run_with_thread(&mut self, thread_id: &str) -> Result<SharedState> {
481 // Automatically inject LLM config into state for handler access
482 let llm_config_json =
483 serde_json::to_value(&self.config.llm).unwrap_or_else(|_| serde_json::json!({}));
484 self.state.set("_llm_config", llm_config_json);
485
486 // Automatically inject MCP configs into state for handler access
487 if !self.config.graph.mcps.is_empty() {
488 let mcp_configs_json = serde_json::to_value(&self.config.graph.mcps)
489 .unwrap_or_else(|_| serde_json::json!({}));
490 self.state.set("_mcp_configs", mcp_configs_json);
491 }
492
493 // Automatically inject RAG config into state for handler access
494 if let Some(ref rag_config) = self.config.graph.rag {
495 let mut resolved = rag_config.clone();
496 resolved.resolve_env_vars();
497 let rag_config_json =
498 serde_json::to_value(&resolved).unwrap_or_else(|_| serde_json::json!({}));
499 self.state.set("_rag_config", rag_config_json);
500 }
501
502 self.runtime
503 .execute_with_thread(thread_id, self.state.clone())
504 .await
505 }
506
507 /// Get the LLM client for use in handlers
508 ///
509 /// Handlers can use this to access the configured LLM provider.
510 pub fn llm_client(&self) -> Arc<dyn LLMClient> {
511 Arc::clone(&self.llm_client)
512 }
513
514 /// Get a reference to the agent's configuration
515 pub fn config(&self) -> &AgentConfig {
516 &self.config
517 }
518
519 // Visualize the agent's execution graph
520 //
521 // Generates a text-based or graphical representation of your agent's workflow.
522 // Useful for debugging and documentation.
523 //
524 // visualize_graph requires type parameter T to be known, which is not available in Agent context
525 // pub async fn visualize_graph(&self, output_path: &str) -> Result<()> {
526 // self.runtime.visualize_graph(output_path)
527 // }
528
529 /// Get mutable access to the underlying runtime
530 ///
531 /// For advanced users who need direct runtime access.
532 pub fn runtime_mut(&mut self) -> &mut AgentRuntime<SharedState> {
533 &mut self.runtime
534 }
535
536 // ==============================================
537 // Memory Management API
538 // ==============================================
539
540 /// Initialize message history in state for memory
541 ///
542 /// Creates an empty message history that handlers can append to.
543 ///
544 /// # Example (Code API)
545 /// ```ignore
546 /// let mut agent = from_config_path("config.yaml")?;
547 /// agent.enable_message_history()?; // Just add this line!
548 /// ```
549 ///
550 /// # Example (Config approach - just add to state_schema)
551 /// ```yaml
552 /// state_schema:
553 /// messages:
554 /// type: array
555 /// description: "Message history"
556 /// ```
557 pub fn enable_message_history(&mut self) -> Result<()> {
558 use crate::core::state::MessageHistory;
559 let history = MessageHistory::new();
560 history.save_to_state(&self.state)?;
561 Ok(())
562 }
563
564 /// Add a memory handler node to the graph
565 ///
566 /// Allows programmatic addition of memory handlers without editing config.
567 /// Handlers: "memory::append_message", "memory::compress_history", "memory::clear_history"
568 ///
569 /// # Example
570 /// ```ignore
571 /// agent.add_memory_handler("append", "memory::append_message")?;
572 /// agent.add_memory_handler("compress", "memory::compress_history")?;
573 /// ```
574 pub fn add_memory_handler(&mut self, node_name: &str, handler_type: &str) -> Result<()> {
575 // Register the memory handler in the runtime
576 let handler: Handler<SharedState> = match handler_type {
577 "memory::append_message" => Box::new(|state| {
578 Box::pin(crate::core::node::memory_handlers::append_message_handler(
579 state,
580 ))
581 }),
582 "memory::compress_history" => Box::new(|state| {
583 Box::pin(crate::core::node::memory_handlers::compress_history_handler(state))
584 }),
585 "memory::clear_history" => Box::new(|state| {
586 Box::pin(crate::core::node::memory_handlers::clear_history_handler(
587 state,
588 ))
589 }),
590 "memory::get_message_count" => Box::new(|state| {
591 Box::pin(crate::core::node::memory_handlers::get_message_count_handler(state))
592 }),
593 "memory::format_history_for_context" => Box::new(|state| {
594 Box::pin(
595 crate::core::node::memory_handlers::format_history_for_context_handler(state),
596 )
597 }),
598 _ => {
599 return Err(FlowgentraError::ValidationError(format!(
600 "Unknown memory handler: {}",
601 handler_type
602 )))
603 }
604 };
605
606 self.runtime.register_node(node_name, handler)?;
607
608 Ok(())
609 }
610
611 /// Get message history helper
612 ///
613 /// Convenience method to work with message history
614 ///
615 /// # Example
616 /// ```ignore
617 /// let mut history = agent.get_message_history()?;
618 /// history.add_user_message("Hello!");
619 /// history.save_to_state(&agent.state)?;
620 /// ```
621 pub fn get_message_history(&self) -> Result<crate::core::state::MessageHistory> {
622 crate::core::state::MessageHistory::from_state(&self.state)
623 }
624
625 /// Set message history from user messages
626 ///
627 /// Useful for loading existing conversation or initializing with context
628 ///
629 /// # Example
630 /// ```ignore
631 /// let messages = vec![
632 /// ("user", "What is Rust?"),
633 /// ("assistant", "Rust is a systems programming language..."),
634 /// ];
635 /// for (role, content) in &messages {
636 /// agent.add_message(role, content)?;
637 /// }
638 /// ```
639 pub fn add_message(&mut self, role: &str, content: &str) -> Result<()> {
640 let mut history = self.get_message_history()?;
641 match role {
642 "user" => history.add_user_message(content),
643 "assistant" => history.add_assistant_message(content),
644 "system" => history.add_system_message(content),
645 _ => {
646 return Err(FlowgentraError::ValidationError(format!(
647 "Invalid role: {}",
648 role
649 )))
650 }
651 }
652 history.save_to_state(&self.state)?;
653 Ok(())
654 }
655
656 /// Clear all messages from history
657 ///
658 /// Useful for resetting conversation state
659 pub fn clear_messages(&mut self) -> Result<()> {
660 let history = crate::core::state::MessageHistory::new();
661 history.save_to_state(&self.state)?;
662 Ok(())
663 }
664
665 /// Get custom state field helper
666 ///
667 /// For Pattern 4: Custom State Fields
668 pub fn custom_state(&self) -> Result<crate::core::state::CustomState> {
669 crate::core::state::CustomState::from_state(&self.state)
670 }
671
672 /// Set a custom state field
673 pub fn set_custom_field(&mut self, key: &str, value: serde_json::Value) -> Result<()> {
674 let mut custom = self.custom_state()?;
675 custom.set(key, value);
676 custom.save_to_state(&self.state)?;
677 Ok(())
678 }
679}
680
681/// Create an agent from config path only - handlers are auto-discovered!
682///
683/// Handlers registered via `#[register_handler]` attribute are automatically collected
684/// and available to the agent builder. No manual registration needed!
685///
686/// # Arguments
687/// - `config_path`: Path to your `config.yaml` file
688///
689/// # Returns
690/// Result with Agent if successful, error if any handlers are missing
691///
692/// # Error Messages
693/// The function will provide helpful error messages if handlers are missing,
694/// showing which handlers are required by the config but not registered.
695///
696/// # Example
697/// ```ignore
698/// use flowgentra_ai::prelude::*;
699///
700/// #[register_handler]
701/// pub async fn my_handler(state: State) -> Result<State> {
702/// state.set("output", json!("done"));
703/// Ok(state)
704/// }
705///
706/// #[tokio::main]
707/// async fn main() -> Result<()> {
708/// // That's it! Just create the agent from config
709/// let mut agent = from_config_path("config.yaml")?;
710///
711/// let state = State::new(Default::default());
712/// let result = agent.run(state).await?;
713/// Ok(())
714/// }
715/// ```
716pub fn from_config_path(config_path: &str) -> Result<Agent> {
717 // Load config to get required node names
718 let mut config = AgentConfig::from_file(config_path)?;
719 config.validate()?;
720
721 // Resolve relative paths in subgraph configs against the parent config file's directory.
722 // This ensures `path: agents/researcher.yaml` in config.yaml correctly resolves to
723 // `<config_dir>/agents/researcher.yaml` regardless of the process CWD.
724 let config_dir = std::path::Path::new(config_path)
725 .parent()
726 .filter(|p| !p.as_os_str().is_empty())
727 .unwrap_or(std::path::Path::new("."))
728 .to_path_buf();
729
730 // For stdio MCPs: resolve working_dir and command path.
731 // - Default working_dir to config file's directory so relative script paths work
732 // - Resolve command via PATH if it's not an absolute path
733 for mcp in config.graph.mcps.values_mut() {
734 if mcp.connection_type == crate::core::mcp::MCPConnectionType::Stdio {
735 if mcp.connection_settings.working_dir.is_none() {
736 mcp.connection_settings.working_dir =
737 Some(config_dir.to_string_lossy().to_string());
738 }
739 // Resolve command to absolute path if needed (e.g. "python" → "/usr/bin/python")
740 let cmd = mcp.stdio_command().to_string();
741 if !std::path::Path::new(&cmd).is_absolute() {
742 let mut resolved = String::new();
743
744 // Try system lookup (where on Windows, which on Unix)
745 let lookup_cmd = if cfg!(windows) { "where" } else { "which" };
746 if let Ok(output) = std::process::Command::new(lookup_cmd).arg(&cmd).output() {
747 if output.status.success() {
748 resolved = String::from_utf8_lossy(&output.stdout)
749 .lines()
750 .next()
751 .unwrap_or("")
752 .trim()
753 .to_string();
754 }
755 }
756
757 // On Windows, also search common installation paths
758 #[cfg(windows)]
759 if resolved.is_empty() && (cmd == "python" || cmd == "python3") {
760 let candidates = [
761 // Standard Python installer paths
762 "C:\\Python312\\python.exe".to_string(),
763 "C:\\Python311\\python.exe".to_string(),
764 "C:\\Python310\\python.exe".to_string(),
765 "C:\\Python39\\python.exe".to_string(),
766 // Program Files
767 "C:\\Program Files\\Python312\\python.exe".to_string(),
768 "C:\\Program Files\\Python311\\python.exe".to_string(),
769 "C:\\Program Files\\Python310\\python.exe".to_string(),
770 "C:\\Program Files\\Python39\\python.exe".to_string(),
771 ];
772 if let Ok(home) = std::env::var("USERPROFILE") {
773 let user_candidates = [
774 format!(
775 "{}\\AppData\\Local\\Programs\\Python\\Python312\\python.exe",
776 home
777 ),
778 format!(
779 "{}\\AppData\\Local\\Programs\\Python\\Python311\\python.exe",
780 home
781 ),
782 format!(
783 "{}\\AppData\\Local\\Programs\\Python\\Python310\\python.exe",
784 home
785 ),
786 ];
787 for c in user_candidates.iter().chain(candidates.iter()) {
788 if std::path::Path::new(c).exists() {
789 resolved = c.clone();
790 break;
791 }
792 }
793 } else {
794 for c in &candidates {
795 if std::path::Path::new(c).exists() {
796 resolved = c.clone();
797 break;
798 }
799 }
800 }
801 }
802
803 if !resolved.is_empty() {
804 tracing::info!(command = %cmd, resolved = %resolved, "Resolved stdio command path");
805 if mcp.command.is_some() {
806 mcp.command = Some(resolved);
807 } else {
808 mcp.uri = resolved;
809 }
810 }
811 }
812 }
813 }
814
815 // Collect all registered handlers from inventory (ArcHandler = cloneable)
816 let mut handlers_map: HashMap<String, ArcHandler<SharedState>> = HashMap::new();
817 for entry in inventory::iter::<HandlerEntry> {
818 handlers_map.insert(entry.name.clone(), entry.handler.clone());
819 }
820
821 // Inject builtin::planner into handlers_map for backward compatibility
822 // (when a node uses handler: "builtin::planner" instead of type: "planner")
823 let has_legacy_planner = config
824 .graph
825 .nodes
826 .iter()
827 .any(|n| n.handler == "builtin::planner");
828 let has_planner_type = config
829 .graph
830 .nodes
831 .iter()
832 .any(|n| n.node_type.as_deref() == Some("planner"));
833 if has_legacy_planner || has_planner_type {
834 let llm_client = config.create_llm_client()?;
835 let prompt_template = config.graph.planner.prompt_template.clone();
836 let planner_fn = Arc::new(crate::core::node::planner::create_planner_handler(
837 llm_client,
838 prompt_template,
839 ));
840 let arc_handler: ArcHandler<SharedState> =
841 Arc::new(move |state| planner_fn.as_ref()(state));
842 handlers_map.insert("__builtin_planner__".to_string(), arc_handler);
843 }
844
845 // Build handler registry keyed by NODE NAME.
846 // Dispatch based on node type — every built-in type is detected here.
847 let mut node_handlers: HandlerRegistry<SharedState> = HashMap::new();
848 let mut missing_handlers: Vec<String> = Vec::new();
849
850 // Helper: look up a handler by name, or record it as missing
851 macro_rules! require_handler {
852 ($name:expr) => {
853 match handlers_map.get($name) {
854 Some(h) => h.clone(),
855 None => {
856 missing_handlers.push($name.to_string());
857 continue;
858 }
859 }
860 };
861 }
862
863 for node_config in &config.graph.nodes {
864 if node_config.name == "START" || node_config.name == "END" {
865 continue;
866 }
867 // Supervisor nodes are built in a second pass (need child handlers to exist first).
868 // Subgraph nodes are self-contained and built in the second pass too.
869 match node_config.node_type.as_deref() {
870 Some("supervisor") | Some("orchestrator") => continue,
871 Some("subgraph") | Some("agent") | Some("agent_or_graph") => continue,
872 _ => {}
873 }
874
875 let node_name = node_config.name.clone();
876 let handler = match node_config.node_type.as_deref() {
877 // ── Evaluation: loop until confident ───────────────────────────
878 // Standalone (no handler): scores the current state field and writes
879 // evaluation metadata. Designed to be used as a graph node with back-edges.
880 // Wrapping mode (handler provided): calls the handler repeatedly until
881 // min_confidence is reached or max_retries is exhausted.
882 Some("evaluation") => {
883 use crate::core::node::evaluation_node::EvaluationNodeConfig;
884 let cfg = EvaluationNodeConfig::from_node_config(node_config)?;
885 if node_config.handler.is_empty() {
886 create_evaluation_standalone_handler(cfg)
887 } else {
888 let arc = require_handler!(&node_config.handler);
889 wrap_handler_with_evaluation(arc, cfg)
890 }
891 }
892
893 // ── Retry: exponential-backoff retry management ─────────────────
894 // Standalone: manages __retry_count__ and __retry_should_retry__ in
895 // state. Pair with a back-edge and condition on __retry_should_retry__.
896 // Wrapping mode: re-calls the handler on error automatically.
897 Some("retry") => {
898 use crate::core::node::nodes_trait::RetryNodeConfig;
899 let cfg = RetryNodeConfig::from_node_config(node_config)?;
900 if node_config.handler.is_empty() {
901 create_retry_standalone_handler(cfg)
902 } else {
903 let arc = require_handler!(&node_config.handler);
904 wrap_handler_with_retry(arc, cfg)
905 }
906 }
907
908 // ── Timeout: wall-clock deadline tracking ───────────────────────
909 // Standalone: sets __timeout_deadline__ on first visit, checks it on
910 // subsequent visits. Sets __timeout_timed_out__ for routing.
911 // Wrapping mode: aborts the handler if it exceeds the duration.
912 Some("timeout") => {
913 use crate::core::node::nodes_trait::TimeoutNodeConfig;
914 let cfg = TimeoutNodeConfig::from_node_config(node_config)?;
915 if node_config.handler.is_empty() {
916 create_timeout_standalone_handler(cfg)
917 } else {
918 let arc = require_handler!(&node_config.handler);
919 wrap_handler_with_timeout(arc, cfg)
920 }
921 }
922
923 // ── Loop: iteration counter management ──────────────────────────
924 // Standalone: tracks __loop_iteration__ and __loop_continue__ in state.
925 // Pair with a back-edge conditioned on __loop_continue__.
926 // Wrapping mode: runs the handler up to max_iterations times inline.
927 Some("loop") => {
928 use crate::core::node::advanced_nodes::LoopNodeConfig;
929 let cfg = LoopNodeConfig::from_node_config(node_config)?;
930 if node_config.handler.is_empty() {
931 create_loop_standalone_handler(cfg)
932 } else {
933 let arc = require_handler!(&node_config.handler);
934 wrap_handler_with_loop(arc, cfg)
935 }
936 }
937
938 // ── Planner: LLM-driven next-node selection (no user handler) ───
939 // `type: planner` always uses the builtin planner regardless of the handler field.
940 // `handler: "builtin::planner"` on a typeless node is the legacy spelling.
941 // NOTE: the guard `if` on a `|` pattern applies to ALL alternatives, so these
942 // two cases must be separate arms to avoid the guard blocking `Some("planner")`.
943 Some("planner") => {
944 let arc = handlers_map
945 .get("__builtin_planner__")
946 .cloned()
947 .expect("planner was pre-injected");
948 Box::new(move |state| arc(state))
949 }
950 None if node_config.handler == "builtin::planner" => {
951 let arc = handlers_map
952 .get("__builtin_planner__")
953 .cloned()
954 .expect("planner was pre-injected");
955 Box::new(move |state| arc(state))
956 }
957
958 // ── Human-in-the-loop: pause for human approval/edit ───────────
959 Some("human_in_the_loop") => {
960 use crate::core::node::nodes_trait::HumanInTheLoopConfig;
961 let cfg = HumanInTheLoopConfig::from_node_config(node_config)?;
962 create_human_in_loop_handler(cfg)
963 }
964
965 // ── Memory: built-in memory operations ─────────────────────────
966 Some("memory") => {
967 let op = node_config
968 .config
969 .get("operation")
970 .and_then(|v| v.as_str())
971 .unwrap_or("");
972 match create_memory_handler(op) {
973 Some(h) => h,
974 None => {
975 return Err(FlowgentraError::ConfigError(format!(
976 "Unknown memory operation '{}' for node '{}'. \
977 Valid operations: append_message, compress_history, \
978 clear_history, get_message_count, format_history_for_context",
979 op, node_config.name
980 )))
981 }
982 }
983 }
984
985 // ── Plain handler (or unrecognized type: falls back to handler) ─
986 _ => {
987 let arc = require_handler!(&node_config.handler);
988 Box::new(move |state| arc(state))
989 }
990 };
991
992 node_handlers.insert(node_name, handler);
993 }
994
995 // ── Second pass: supervisor + subgraph nodes ────────────────────────────────
996 // Convert first-pass handlers to Arc so supervisor children can be shared
997 // without being consumed (each child must remain a standalone graph node too).
998 let node_handlers_arc: HashMap<String, ArcHandler<SharedState>> = node_handlers
999 .into_iter()
1000 .map(|(name, h)| {
1001 let arc: ArcHandler<SharedState> = Arc::new(move |state| h(state));
1002 (name, arc)
1003 })
1004 .collect();
1005
1006 let mut second_pass_handlers: HandlerRegistry<SharedState> = HashMap::new();
1007
1008 // ── Sub-pass A: index all subgraph configs by node name ───────────────────
1009 // create_subgraph_handler is cheap (YAML loads only at execution time),
1010 // so we build on demand in sub-pass B rather than pre-building.
1011 use crate::core::node::agent_or_graph_node::SubgraphNodeConfig;
1012 let subgraph_configs: HashMap<String, SubgraphNodeConfig> = config
1013 .graph
1014 .nodes
1015 .iter()
1016 .filter(|n| {
1017 matches!(
1018 n.node_type.as_deref(),
1019 Some("subgraph") | Some("agent") | Some("agent_or_graph")
1020 )
1021 })
1022 .map(|n| {
1023 let mut cfg = SubgraphNodeConfig::from_node_config(n)?;
1024 // Resolve subgraph path relative to the parent config file's directory
1025 if !std::path::Path::new(&cfg.path).is_absolute() {
1026 cfg.path = config_dir.join(&cfg.path).to_string_lossy().to_string();
1027 }
1028 Ok((n.name.clone(), cfg))
1029 })
1030 .collect::<Result<_>>()?;
1031
1032 // Register all subgraph nodes as standalone graph nodes too
1033 for (name, cfg) in &subgraph_configs {
1034 second_pass_handlers.insert(name.clone(), create_subgraph_handler(cfg.clone()));
1035 }
1036
1037 // ── Sub-pass B: build supervisor handlers ─────────────────────────────────
1038 // Child resolution order:
1039 // 1. first-pass plain handlers (node_handlers_arc)
1040 // 2. subgraph nodes (subgraph_configs — built fresh per supervisor)
1041 // 3. already-built supervisor handlers (built_supervisor_arcs — enables nesting)
1042 // 4. inventory handlers (handlers_map)
1043 //
1044 // Supervisors are built in multiple passes so that parent supervisors can
1045 // reference child supervisors that were built in an earlier pass.
1046 let supervisor_nodes: Vec<_> = config
1047 .graph
1048 .nodes
1049 .iter()
1050 .filter(|n| {
1051 matches!(
1052 n.node_type.as_deref(),
1053 Some("supervisor") | Some("orchestrator")
1054 )
1055 })
1056 .collect();
1057
1058 let mut built_supervisor_arcs: HashMap<String, ArcHandler<SharedState>> = HashMap::new();
1059 let mut remaining: Vec<_> = supervisor_nodes.iter().map(|n| n.name.clone()).collect();
1060 let max_passes = remaining.len() + 1; // guard against infinite loops
1061
1062 for _pass in 0..max_passes {
1063 if remaining.is_empty() {
1064 break;
1065 }
1066 let mut still_remaining = Vec::new();
1067
1068 for sup_name in &remaining {
1069 let node_config = supervisor_nodes
1070 .iter()
1071 .find(|n| &n.name == sup_name)
1072 .unwrap();
1073 use crate::core::node::orchestrator_node::SupervisorNodeConfig;
1074 let cfg = SupervisorNodeConfig::from_node_config(node_config)?;
1075
1076 let mut child_arcs: Vec<(String, ArcHandler<SharedState>)> = Vec::new();
1077 let mut all_resolved = true;
1078
1079 for child_name in &cfg.children {
1080 if let Some(arc) = node_handlers_arc.get(child_name).cloned() {
1081 child_arcs.push((child_name.clone(), arc));
1082 } else if let Some(sub_cfg) = subgraph_configs.get(child_name) {
1083 let handler = create_subgraph_handler(sub_cfg.clone());
1084 child_arcs.push((child_name.clone(), Arc::new(move |state| handler(state))));
1085 } else if let Some(arc) = built_supervisor_arcs.get(child_name).cloned() {
1086 // child is a supervisor built in a previous pass
1087 child_arcs.push((child_name.clone(), arc));
1088 } else if let Some(arc) = handlers_map.get(child_name).cloned() {
1089 child_arcs.push((child_name.clone(), arc));
1090 } else {
1091 // child not yet available — might be built in a later pass
1092 all_resolved = false;
1093 break;
1094 }
1095 }
1096
1097 if all_resolved {
1098 // Build per-child MCP map from node configs
1099 let child_mcps: HashMap<String, Vec<String>> = cfg
1100 .children
1101 .iter()
1102 .filter_map(|name| {
1103 let node = config.graph.nodes.iter().find(|n| &n.name == name)?;
1104 if node.mcps.is_empty() {
1105 None
1106 } else {
1107 Some((name.clone(), node.mcps.clone()))
1108 }
1109 })
1110 .collect();
1111
1112 let handler = if matches!(
1113 cfg.strategy,
1114 crate::core::node::orchestrator_node::OrchestrationStrategy::Dynamic
1115 ) {
1116 let llm = config
1117 .create_llm_client()
1118 .ok()
1119 .map(|c| c as Arc<dyn LLMClient>);
1120 create_supervisor_handler_with_llm(cfg, child_arcs, llm, child_mcps)
1121 } else {
1122 create_supervisor_handler(cfg, child_arcs, child_mcps)
1123 };
1124 let arc: ArcHandler<SharedState> = Arc::new(move |state| handler(state));
1125 built_supervisor_arcs.insert(sup_name.clone(), arc);
1126 second_pass_handlers.insert(sup_name.clone(), {
1127 let arc = built_supervisor_arcs.get(sup_name).unwrap().clone();
1128 Box::new(move |state| arc(state))
1129 });
1130 } else {
1131 still_remaining.push(sup_name.clone());
1132 }
1133 }
1134
1135 if still_remaining.len() == remaining.len() {
1136 // No progress — remaining supervisors have unresolvable children
1137 for sup_name in &still_remaining {
1138 let node_config = supervisor_nodes
1139 .iter()
1140 .find(|n| &n.name == sup_name)
1141 .unwrap();
1142 use crate::core::node::orchestrator_node::SupervisorNodeConfig;
1143 let cfg = SupervisorNodeConfig::from_node_config(node_config)?;
1144 for child_name in &cfg.children {
1145 if !node_handlers_arc.contains_key(child_name)
1146 && !subgraph_configs.contains_key(child_name)
1147 && !built_supervisor_arcs.contains_key(child_name)
1148 && !handlers_map.contains_key(child_name)
1149 {
1150 missing_handlers.push(child_name.clone());
1151 }
1152 }
1153 }
1154 break;
1155 }
1156
1157 remaining = still_remaining;
1158 }
1159
1160 // Merge both passes back into a single HandlerRegistry
1161 let mut node_handlers: HandlerRegistry<SharedState> = node_handlers_arc
1162 .into_iter()
1163 .map(|(name, arc)| {
1164 let h: Handler<SharedState> = Box::new(move |state| arc(state));
1165 (name, h)
1166 })
1167 .collect();
1168 node_handlers.extend(second_pass_handlers);
1169
1170 if !missing_handlers.is_empty() {
1171 missing_handlers.dedup();
1172 let missing = missing_handlers.join(", ");
1173 let registered = handlers_map
1174 .keys()
1175 .filter(|k| !k.starts_with("__builtin"))
1176 .map(|k: &String| k.as_str())
1177 .collect::<Vec<_>>()
1178 .join(", ");
1179
1180 let msg = if registered.is_empty() {
1181 format!(
1182 "Missing handlers: {}. No handlers registered yet.\n\
1183 Use #[register_handler] attribute on your handler functions to register them.\n\
1184 Example:\n #[register_handler]\n pub async fn {} (state: State) -> Result<State> {{ ... }}",
1185 missing,
1186 missing_handlers[0]
1187 )
1188 } else {
1189 format!(
1190 "Missing handlers: {}\n\
1191 Registered handlers: {}\n\
1192 Use #[register_handler] attribute to register missing handlers.",
1193 missing, registered
1194 )
1195 };
1196
1197 return Err(crate::core::error::FlowgentraError::ConfigError(msg));
1198 }
1199
1200 // Pass handlers keyed by node name — from_config_inner will match by node.name
1201 let mut agent = Agent::from_config_inner(config.clone(), node_handlers, HashMap::new())?;
1202
1203 // Apply memory from config (checkpointer, conversation memory, buffer/window)
1204 if config
1205 .memory
1206 .checkpointer
1207 .type_
1208 .eq_ignore_ascii_case("memory")
1209 {
1210 agent.set_checkpointer(Arc::new(MemoryCheckpointer::new()));
1211 }
1212 if config.memory.conversation.enabled {
1213 let conv = InMemoryConversationMemory::with_config(&config.memory.buffer);
1214 agent.set_conversation_memory(Arc::new(conv));
1215 }
1216
1217 // Apply evaluation from config (auto-add AutoEvaluationMiddleware)
1218 if let Some(eval_config) = &config.evaluation {
1219 if eval_config.enabled {
1220 use crate::core::evaluation::{
1221 AutoEvaluationMiddleware, ConfidenceConfig, LegacyEvaluationPolicy, RetryConfig,
1222 ScoringCriteria,
1223 };
1224
1225 let policy = LegacyEvaluationPolicy {
1226 enable_scoring: true,
1227 enable_grading: eval_config
1228 .grading
1229 .as_ref()
1230 .map(|g| g.enabled)
1231 .unwrap_or(false),
1232 enable_confidence_scoring: true,
1233 confidence_threshold: eval_config.min_confidence,
1234 max_retries: eval_config.max_retries,
1235 enable_self_correction: true,
1236 store_evaluation_history: true,
1237 };
1238
1239 let scoring_criteria = ScoringCriteria::default();
1240
1241 let confidence_config = ConfidenceConfig {
1242 low_threshold: eval_config.min_confidence * 0.6,
1243 high_threshold: eval_config.min_confidence,
1244 ..Default::default()
1245 };
1246
1247 let retry_config = RetryConfig {
1248 max_retries: eval_config.max_retries,
1249 confidence_threshold: eval_config.min_confidence,
1250 ..Default::default()
1251 };
1252
1253 let middleware = AutoEvaluationMiddleware::new()
1254 .with_policy(policy)
1255 .with_scoring_criteria(scoring_criteria)
1256 .with_confidence_config(confidence_config)
1257 .with_retry_config(retry_config);
1258
1259 agent.runtime_mut().add_middleware(Arc::new(middleware));
1260 }
1261 }
1262
1263 Ok(agent)
1264}
1265
1266// Memory-aware agent wrapper for simplified memory handling
1267mod memory_aware;
1268pub use memory_aware::{MemoryAwareAgent, MemoryStats};
1269
1270// =============================================================================
1271// Built-in Node Handler Wrappers
1272//
1273// Each function converts a built-in node config into a Handler<SharedState>
1274// so every node type is uniformly represented as a NodeFunction in the runtime.
1275// =============================================================================
1276
1277/// Wraps a handler in an evaluation retry loop.
1278/// Delegates to `EvaluationNodeConfig::into_wrapping_node_fn` — single source of truth.
1279fn wrap_handler_with_evaluation(
1280 handler: ArcHandler<SharedState>,
1281 eval_config: crate::core::node::evaluation_node::EvaluationNodeConfig,
1282) -> Handler<SharedState> {
1283 eval_config.into_wrapping_node_fn(handler)
1284}
1285
1286// ── Retry ──────────────────────────────────────────────────────────────────
1287/// Wraps a handler in an exponential-backoff retry loop.
1288/// Re-calls the handler on error up to `max_retries` times.
1289///
1290/// YAML:
1291/// ```yaml
1292/// - name: call_api
1293/// type: retry
1294/// handler: call_api_handler
1295/// config:
1296/// max_retries: 3
1297/// backoff_ms: 1000
1298/// backoff_multiplier: 2.0
1299/// max_backoff_ms: 30000
1300/// ```
1301fn wrap_handler_with_retry(
1302 handler: ArcHandler<SharedState>,
1303 config: crate::core::node::nodes_trait::RetryNodeConfig,
1304) -> Handler<SharedState> {
1305 Box::new(move |state| {
1306 let handler = handler.clone();
1307 let config = config.clone();
1308 Box::pin(async move {
1309 let mut last_err: Option<FlowgentraError> = None;
1310
1311 for attempt in 0..=config.max_retries {
1312 match handler(state.clone()).await {
1313 Ok(new_state) => return Ok(new_state),
1314 Err(e) => {
1315 tracing::warn!(
1316 "Retry '{}' attempt {}/{} failed: {}",
1317 config.name,
1318 attempt + 1,
1319 config.max_retries,
1320 e
1321 );
1322 last_err = Some(e);
1323
1324 if attempt < config.max_retries {
1325 let backoff = if attempt == 0 {
1326 config.backoff_ms
1327 } else {
1328 let exp = config.backoff_ms as f64
1329 * (config.backoff_multiplier as f64).powi(attempt as i32);
1330 (exp as u64).min(config.max_backoff_ms)
1331 };
1332 tokio::time::sleep(std::time::Duration::from_millis(backoff)).await;
1333 }
1334 }
1335 }
1336 }
1337
1338 Err(last_err.unwrap_or_else(|| {
1339 FlowgentraError::RuntimeError(format!(
1340 "Retry '{}' exhausted {} attempts",
1341 config.name, config.max_retries
1342 ))
1343 }))
1344 })
1345 })
1346}
1347
1348// ── Timeout ────────────────────────────────────────────────────────────────
1349/// Wraps a handler with a wall-clock timeout.
1350/// On timeout, behaviour is controlled by `on_timeout`:
1351/// - `"error"` (default) → return an error
1352/// - `"skip"` → return state unchanged
1353/// - `"default_value"` → set `_timeout_default` in state and return ok
1354///
1355/// YAML:
1356/// ```yaml
1357/// - name: slow_op
1358/// type: timeout
1359/// handler: slow_handler
1360/// config:
1361/// timeout_ms: 5000
1362/// on_timeout: "error" # or "skip" or "default_value"
1363/// ```
1364fn wrap_handler_with_timeout(
1365 handler: ArcHandler<SharedState>,
1366 config: crate::core::node::nodes_trait::TimeoutNodeConfig,
1367) -> Handler<SharedState> {
1368 Box::new(move |state| {
1369 let handler = handler.clone();
1370 let config = config.clone();
1371 Box::pin(async move {
1372 let duration = std::time::Duration::from_millis(config.timeout_ms);
1373 match tokio::time::timeout(duration, handler(state.clone())).await {
1374 Ok(Ok(new_state)) => Ok(new_state),
1375 Ok(Err(e)) => Err(e),
1376 Err(_elapsed) => match config.on_timeout.as_str() {
1377 "skip" => Ok(state),
1378 "default_value" => {
1379 if let Some(default) = &config.default_value {
1380 state.set("_timeout_default", default.clone());
1381 }
1382 Ok(state)
1383 }
1384 _ => Err(FlowgentraError::ExecutionTimeout(format!(
1385 "Node '{}' timed out after {}ms",
1386 config.name, config.timeout_ms
1387 ))),
1388 },
1389 }
1390 })
1391 })
1392}
1393
1394// ── Loop ───────────────────────────────────────────────────────────────────
1395/// Wraps a handler in a fixed-iteration loop.
1396/// Exits early when the state field named `break_condition` is true.
1397///
1398/// YAML:
1399/// ```yaml
1400/// - name: iterate_process
1401/// type: loop
1402/// handler: process_step
1403/// config:
1404/// max_iterations: 5
1405/// break_condition: "is_done" # name of a bool state field
1406/// ```
1407fn wrap_handler_with_loop(
1408 handler: ArcHandler<SharedState>,
1409 config: crate::core::node::advanced_nodes::LoopNodeConfig,
1410) -> Handler<SharedState> {
1411 Box::new(move |state| {
1412 let handler = handler.clone();
1413 let config = config.clone();
1414 Box::pin(async move {
1415 let mut state = state;
1416
1417 for iteration in 0..config.max_iterations {
1418 tracing::info!(
1419 "Loop '{}' iteration {}/{}",
1420 config.handler,
1421 iteration + 1,
1422 config.max_iterations
1423 );
1424 state = handler(state).await?;
1425
1426 if let Some(ref cond) = config.break_condition {
1427 if state.get(cond).and_then(|v| v.as_bool()).unwrap_or(false) {
1428 tracing::info!(
1429 "Loop '{}' break condition '{}' met at iteration {}",
1430 config.handler,
1431 cond,
1432 iteration + 1
1433 );
1434 break;
1435 }
1436 }
1437 }
1438
1439 Ok(state)
1440 })
1441 })
1442}
1443
1444// ── Human-in-the-Loop ─────────────────────────────────────────────────────
1445/// Simulates a human approval checkpoint.
1446/// Sets `_human_approved = true` and `_human_node` in state.
1447/// (In production, replace with a real interactive or webhook mechanism.)
1448///
1449/// YAML:
1450/// ```yaml
1451/// - name: approval
1452/// type: human_in_the_loop
1453/// config:
1454/// prompt: "Please approve this action"
1455/// require_approval: true
1456/// editable_fields: ["amount", "recipient"]
1457/// ```
1458fn create_human_in_loop_handler(
1459 config: crate::core::node::nodes_trait::HumanInTheLoopConfig,
1460) -> Handler<SharedState> {
1461 Box::new(move |state| {
1462 let config = config.clone();
1463 Box::pin(async move {
1464 tracing::info!("Human-in-the-loop '{}': {}", config.name, config.prompt);
1465 state.set("_human_approved", serde_json::json!(true));
1466 state.set("_human_node", serde_json::json!(config.name));
1467 if !config.editable_fields.is_empty() {
1468 state.set(
1469 "_human_editable_fields",
1470 serde_json::json!(config.editable_fields),
1471 );
1472 }
1473 Ok(state)
1474 })
1475 })
1476}
1477
1478// ── Memory ─────────────────────────────────────────────────────────────────
1479/// Creates a memory operation handler by operation name.
1480///
1481/// YAML:
1482/// ```yaml
1483/// - name: append_msg
1484/// type: memory
1485/// config:
1486/// operation: append_message # or compress_history, clear_history,
1487/// # get_message_count, format_history_for_context
1488/// ```
1489fn create_memory_handler(operation: &str) -> Option<Handler<SharedState>> {
1490 use crate::core::node::memory_handlers;
1491 match operation {
1492 "append_message" => Some(Box::new(|state| {
1493 Box::pin(memory_handlers::append_message_handler(state))
1494 })),
1495 "compress_history" => Some(Box::new(|state| {
1496 Box::pin(memory_handlers::compress_history_handler(state))
1497 })),
1498 "clear_history" => Some(Box::new(|state| {
1499 Box::pin(memory_handlers::clear_history_handler(state))
1500 })),
1501 "get_message_count" => Some(Box::new(|state| {
1502 Box::pin(memory_handlers::get_message_count_handler(state))
1503 })),
1504 "format_history_for_context" => Some(Box::new(|state| {
1505 Box::pin(memory_handlers::format_history_for_context_handler(state))
1506 })),
1507 _ => None,
1508 }
1509}
1510
1511// =============================================================================
1512// Standalone Built-in Node Handlers
1513//
1514// These create self-contained handlers that manage state flags so the caller
1515// can route the graph based on those flags (e.g. with conditional back-edges).
1516// No user handler is required — they are fully implemented by the library.
1517// =============================================================================
1518
1519// ── Standalone Evaluation ─────────────────────────────────────────────────────
1520/// Reads the configured `field_state` from the current state, scores it with
1521/// the built-in heuristic scorer, and writes evaluation metadata to state.
1522///
1523/// State keys written:
1524/// - `__eval_score__<name>` – current numeric score (0.0–1.0)
1525/// - `__eval_feedback__<name>` – textual feedback for the next handler
1526/// - `__eval_needs_retry__<name>` – bool, true when score < min_confidence
1527/// AND attempt < max_retries
1528/// - `__eval_attempt__<name>` – current attempt counter (starts at 1)
1529/// - `__eval_meta__<name>` – full JSON object with all metadata
1530///
1531/// Pair with a conditional back-edge on `__eval_needs_retry__<name>` to retry.
1532///
1533/// YAML:
1534/// ```yaml
1535/// - name: score_output
1536/// type: evaluation
1537/// config:
1538/// field_state: llm_output
1539/// min_confidence: 0.80
1540/// max_retries: 3
1541/// rubric: "Is the output clear and accurate?"
1542/// ```
1543/// Delegates to `EvaluationNodeConfig::into_standalone_node_fn` — single source of truth.
1544fn create_evaluation_standalone_handler(
1545 config: crate::core::node::evaluation_node::EvaluationNodeConfig,
1546) -> Handler<SharedState> {
1547 config.into_standalone_node_fn()
1548}
1549
1550// ── Standalone Retry ──────────────────────────────────────────────────────────
1551/// Manages a retry counter and computes exponential backoff in state.
1552/// Does NOT call any user handler — use this as a gate node in your graph.
1553///
1554/// State keys written:
1555/// - `__retry_count__<name>` – current attempt number (starts at 1)
1556/// - `__retry_should_retry__<name>` – bool, true while count < max_retries
1557/// - `__retry_meta__<name>` – JSON with attempt / backoff / should_retry
1558///
1559/// Pair with a conditional back-edge on `__retry_should_retry__<name>`.
1560///
1561/// YAML:
1562/// ```yaml
1563/// - name: retry_gate
1564/// type: retry
1565/// config:
1566/// max_retries: 3
1567/// backoff_ms: 500
1568/// backoff_multiplier: 2.0
1569/// max_backoff_ms: 10000
1570/// ```
1571fn create_retry_standalone_handler(
1572 config: crate::core::node::nodes_trait::RetryNodeConfig,
1573) -> Handler<SharedState> {
1574 use serde_json::json;
1575
1576 Box::new(move |state| {
1577 let config = config.clone();
1578 Box::pin(async move {
1579 let count_key = format!("__retry_count__{}", config.name);
1580 let should_retry_key = format!("__retry_should_retry__{}", config.name);
1581
1582 let current = state.get(&count_key).and_then(|v| v.as_u64()).unwrap_or(0) as usize;
1583
1584 let next = current + 1;
1585 let should_retry = current < config.max_retries;
1586
1587 // Exponential backoff for the current attempt
1588 let backoff_ms = if current == 0 {
1589 config.backoff_ms
1590 } else {
1591 let exp = config.backoff_ms as f64
1592 * (config.backoff_multiplier as f64).powi(current as i32);
1593 (exp as u64).min(config.max_backoff_ms)
1594 };
1595
1596 state.set(&count_key, json!(next));
1597 state.set(&should_retry_key, json!(should_retry));
1598 state.set(
1599 format!("__retry_meta__{}", config.name),
1600 json!({
1601 "attempt": next,
1602 "max_retries": config.max_retries,
1603 "should_retry": should_retry,
1604 "backoff_ms": backoff_ms,
1605 }),
1606 );
1607
1608 if should_retry {
1609 tracing::info!(
1610 "Standalone retry '{}' attempt {}/{}, sleeping {}ms",
1611 config.name,
1612 next,
1613 config.max_retries,
1614 backoff_ms
1615 );
1616 tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
1617 } else {
1618 tracing::info!(
1619 "Standalone retry '{}' exhausted {} attempts — no more retries",
1620 config.name,
1621 config.max_retries
1622 );
1623 }
1624
1625 Ok(state)
1626 })
1627 })
1628}
1629
1630// ── Standalone Timeout ────────────────────────────────────────────────────────
1631/// Tracks a wall-clock deadline in state.
1632/// On first visit, records `now + timeout_ms` as the deadline.
1633/// On each subsequent visit, checks whether the deadline has passed.
1634///
1635/// State keys written:
1636/// - `__timeout_deadline__<name>` – UNIX-ms deadline (set once on first visit)
1637/// - `__timeout_timed_out__<name>` – bool, true once deadline passes
1638/// - `__timeout_meta__<name>` – JSON with deadline / elapsed info
1639///
1640/// On timeout, behaviour mirrors the wrapping variant:
1641/// - `on_timeout: "error"` → returns an error
1642/// - `on_timeout: "skip"` → continues silently
1643/// - `on_timeout: "default_value"` → writes `_timeout_default` to state
1644///
1645/// YAML:
1646/// ```yaml
1647/// - name: check_deadline
1648/// type: timeout
1649/// config:
1650/// timeout_ms: 30000
1651/// on_timeout: "skip"
1652/// ```
1653fn create_timeout_standalone_handler(
1654 config: crate::core::node::nodes_trait::TimeoutNodeConfig,
1655) -> Handler<SharedState> {
1656 use serde_json::json;
1657
1658 Box::new(move |state| {
1659 let config = config.clone();
1660 Box::pin(async move {
1661 let deadline_key = format!("__timeout_deadline__{}", config.name);
1662 let timed_out_key = format!("__timeout_timed_out__{}", config.name);
1663
1664 let now_ms = std::time::SystemTime::now()
1665 .duration_since(std::time::UNIX_EPOCH)
1666 .unwrap_or_default()
1667 .as_millis() as u64;
1668
1669 let deadline_ms = match state.get(&deadline_key).and_then(|v| v.as_u64()) {
1670 Some(d) => d,
1671 None => {
1672 // First visit: record the deadline, not yet timed out
1673 let d = now_ms + config.timeout_ms;
1674 state.set(&deadline_key, json!(d));
1675 state.set(&timed_out_key, json!(false));
1676 state.set(
1677 format!("__timeout_meta__{}", config.name),
1678 json!({ "timed_out": false, "deadline_ms": d, "current_ms": now_ms }),
1679 );
1680 return Ok(state);
1681 }
1682 };
1683
1684 let timed_out = now_ms > deadline_ms;
1685 state.set(&timed_out_key, json!(timed_out));
1686 state.set(
1687 format!("__timeout_meta__{}", config.name),
1688 json!({
1689 "timed_out": timed_out,
1690 "timeout_ms": config.timeout_ms,
1691 "deadline_ms": deadline_ms,
1692 "current_ms": now_ms,
1693 "elapsed_ms": now_ms.saturating_sub(deadline_ms - config.timeout_ms),
1694 }),
1695 );
1696
1697 if timed_out {
1698 tracing::warn!(
1699 "Standalone timeout '{}' expired (deadline={}ms, now={}ms)",
1700 config.name,
1701 deadline_ms,
1702 now_ms
1703 );
1704 match config.on_timeout.as_str() {
1705 "error" => {
1706 return Err(FlowgentraError::ExecutionTimeout(format!(
1707 "Node '{}' timed out after {}ms",
1708 config.name, config.timeout_ms
1709 )))
1710 }
1711 "default_value" => {
1712 if let Some(default) = &config.default_value {
1713 state.set("_timeout_default", default.clone());
1714 }
1715 }
1716 _ => {} // "skip" — continue silently
1717 }
1718 }
1719
1720 Ok(state)
1721 })
1722 })
1723}
1724
// ── Standalone Loop ───────────────────────────────────────────────────────────
// NOTE(review): these notes describe `create_loop_standalone_handler`, which is
// not defined immediately below. Written as `//` comments (not `///`) so that
// rustdoc does not attach them to the next item — plain comments do not break
// doc-comment attachment, so as `///` lines they would have documented
// `create_supervisor_handler` instead.
//
// Manages an iteration counter in state.
// Use this as a loop-gate node with a conditional back-edge on
// `__loop_continue__<name>` to build explicit loop subgraphs.
//
// State keys written:
// - `__loop_iteration__<name>` – current iteration (starts at 1)
// - `__loop_continue__<name>` – bool, true while within max_iterations
//   AND break_condition is not set in state
// - `__loop_meta__<name>` – JSON with full iteration metadata
//
// YAML:
// ```yaml
// - name: loop_gate
//   type: loop
//   config:
//     max_iterations: 5
//     break_condition: "is_done" # optional bool state field
// ```
1744// ── Supervisor ────────────────────────────────────────────────────────────────
1745/// Coordinates multiple child handlers (or subgraphs) as a single node.
1746///
1747/// This is the canonical multi-agent orchestration pattern:
1748/// a Supervisor delegates to sub-agents, manages their execution, and aggregates results.
1749///
1750/// Sequential: calls children one after another; state flows through each.
1751/// Parallel: calls all children concurrently; merges results per `merge_strategy`.
1752///
1753/// State written:
1754/// - `__supervisor_meta__<name>` — per-child results, errors, timing
1755///
1756/// YAML:
1757/// ```yaml
1758/// - name: research_coordinator
1759/// type: supervisor # alias: orchestrator
1760/// config:
1761/// strategy: sequential # or "parallel"
1762/// children: [research_agent, writer_agent, critic_agent]
1763/// fail_fast: true
1764/// child_timeout_ms: 30000
1765/// collect_stats: true
1766/// merge_strategy: latest # parallel only — or "first_success", "deep_merge"
1767/// parallel_aggregation: all # or "majority", "first_success"
1768/// ```
1769fn create_supervisor_handler(
1770 config: crate::core::node::orchestrator_node::SupervisorNodeConfig,
1771 children: Vec<(String, ArcHandler<SharedState>)>,
1772 child_mcps: HashMap<String, Vec<String>>,
1773) -> Handler<SharedState> {
1774 create_supervisor_handler_with_llm(config, children, None, child_mcps)
1775}
1776
1777fn create_supervisor_handler_with_llm(
1778 config: crate::core::node::orchestrator_node::SupervisorNodeConfig,
1779 children: Vec<(String, ArcHandler<SharedState>)>,
1780 llm_client: Option<Arc<dyn LLMClient>>,
1781 child_mcps: HashMap<String, Vec<String>>,
1782) -> Handler<SharedState> {
1783 use crate::core::node::orchestrator_node::{OrchestrationStrategy, ParallelMergeStrategy};
1784 use serde_json::json;
1785
1786 // Evaluate a skip condition expression against state.
1787 // Supported forms:
1788 // "key" — skip if state[key] is non-null and non-false
1789 // "key != null" — skip if state[key] is non-null
1790 // "key == null" — skip if state[key] is null/absent
1791 // "key == value" — skip if state[key].to_string() == value
1792 fn should_skip(condition: &str, state: &SharedState) -> bool {
1793 let condition = condition.trim();
1794 if condition.contains("!=") {
1795 let parts: Vec<&str> = condition.splitn(2, "!=").collect();
1796 let key = parts[0].trim();
1797 let rhs = parts[1].trim();
1798 let val = state.get(key);
1799 if rhs == "null" {
1800 return val.is_some() && !val.unwrap().is_null();
1801 }
1802 return val
1803 .map(|v| v.to_string().trim_matches('"') != rhs)
1804 .unwrap_or(false);
1805 }
1806 if condition.contains("==") {
1807 let parts: Vec<&str> = condition.splitn(2, "==").collect();
1808 let key = parts[0].trim();
1809 let rhs = parts[1].trim();
1810 let val = state.get(key);
1811 if rhs == "null" {
1812 return val.is_none() || val.unwrap().is_null();
1813 }
1814 return val
1815 .map(|v| v.to_string().trim_matches('"') == rhs)
1816 .unwrap_or(false);
1817 }
1818 // bare key: skip if truthy
1819 state
1820 .get(condition)
1821 .map(|v| !v.is_null() && v.as_bool() != Some(false))
1822 .unwrap_or(false)
1823 }
1824
1825 // Run a single child with optional timeout, returning (result, duration_ms).
1826 async fn run_child(
1827 name: &str,
1828 handler: &ArcHandler<SharedState>,
1829 state: SharedState,
1830 timeout_ms: Option<u64>,
1831 ) -> (crate::core::error::Result<SharedState>, u128) {
1832 let child_start = std::time::Instant::now();
1833 let result = if let Some(ms) = timeout_ms {
1834 tokio::time::timeout(std::time::Duration::from_millis(ms), handler(state))
1835 .await
1836 .unwrap_or_else(|_| {
1837 Err(FlowgentraError::ExecutionTimeout(format!(
1838 "Child '{}' timed out after {}ms",
1839 name, ms
1840 )))
1841 })
1842 } else {
1843 handler(state).await
1844 };
1845 (result, child_start.elapsed().as_millis())
1846 }
1847
1848 Box::new(move |state| {
1849 let config = config.clone();
1850 let children = children.clone();
1851 let llm_client = llm_client.clone();
1852 let child_mcps = child_mcps.clone();
1853 Box::pin(async move {
1854 let start = std::time::Instant::now();
1855
1856 // Inject per-node MCP assignments into state before running a child
1857 let inject_mcps = |child_name: &str, state: &SharedState| {
1858 if let Some(mcps) = child_mcps.get(child_name) {
1859 state.set("_node_mcps", serde_json::json!(mcps));
1860 } else {
1861 state.remove("_node_mcps");
1862 }
1863 };
1864
1865 match &config.strategy {
1866 // ── Sequential ──────────────────────────────────────────────
1867 OrchestrationStrategy::Sequential => {
1868 let mut current = state;
1869 let mut child_results = Vec::new();
1870 let mut errors: Vec<String> = Vec::new();
1871
1872 for (name, handler) in &children {
1873 // ── Skip condition ──────────────────────────────────
1874 if let Some(cond) = config.skip_conditions.get(name.as_str()) {
1875 if should_skip(cond, &current) {
1876 tracing::info!(
1877 "Supervisor '{}': skipping '{}' (condition: {cond})",
1878 config.name,
1879 name
1880 );
1881 child_results.push(json!({
1882 "name": name, "skipped": true, "condition": cond
1883 }));
1884 continue;
1885 }
1886 }
1887
1888 // ── Run with retry ──────────────────────────────────
1889 let max_attempts = config.max_retries_per_child + 1;
1890 let mut last_err = String::new();
1891 let mut succeeded = false;
1892 let mut total_ms = 0u128;
1893
1894 inject_mcps(name, &current);
1895 for attempt in 1..=max_attempts {
1896 let (result, ms) =
1897 run_child(name, handler, current.clone(), config.child_timeout_ms)
1898 .await;
1899 total_ms += ms;
1900 match result {
1901 Ok(new_state) => {
1902 current = new_state;
1903 succeeded = true;
1904 if attempt > 1 {
1905 tracing::info!(
1906 "Supervisor '{}': child '{}' succeeded on attempt {attempt}",
1907 config.name, name
1908 );
1909 }
1910 break;
1911 }
1912 Err(e) => {
1913 last_err = e.to_string();
1914 if attempt < max_attempts {
1915 tracing::warn!(
1916 "Supervisor '{}': child '{}' failed (attempt {attempt}/{max_attempts}): {e}",
1917 config.name, name
1918 );
1919 }
1920 }
1921 }
1922 }
1923
1924 if succeeded {
1925 child_results.push(json!({
1926 "name": name,
1927 "success": true,
1928 "duration_ms": total_ms,
1929 "attempts": (total_ms > 0) as u8, // always 1+ if here
1930 }));
1931 } else {
1932 child_results.push(json!({
1933 "name": name,
1934 "success": false,
1935 "error": last_err,
1936 "duration_ms": total_ms,
1937 "attempts": max_attempts,
1938 }));
1939 errors.push(format!("'{}': {}", name, last_err));
1940 if config.fail_fast {
1941 tracing::warn!(
1942 "Supervisor '{}' fail_fast: stopped after '{}' failed after {max_attempts} attempt(s)",
1943 config.name, name
1944 );
1945 break;
1946 }
1947 }
1948 }
1949
1950 current.set(
1951 format!("__supervisor_meta__{}", config.name),
1952 json!({
1953 "strategy": "sequential",
1954 "children": child_results,
1955 "errors": errors,
1956 "success": errors.is_empty(),
1957 "duration_ms": start.elapsed().as_millis(),
1958 }),
1959 );
1960 tracing::info!(
1961 "Supervisor '{}' sequential done in {}ms, errors={}",
1962 config.name,
1963 start.elapsed().as_millis(),
1964 errors.len()
1965 );
1966 Ok(current)
1967 }
1968
1969 // ── Parallel ────────────────────────────────────────────────
1970 OrchestrationStrategy::Parallel => {
1971 let base_state = Arc::new(state);
1972 // Semaphore limits how many children run at the same time.
1973 // All futures are spawned immediately; only `max_concurrent` can hold a permit.
1974 let semaphore = config
1975 .max_concurrent
1976 .map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
1977
1978 // Filter children by skip conditions, cloning so futures own their data
1979 let active_children: Vec<(String, ArcHandler<SharedState>)> = children
1980 .iter()
1981 .filter(|(name, _)| {
1982 if let Some(cond) = config.skip_conditions.get(name.as_str()) {
1983 if should_skip(cond, &base_state) {
1984 tracing::info!(
1985 "Supervisor '{}': skipping '{}' in parallel (condition: {cond})",
1986 config.name, name
1987 );
1988 return false;
1989 }
1990 }
1991 true
1992 })
1993 .map(|(n, h)| (n.clone(), h.clone()))
1994 .collect();
1995
1996 let futures: Vec<_> = active_children
1997 .into_iter()
1998 .map(|(name, handler)| {
1999 let state_copy = base_state.deep_clone();
2000 inject_mcps(&name, &state_copy);
2001 let timeout_ms = config.child_timeout_ms;
2002 let max_attempts = config.max_retries_per_child + 1;
2003 let sem = semaphore.clone();
2004 async move {
2005 // Acquire semaphore permit — blocks until a slot is free
2006 let _permit = if let Some(ref sem) = sem {
2007 Some(sem.acquire().await.expect("semaphore closed"))
2008 } else {
2009 None
2010 };
2011
2012 let mut total_ms = 0u128;
2013 let mut last_err = String::new();
2014 for attempt in 1..=max_attempts {
2015 let (result, ms) = run_child(&name, &handler, state_copy.clone(), timeout_ms).await;
2016 total_ms += ms;
2017 match result {
2018 Ok(s) => {
2019 if attempt > 1 {
2020 tracing::info!(
2021 "Supervisor parallel: '{}' succeeded on attempt {attempt}",
2022 name
2023 );
2024 }
2025 return (name, Ok(s), total_ms, attempt);
2026 }
2027 Err(e) => {
2028 last_err = e.to_string();
2029 if attempt < max_attempts {
2030 tracing::warn!(
2031 "Supervisor parallel: '{}' failed attempt {attempt}/{max_attempts}: {e}",
2032 name
2033 );
2034 }
2035 }
2036 }
2037 }
2038 (name, Err(FlowgentraError::ExecutionError(last_err)), total_ms, max_attempts)
2039 }
2040 })
2041 .collect();
2042
2043 let results = futures::future::join_all(futures).await;
2044
2045 let mut successes: Vec<SharedState> = Vec::new();
2046 let mut child_results = Vec::new();
2047 let mut errors: Vec<String> = Vec::new();
2048
2049 for (name, result, duration_ms, attempts) in results {
2050 match result {
2051 Ok(s) => {
2052 child_results.push(json!({
2053 "name": name, "success": true,
2054 "duration_ms": duration_ms, "attempts": attempts,
2055 }));
2056 successes.push(s);
2057 }
2058 Err(e) => {
2059 child_results.push(json!({
2060 "name": name, "success": false,
2061 "error": e.to_string(),
2062 "duration_ms": duration_ms, "attempts": attempts,
2063 }));
2064 errors.push(format!("'{}': {}", name, e));
2065 }
2066 }
2067 }
2068
2069 // ── Merge ───────────────────────────────────────────────
2070 let final_state = match &config.merge_strategy {
2071 ParallelMergeStrategy::Latest => successes
2072 .into_iter()
2073 .last()
2074 .unwrap_or_else(|| (*base_state).clone()),
2075
2076 ParallelMergeStrategy::FirstSuccess => successes
2077 .into_iter()
2078 .next()
2079 .unwrap_or_else(|| (*base_state).clone()),
2080
2081 ParallelMergeStrategy::DeepMerge => {
2082 // Start from a fresh clone of base state, then overlay
2083 // only keys that each child actually changed.
2084 // Each child starts with a deep_clone of base_state, so we
2085 // compare each child's values against the base to detect
2086 // real modifications — avoiding null overwrites from
2087 // schema-initialized keys the child never touched.
2088 let base_snapshot: Vec<(String, serde_json::Value)> =
2089 base_state.iter_map();
2090 let merged = (*base_state).clone();
2091 for child_state in successes {
2092 for (key, value) in child_state.iter_map() {
2093 // Skip internal supervisor/planner metadata keys
2094 if key.starts_with("__supervisor_meta__")
2095 || key.starts_with("__eval_")
2096 || key.starts_with("_next_node")
2097 {
2098 continue;
2099 }
2100 // Only merge if the child actually changed this key
2101 let changed =
2102 match base_snapshot.iter().find(|(k, _)| k == &key) {
2103 Some((_, base_val)) => value != *base_val,
2104 None => true, // new key not in base — always merge
2105 };
2106 if changed {
2107 merged.set(key, value);
2108 }
2109 }
2110 }
2111 merged
2112 }
2113
2114 ParallelMergeStrategy::Custom(_) => successes
2115 .into_iter()
2116 .last()
2117 .unwrap_or_else(|| (*base_state).clone()),
2118 };
2119
2120 final_state.set(
2121 format!("__supervisor_meta__{}", config.name),
2122 json!({
2123 "strategy": "parallel",
2124 "children": child_results,
2125 "errors": errors,
2126 "success": errors.is_empty(),
2127 "duration_ms": start.elapsed().as_millis(),
2128 }),
2129 );
2130 tracing::info!(
2131 "Supervisor '{}' parallel done in {}ms, errors={}",
2132 config.name,
2133 start.elapsed().as_millis(),
2134 errors.len()
2135 );
2136 Ok(final_state)
2137 }
2138
2139 // ── Autonomous ──────────────────────────────────────────────
2140 // The supervisor loops, calling whichever agent owns each missing
2141 // required output, until all are present or max_iterations is hit.
2142 OrchestrationStrategy::Autonomous => {
2143 let mut current = state;
2144
2145 if let Some(goal) = &config.goal {
2146 tracing::info!(
2147 "Supervisor '{}' autonomous start, goal: {}",
2148 config.name,
2149 goal
2150 );
2151 }
2152
2153 // Build O(1) lookup: child name → handler
2154 let child_map: HashMap<String, ArcHandler<SharedState>> = children
2155 .iter()
2156 .map(|(n, h)| (n.clone(), h.clone()))
2157 .collect();
2158
2159 let max_iter = if config.max_iterations == 0 {
2160 10
2161 } else {
2162 config.max_iterations
2163 };
2164 let mut iteration_log = Vec::new();
2165
2166 'outer: for iteration in 1..=max_iter {
2167 // Which required outputs are still missing?
2168 let missing: Vec<String> = config
2169 .required_outputs
2170 .iter()
2171 .filter(|key| {
2172 current
2173 .get(key.as_str())
2174 .map(|v| v.is_null())
2175 .unwrap_or(true)
2176 })
2177 .cloned()
2178 .collect();
2179
2180 if missing.is_empty() {
2181 tracing::info!(
2182 "Supervisor '{}' autonomous: all outputs satisfied after {} iterations",
2183 config.name, iteration - 1
2184 );
2185 break;
2186 }
2187
2188 tracing::info!(
2189 "Supervisor '{}' autonomous iteration {}/{}: missing {:?}",
2190 config.name,
2191 iteration,
2192 max_iter,
2193 missing
2194 );
2195
2196 // Collect agents responsible for the missing outputs (deduplicated)
2197 let mut agents_to_call: Vec<String> = Vec::new();
2198 for key in &missing {
2199 if let Some(owner) = config.output_owners.get(key.as_str()) {
2200 if !agents_to_call.contains(owner) {
2201 agents_to_call.push(owner.clone());
2202 }
2203 }
2204 }
2205
2206 if agents_to_call.is_empty() {
2207 tracing::warn!(
2208 "Supervisor '{}' autonomous: no owners for missing {:?}, stopping",
2209 config.name,
2210 missing
2211 );
2212 break;
2213 }
2214
2215 let mut iter_child_results = Vec::new();
2216 for agent_name in &agents_to_call {
2217 if let Some(handler) = child_map.get(agent_name.as_str()) {
2218 inject_mcps(agent_name, &current);
2219 let max_attempts = config.max_retries_per_child + 1;
2220 let mut succeeded = false;
2221 let mut total_ms = 0u128;
2222 let mut last_err = String::new();
2223
2224 for attempt in 1..=max_attempts {
2225 let (result, ms) = run_child(
2226 agent_name,
2227 handler,
2228 current.clone(),
2229 config.child_timeout_ms,
2230 )
2231 .await;
2232 total_ms += ms;
2233 match result {
2234 Ok(new_state) => {
2235 current = new_state;
2236 succeeded = true;
2237 break;
2238 }
2239 Err(e) => {
2240 last_err = e.to_string();
2241 if attempt < max_attempts {
2242 tracing::warn!(
2243 "Supervisor '{}' autonomous: '{}' failed attempt {}/{}: {}",
2244 config.name, agent_name, attempt, max_attempts, e
2245 );
2246 }
2247 }
2248 }
2249 }
2250
2251 iter_child_results.push(json!({
2252 "name": agent_name,
2253 "success": succeeded,
2254 "duration_ms": total_ms,
2255 "error": if succeeded { serde_json::Value::Null } else { json!(last_err) },
2256 }));
2257
2258 if !succeeded && config.fail_fast {
2259 break 'outer;
2260 }
2261 } else {
2262 tracing::warn!(
2263 "Supervisor '{}' autonomous: agent '{}' not found",
2264 config.name,
2265 agent_name
2266 );
2267 }
2268 }
2269
2270 iteration_log.push(json!({
2271 "iteration": iteration,
2272 "missing_before": missing,
2273 "agents_called": agents_to_call,
2274 "results": iter_child_results,
2275 }));
2276 }
2277
2278 // Final completeness check
2279 let final_missing: Vec<String> = config
2280 .required_outputs
2281 .iter()
2282 .filter(|key| {
2283 current
2284 .get(key.as_str())
2285 .map(|v| v.is_null())
2286 .unwrap_or(true)
2287 })
2288 .cloned()
2289 .collect();
2290
2291 let success = final_missing.is_empty();
2292 current.set(
2293 format!("__supervisor_meta__{}", config.name),
2294 json!({
2295 "strategy": "autonomous",
2296 "goal": config.goal,
2297 "required_outputs": config.required_outputs,
2298 "iterations": iteration_log,
2299 "missing_outputs": final_missing,
2300 "success": success,
2301 "duration_ms": start.elapsed().as_millis(),
2302 }),
2303 );
2304 tracing::info!(
2305 "Supervisor '{}' autonomous done in {}ms, success={}",
2306 config.name,
2307 start.elapsed().as_millis(),
2308 success
2309 );
2310 Ok(current)
2311 }
2312
2313 // ── Dynamic (LLM-driven) ────────────────────────────────────
2314 // The supervisor asks an LLM which agents to call, in what order.
2315 OrchestrationStrategy::Dynamic => {
2316 let mut current = state;
2317
2318 let child_map: HashMap<String, ArcHandler<SharedState>> = children
2319 .iter()
2320 .map(|(n, h)| (n.clone(), h.clone()))
2321 .collect();
2322
2323 let child_names: Vec<String> =
2324 children.iter().map(|(n, _)| n.clone()).collect();
2325 let max_iter = if config.max_iterations == 0 {
2326 10
2327 } else {
2328 config.max_iterations
2329 };
2330 let mut iteration_log = Vec::new();
2331
2332 let llm = llm_client.clone();
2333
2334 'dynamic_outer: for iteration in 1..=max_iter {
2335 println!("\n ── Dynamic iteration {iteration}/{max_iter} ──");
2336
2337 // Build a filtered view: separate populated vs null keys, hide internals
2338 let all_keys: Vec<String> = current.keys().collect();
2339 let populated_keys: Vec<&String> = all_keys
2340 .iter()
2341 .filter(|k| !k.starts_with('_'))
2342 .filter(|k| {
2343 current
2344 .get(k.as_str())
2345 .map(|v| !v.is_null())
2346 .unwrap_or(false)
2347 })
2348 .collect();
2349 let null_keys: Vec<&String> = all_keys
2350 .iter()
2351 .filter(|k| !k.starts_with('_'))
2352 .filter(|k| {
2353 current.get(k.as_str()).map(|v| v.is_null()).unwrap_or(true)
2354 })
2355 .collect();
2356
2357 println!(" [dynamic] Completed: {populated_keys:?}");
2358 println!(" [dynamic] Missing: {null_keys:?}");
2359
2360 // Ask LLM which agents to call
2361 let agents_to_call = if let Some(ref llm) = llm {
2362 let system_prompt = config.selector_prompt.as_deref().unwrap_or(
2363 "You pick which agents to run. Reply with ONLY a JSON array of agent name strings. \
2364 Example: [\"agent_a\", \"agent_b\"]. If no agents needed, reply []."
2365 );
2366 let user_prompt = if null_keys.is_empty() {
2367 "All outputs are complete. Reply with [].".to_string()
2368 } else {
2369 format!(
2370 "Goal: {}\n\
2371 Agents: {:?}\n\
2372 Done: {:?}\n\
2373 Still needed: {:?}\n\
2374 Pick the agents to run next. Reply with ONLY a JSON array.",
2375 config.goal.as_deref().unwrap_or("Complete the task"),
2376 child_names,
2377 populated_keys,
2378 null_keys,
2379 )
2380 };
2381
2382 println!(" [dynamic] Asking LLM...");
2383
2384 let messages = vec![
2385 crate::core::llm::Message::system(system_prompt),
2386 crate::core::llm::Message::user(user_prompt),
2387 ];
2388
2389 match llm.chat(messages).await {
2390 Ok(response) => {
2391 println!(
2392 " [dynamic] LLM raw response: {:?}",
2393 response.content
2394 );
2395 let content = response.content.trim();
2396 // Try to extract JSON array from the response
2397 let json_str = if let Some(start) = content.find('[') {
2398 if let Some(end) = content.rfind(']') {
2399 &content[start..=end]
2400 } else {
2401 content
2402 }
2403 } else {
2404 content
2405 };
2406 let parsed = serde_json::from_str::<Vec<String>>(json_str)
2407 .unwrap_or_default();
2408 if parsed.is_empty() && !content.contains("[]") {
2409 println!(" [dynamic] WARNING: could not parse LLM response as JSON array");
2410 }
2411 parsed
2412 }
2413 Err(e) => {
2414 println!(" [dynamic] LLM error: {e}");
2415 child_names.clone()
2416 }
2417 }
2418 } else {
2419 // No LLM — fall back to output_owners
2420 let missing: Vec<String> = config
2421 .required_outputs
2422 .iter()
2423 .filter(|key| {
2424 current
2425 .get(key.as_str())
2426 .map(|v| v.is_null())
2427 .unwrap_or(true)
2428 })
2429 .cloned()
2430 .collect();
2431
2432 if missing.is_empty() {
2433 break;
2434 }
2435
2436 let mut agents = Vec::new();
2437 for key in &missing {
2438 if let Some(owner) = config.output_owners.get(key.as_str()) {
2439 if !agents.contains(owner) {
2440 agents.push(owner.clone());
2441 }
2442 }
2443 }
2444 if agents.is_empty() {
2445 break;
2446 }
2447 agents
2448 };
2449
2450 // If LLM returned empty but outputs still missing, use fallback
2451 if agents_to_call.is_empty() {
2452 let still_missing: Vec<String> = config
2453 .required_outputs
2454 .iter()
2455 .filter(|key| {
2456 current
2457 .get(key.as_str())
2458 .map(|v| v.is_null())
2459 .unwrap_or(true)
2460 })
2461 .cloned()
2462 .collect();
2463
2464 if still_missing.is_empty() {
2465 println!(" [dynamic] All required outputs present → done");
2466 break;
2467 }
2468
2469 println!(" [dynamic] LLM returned [] but still missing: {still_missing:?}, using fallback");
2470 let mut fallback = Vec::new();
2471 for key in &still_missing {
2472 if let Some(owner) = config.output_owners.get(key.as_str()) {
2473 if !fallback.contains(owner)
2474 && child_map.contains_key(owner.as_str())
2475 {
2476 fallback.push(owner.clone());
2477 }
2478 }
2479 }
2480 if fallback.is_empty() {
2481 println!(" [dynamic] No fallback agents, stopping");
2482 break;
2483 }
2484
2485 // Run fallback agents
2486 let mut iter_results = Vec::new();
2487 for agent_name in &fallback {
2488 if let Some(handler) = child_map.get(agent_name.as_str()) {
2489 inject_mcps(agent_name, &current);
2490 println!(" [dynamic] Running {agent_name}...");
2491 let (result, ms) = run_child(
2492 agent_name,
2493 handler,
2494 current.clone(),
2495 config.child_timeout_ms,
2496 )
2497 .await;
2498 match result {
2499 Ok(new_state) => {
2500 current = new_state;
2501 println!(" [dynamic] {agent_name} ✓ ({ms}ms)");
2502 iter_results.push(json!({
2503 "name": agent_name, "success": true, "duration_ms": ms,
2504 }));
2505 }
2506 Err(e) => {
2507 println!(" [dynamic] {agent_name} ✗ ({ms}ms): {e}");
2508 iter_results.push(json!({
2509 "name": agent_name, "success": false,
2510 "error": e.to_string(), "duration_ms": ms,
2511 }));
2512 if config.fail_fast {
2513 break 'dynamic_outer;
2514 }
2515 }
2516 }
2517 }
2518 }
2519 iteration_log.push(json!({
2520 "iteration": iteration,
2521 "agents_called": fallback,
2522 "results": iter_results,
2523 }));
2524 continue;
2525 }
2526
2527 println!(" [dynamic] Calling: {agents_to_call:?}");
2528
2529 let mut iter_results = Vec::new();
2530 for agent_name in &agents_to_call {
2531 if let Some(handler) = child_map.get(agent_name.as_str()) {
2532 inject_mcps(agent_name, &current);
2533 println!(" [dynamic] Running {agent_name}...");
2534 let (result, ms) = run_child(
2535 agent_name,
2536 handler,
2537 current.clone(),
2538 config.child_timeout_ms,
2539 )
2540 .await;
2541 match result {
2542 Ok(new_state) => {
2543 current = new_state;
2544 println!(" [dynamic] {agent_name} ✓ ({ms}ms)");
2545 iter_results.push(json!({
2546 "name": agent_name, "success": true, "duration_ms": ms,
2547 }));
2548 }
2549 Err(e) => {
2550 println!(" [dynamic] {agent_name} ✗ ({ms}ms): {e}");
2551 iter_results.push(json!({
2552 "name": agent_name, "success": false,
2553 "error": e.to_string(), "duration_ms": ms,
2554 }));
2555 if config.fail_fast {
2556 break 'dynamic_outer;
2557 }
2558 }
2559 }
2560 } else {
2561 println!(" [dynamic] WARNING: agent '{agent_name}' not found");
2562 }
2563 }
2564
2565 iteration_log.push(json!({
2566 "iteration": iteration,
2567 "agents_called": agents_to_call,
2568 "results": iter_results,
2569 }));
2570 }
2571
2572 current.set(
2573 format!("__supervisor_meta__{}", config.name),
2574 json!({
2575 "strategy": "dynamic",
2576 "goal": config.goal,
2577 "iterations": iteration_log,
2578 "duration_ms": start.elapsed().as_millis(),
2579 }),
2580 );
2581 Ok(current)
2582 }
2583
2584 // ── RoundRobin ─────────────────────────────────────────────────
2585 // Tasks from a state array are distributed across agents in rotation.
2586 OrchestrationStrategy::RoundRobin => {
2587 let tasks_key = config.tasks_key.as_deref().unwrap_or("tasks");
2588 let tasks: Vec<serde_json::Value> = state
2589 .get(tasks_key)
2590 .and_then(|v| v.as_array().cloned())
2591 .unwrap_or_default();
2592
2593 if tasks.is_empty() {
2594 tracing::warn!(
2595 "Supervisor '{}' round_robin: no tasks at key '{}'",
2596 config.name,
2597 tasks_key
2598 );
2599 state.set(
2600 format!("__supervisor_meta__{}", config.name),
2601 json!({"strategy": "round_robin", "tasks": 0, "success": true}),
2602 );
2603 return Ok(state);
2604 }
2605
2606 let mut current = state;
2607 let num_children = children.len();
2608 let mut all_results: Vec<serde_json::Value> = Vec::new();
2609 let mut child_results = Vec::new();
2610 let mut errors: Vec<String> = Vec::new();
2611
2612 for (i, task) in tasks.iter().enumerate() {
2613 let child_idx = i % num_children;
2614 let (name, handler) = &children[child_idx];
2615
2616 current.set("__current_task__".to_string(), task.clone());
2617 current.set("__task_index__".to_string(), json!(i));
2618 inject_mcps(name, &current);
2619
2620 let (result, ms) =
2621 run_child(name, handler, current.clone(), config.child_timeout_ms)
2622 .await;
2623 match result {
2624 Ok(new_state) => {
2625 if let Some(r) = new_state.get("__task_result__") {
2626 all_results.push(r);
2627 }
2628 current = new_state;
2629 child_results.push(json!({
2630 "task_index": i, "agent": name, "success": true, "duration_ms": ms,
2631 }));
2632 }
2633 Err(e) => {
2634 errors.push(format!("Task {}: {}", i, e));
2635 child_results.push(json!({
2636 "task_index": i, "agent": name, "success": false,
2637 "error": e.to_string(), "duration_ms": ms,
2638 }));
2639 if config.fail_fast {
2640 break;
2641 }
2642 }
2643 }
2644 }
2645
2646 current.set("__round_robin_results__".to_string(), json!(all_results));
2647 current.set(
2648 format!("__supervisor_meta__{}", config.name),
2649 json!({
2650 "strategy": "round_robin",
2651 "tasks": tasks.len(),
2652 "children": child_results,
2653 "errors": errors,
2654 "success": errors.is_empty(),
2655 "duration_ms": start.elapsed().as_millis(),
2656 }),
2657 );
2658 Ok(current)
2659 }
2660
2661 // ── Hierarchical ───────────────────────────────────────────────
2662 // Delegates to sub-supervisors. Each child is expected to be a
2663 // supervisor or subgraph managing its own agent group.
2664 OrchestrationStrategy::Hierarchical => {
2665 let mut current = state;
2666 let mut child_results = Vec::new();
2667 let mut errors: Vec<String> = Vec::new();
2668
2669 for (name, handler) in &children {
2670 tracing::info!(
2671 "Supervisor '{}' hierarchical: delegating to sub-supervisor '{}'",
2672 config.name,
2673 name
2674 );
2675
2676 // Skip condition
2677 if let Some(cond) = config.skip_conditions.get(name.as_str()) {
2678 if should_skip(cond, &current) {
2679 child_results.push(json!({"name": name, "skipped": true}));
2680 continue;
2681 }
2682 }
2683
2684 inject_mcps(name, &current);
2685 let max_attempts = config.max_retries_per_child + 1;
2686 let mut succeeded = false;
2687 let mut total_ms = 0u128;
2688 let mut last_err = String::new();
2689
2690 for attempt in 1..=max_attempts {
2691 let (result, ms) =
2692 run_child(name, handler, current.clone(), config.child_timeout_ms)
2693 .await;
2694 total_ms += ms;
2695 match result {
2696 Ok(new_state) => {
2697 current = new_state;
2698 succeeded = true;
2699 break;
2700 }
2701 Err(e) => {
2702 last_err = e.to_string();
2703 if attempt < max_attempts {
2704 tracing::warn!(
2705 "Supervisor '{}' hierarchical: sub-supervisor '{}' failed attempt {}/{}",
2706 config.name, name, attempt, max_attempts
2707 );
2708 }
2709 }
2710 }
2711 }
2712
2713 child_results.push(json!({
2714 "name": name, "success": succeeded,
2715 "duration_ms": total_ms,
2716 "error": if succeeded { serde_json::Value::Null } else { json!(last_err) },
2717 }));
2718
2719 if !succeeded {
2720 errors.push(format!("Sub-supervisor '{}': {}", name, last_err));
2721 if config.fail_fast {
2722 break;
2723 }
2724 }
2725 }
2726
2727 current.set(
2728 format!("__supervisor_meta__{}", config.name),
2729 json!({
2730 "strategy": "hierarchical",
2731 "children": child_results,
2732 "errors": errors,
2733 "success": errors.is_empty(),
2734 "duration_ms": start.elapsed().as_millis(),
2735 }),
2736 );
2737 Ok(current)
2738 }
2739
2740 // ── Broadcast (Fan-out) ────────────────────────────────────────
2741 // Same task sent to all agents; best result selected.
2742 // Each child gets a deep_clone so writes are isolated.
2743 OrchestrationStrategy::Broadcast => {
2744 let base_state = Arc::new(state);
2745 let semaphore = config
2746 .max_concurrent
2747 .map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
2748
2749 let futures: Vec<_> = children
2750 .iter()
2751 .map(|(name, handler)| {
2752 let state_copy = base_state.deep_clone();
2753 inject_mcps(name, &state_copy);
2754 let timeout_ms = config.child_timeout_ms;
2755 let sem = semaphore.clone();
2756 let name = name.clone();
2757 let handler = handler.clone();
2758 async move {
2759 let _permit = if let Some(ref sem) = sem {
2760 Some(sem.acquire().await.expect("semaphore closed"))
2761 } else {
2762 None
2763 };
2764 let (result, ms) =
2765 run_child(&name, &handler, state_copy, timeout_ms).await;
2766 (name, result, ms)
2767 }
2768 })
2769 .collect();
2770
2771 let results = futures::future::join_all(futures).await;
2772
2773 let score_key = config.score_key.as_deref().unwrap_or("__score__");
2774 let criteria = config
2775 .selection_criteria
2776 .as_deref()
2777 .unwrap_or("first_success");
2778
2779 let mut successes: Vec<(String, SharedState, u128)> = Vec::new();
2780 let mut child_results = Vec::new();
2781 let mut errors: Vec<String> = Vec::new();
2782
2783 for (name, result, ms) in results {
2784 match result {
2785 Ok(s) => {
2786 child_results.push(json!({
2787 "name": name, "success": true, "duration_ms": ms,
2788 }));
2789 successes.push((name, s, ms));
2790 }
2791 Err(e) => {
2792 child_results.push(json!({
2793 "name": name, "success": false,
2794 "error": e.to_string(), "duration_ms": ms,
2795 }));
2796 errors.push(format!("'{}': {}", name, e));
2797 }
2798 }
2799 }
2800
2801 if successes.is_empty() {
2802 let final_state = (*base_state).clone();
2803 final_state.set(
2804 format!("__supervisor_meta__{}", config.name),
2805 json!({
2806 "strategy": "broadcast",
2807 "children": child_results,
2808 "errors": errors,
2809 "success": false,
2810 "duration_ms": start.elapsed().as_millis(),
2811 }),
2812 );
2813 return Ok(final_state);
2814 }
2815
2816 let (winner_name, winner_state) = match criteria {
2817 "highest_score" => {
2818 let best = successes.into_iter().max_by(|(_, a, _), (_, b, _)| {
2819 let sa = a.get(score_key).and_then(|v| v.as_f64()).unwrap_or(0.0);
2820 let sb = b.get(score_key).and_then(|v| v.as_f64()).unwrap_or(0.0);
2821 sa.partial_cmp(&sb).unwrap_or(std::cmp::Ordering::Equal)
2822 });
2823 best.map(|(n, s, _)| (n, s))
2824 .unwrap_or_else(|| ("unknown".to_string(), (*base_state).clone()))
2825 }
2826 _ => {
2827 // first_success
2828 successes
2829 .into_iter()
2830 .next()
2831 .map(|(n, s, _)| (n, s))
2832 .unwrap_or_else(|| ("unknown".to_string(), (*base_state).clone()))
2833 }
2834 };
2835
2836 winner_state.set(
2837 format!("__supervisor_meta__{}", config.name),
2838 json!({
2839 "strategy": "broadcast",
2840 "winner": winner_name,
2841 "selection_criteria": criteria,
2842 "children": child_results,
2843 "success": true,
2844 "duration_ms": start.elapsed().as_millis(),
2845 }),
2846 );
2847 Ok(winner_state)
2848 }
2849
2850 // ── MapReduce ──────────────────────────────────────────────────
2851 // Input split into chunks, each processed by a child in parallel, then merged.
2852 OrchestrationStrategy::MapReduce => {
2853 let map_key = config.map_key.as_deref().unwrap_or("input_chunks");
2854 let reduce_key = config.reduce_key.as_deref().unwrap_or("reduced_output");
2855
2856 let chunks: Vec<serde_json::Value> = state
2857 .get(map_key)
2858 .and_then(|v| v.as_array().cloned())
2859 .unwrap_or_default();
2860
2861 if chunks.is_empty() {
2862 tracing::warn!(
2863 "Supervisor '{}' map_reduce: no data at key '{}'",
2864 config.name,
2865 map_key
2866 );
2867 state.set(
2868 format!("__supervisor_meta__{}", config.name),
2869 json!({"strategy": "map_reduce", "chunks": 0, "success": true}),
2870 );
2871 state.set(reduce_key.to_string(), json!([]));
2872 return Ok(state);
2873 }
2874
2875 let base_state = Arc::new(state);
2876 let num_children = children.len();
2877 let semaphore = config
2878 .max_concurrent
2879 .map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
2880
2881 // Map phase: distribute chunks across children
2882 let futures: Vec<_> = chunks
2883 .iter()
2884 .enumerate()
2885 .map(|(i, chunk)| {
2886 let child_idx = i % num_children;
2887 let (name, handler) = children[child_idx].clone();
2888 let chunk_state = base_state.deep_clone();
2889 chunk_state.set("__map_chunk__".to_string(), chunk.clone());
2890 chunk_state.set("__chunk_index__".to_string(), json!(i));
2891 let timeout_ms = config.child_timeout_ms;
2892 let sem = semaphore.clone();
2893 async move {
2894 let _permit = if let Some(ref sem) = sem {
2895 Some(sem.acquire().await.expect("semaphore closed"))
2896 } else {
2897 None
2898 };
2899 let (result, ms) =
2900 run_child(&name, &handler, chunk_state, timeout_ms).await;
2901 (i, name, result, ms)
2902 }
2903 })
2904 .collect();
2905
2906 let results = futures::future::join_all(futures).await;
2907
2908 // Reduce phase: collect results in order
2909 let mut reduced: Vec<serde_json::Value> = vec![json!(null); chunks.len()];
2910 let mut child_results = Vec::new();
2911 let mut errors: Vec<String> = Vec::new();
2912
2913 for (i, name, result, ms) in results {
2914 match result {
2915 Ok(s) => {
2916 let chunk_result = s.get("__map_result__").unwrap_or(json!(null));
2917 reduced[i] = chunk_result;
2918 child_results.push(json!({
2919 "chunk_index": i, "agent": name, "success": true, "duration_ms": ms,
2920 }));
2921 }
2922 Err(e) => {
2923 errors.push(format!("Chunk {}: {}", i, e));
2924 child_results.push(json!({
2925 "chunk_index": i, "agent": name, "success": false,
2926 "error": e.to_string(), "duration_ms": ms,
2927 }));
2928 }
2929 }
2930 }
2931
2932 let final_state = (*base_state).clone();
2933 final_state.set(reduce_key.to_string(), json!(reduced));
2934 final_state.set(
2935 format!("__supervisor_meta__{}", config.name),
2936 json!({
2937 "strategy": "map_reduce",
2938 "chunks": chunks.len(),
2939 "children": child_results,
2940 "errors": errors,
2941 "success": errors.is_empty(),
2942 "duration_ms": start.elapsed().as_millis(),
2943 }),
2944 );
2945 Ok(final_state)
2946 }
2947
2948 // ── ConditionalRouting ─────────────────────────────────────────
2949 // Routes to the most appropriate agent based on state-driven rules.
2950 OrchestrationStrategy::ConditionalRouting => {
2951 let child_map: HashMap<String, ArcHandler<SharedState>> = children
2952 .iter()
2953 .map(|(n, h)| (n.clone(), h.clone()))
2954 .collect();
2955
2956 let mut selected_name: Option<String> = None;
2957
2958 for (condition, child_name) in &config.routing_rules {
2959 if should_skip(condition, &state) {
2960 // Condition is truthy → route to this agent
2961 if child_map.contains_key(child_name.as_str()) {
2962 selected_name = Some(child_name.clone());
2963 break;
2964 }
2965 }
2966 }
2967
2968 // Default to first child
2969 let agent_name = selected_name.unwrap_or_else(|| {
2970 let default = children.first().map(|(n, _)| n.clone()).unwrap_or_default();
2971 tracing::info!(
2972 "Supervisor '{}' conditional_routing: no rule matched, using default '{}'",
2973 config.name, default
2974 );
2975 default
2976 });
2977
2978 if let Some(handler) = child_map.get(&agent_name) {
2979 let (result, ms) =
2980 run_child(&agent_name, handler, state, config.child_timeout_ms).await;
2981 match result {
2982 Ok(final_state) => {
2983 final_state.set(
2984 format!("__supervisor_meta__{}", config.name),
2985 json!({
2986 "strategy": "conditional_routing",
2987 "routed_to": agent_name,
2988 "duration_ms": ms,
2989 "success": true,
2990 }),
2991 );
2992 Ok(final_state)
2993 }
2994 Err(e) => Err(e),
2995 }
2996 } else {
2997 Err(FlowgentraError::ConfigError(format!(
2998 "Supervisor '{}': conditional routing target '{}' not found",
2999 config.name, agent_name
3000 )))
3001 }
3002 }
3003
3004 // ── RetryFallback ──────────────────────────────────────────────
3005 // Agents tried in order until one succeeds.
3006 OrchestrationStrategy::RetryFallback => {
3007 let order: Vec<String> = if !config.fallback_order.is_empty() {
3008 config.fallback_order.clone()
3009 } else {
3010 children.iter().map(|(n, _)| n.clone()).collect()
3011 };
3012
3013 let child_map: HashMap<String, ArcHandler<SharedState>> = children
3014 .iter()
3015 .map(|(n, h)| (n.clone(), h.clone()))
3016 .collect();
3017
3018 let mut child_results = Vec::new();
3019 let mut last_error = String::new();
3020
3021 for agent_name in &order {
3022 if let Some(handler) = child_map.get(agent_name.as_str()) {
3023 let max_attempts = config.max_retries_per_child + 1;
3024 let mut total_ms = 0u128;
3025
3026 for attempt in 1..=max_attempts {
3027 let (result, ms) = run_child(
3028 agent_name,
3029 handler,
3030 state.clone(),
3031 config.child_timeout_ms,
3032 )
3033 .await;
3034 total_ms += ms;
3035
3036 match result {
3037 Ok(final_state) => {
3038 child_results.push(json!({
3039 "name": agent_name, "success": true,
3040 "duration_ms": total_ms, "attempts": attempt,
3041 }));
3042 final_state.set(
3043 format!("__supervisor_meta__{}", config.name),
3044 json!({
3045 "strategy": "retry_fallback",
3046 "succeeded_agent": agent_name,
3047 "children": child_results,
3048 "success": true,
3049 "duration_ms": start.elapsed().as_millis(),
3050 }),
3051 );
3052 tracing::info!(
3053 "Supervisor '{}' retry_fallback: '{}' succeeded",
3054 config.name,
3055 agent_name
3056 );
3057 return Ok(final_state);
3058 }
3059 Err(e) => {
3060 last_error = e.to_string();
3061 if attempt < max_attempts {
3062 tracing::warn!(
3063 "Supervisor '{}' retry_fallback: '{}' failed attempt {}/{}",
3064 config.name, agent_name, attempt, max_attempts
3065 );
3066 }
3067 }
3068 }
3069 }
3070
3071 child_results.push(json!({
3072 "name": agent_name, "success": false,
3073 "error": last_error, "duration_ms": total_ms,
3074 "attempts": max_attempts,
3075 }));
3076 tracing::info!(
3077 "Supervisor '{}' retry_fallback: '{}' exhausted, trying next fallback",
3078 config.name, agent_name
3079 );
3080 }
3081 }
3082
3083 // All agents failed
3084 state.set(
3085 format!("__supervisor_meta__{}", config.name),
3086 json!({
3087 "strategy": "retry_fallback",
3088 "children": child_results,
3089 "success": false,
3090 "error": format!("All fallback agents failed. Last: {}", last_error),
3091 "duration_ms": start.elapsed().as_millis(),
3092 }),
3093 );
3094 Err(FlowgentraError::ExecutionError(format!(
3095 "Supervisor '{}': all fallback agents failed. Last error: {}",
3096 config.name, last_error
3097 )))
3098 }
3099
3100 // ── Debate / Critique ──────────────────────────────────────────
3101 // Agents generate responses and critique each other across rounds.
3102 OrchestrationStrategy::Debate => {
3103 let mut current = state;
3104 let rounds = if config.debate_rounds == 0 {
3105 2
3106 } else {
3107 config.debate_rounds
3108 };
3109 let mut debate_log: Vec<serde_json::Value> = Vec::new();
3110
3111 let child_map: HashMap<String, ArcHandler<SharedState>> = children
3112 .iter()
3113 .map(|(n, h)| (n.clone(), h.clone()))
3114 .collect();
3115 let child_names: Vec<String> =
3116 children.iter().map(|(n, _)| n.clone()).collect();
3117
3118 for round in 0..rounds {
3119 tracing::info!(
3120 "Supervisor '{}' debate: round {}/{}",
3121 config.name,
3122 round + 1,
3123 rounds
3124 );
3125 let mut round_responses: Vec<serde_json::Value> = Vec::new();
3126
3127 // Provide debate context to each agent
3128 if !debate_log.is_empty() {
3129 current.set("__debate_history__".to_string(), json!(debate_log));
3130 }
3131 current.set("__debate_round__".to_string(), json!(round));
3132
3133 for agent_name in &child_names {
3134 if let Some(handler) = child_map.get(agent_name.as_str()) {
3135 // Set previous round's responses so agent can critique
3136 if !round_responses.is_empty() {
3137 current.set(
3138 "__debate_current_responses__".to_string(),
3139 json!(round_responses),
3140 );
3141 }
3142
3143 let (result, ms) = run_child(
3144 agent_name,
3145 handler,
3146 current.clone(),
3147 config.child_timeout_ms,
3148 )
3149 .await;
3150
3151 match result {
3152 Ok(new_state) => {
3153 let response = new_state
3154 .get("__debate_response__")
3155 .unwrap_or(json!(null));
3156 round_responses.push(json!({
3157 "agent": agent_name,
3158 "response": response,
3159 "duration_ms": ms,
3160 }));
3161 current = new_state;
3162 }
3163 Err(e) => {
3164 round_responses.push(json!({
3165 "agent": agent_name,
3166 "error": e.to_string(),
3167 "duration_ms": ms,
3168 }));
3169 }
3170 }
3171 }
3172 }
3173
3174 current.set(
3175 "__debate_current_responses__".to_string(),
3176 json!(round_responses.clone()),
3177 );
3178 debate_log.push(json!({
3179 "round": round + 1,
3180 "responses": round_responses,
3181 }));
3182 }
3183
3184 current.set("__debate_log__".to_string(), json!(debate_log));
3185 current.set(
3186 format!("__supervisor_meta__{}", config.name),
3187 json!({
3188 "strategy": "debate",
3189 "rounds": rounds,
3190 "participants": child_names,
3191 "success": true,
3192 "duration_ms": start.elapsed().as_millis(),
3193 }),
3194 );
3195 Ok(current)
3196 }
3197
3198 OrchestrationStrategy::Custom(strategy_name) => {
3199 Err(FlowgentraError::ConfigError(format!(
3200 "Orchestrator '{}': custom strategy '{}' is not implemented.",
3201 config.name, strategy_name
3202 )))
3203 }
3204 }
3205 })
3206 })
3207}
3208
3209fn create_loop_standalone_handler(
3210 config: crate::core::node::advanced_nodes::LoopNodeConfig,
3211) -> Handler<SharedState> {
3212 use serde_json::json;
3213
3214 Box::new(move |state| {
3215 let config = config.clone();
3216 Box::pin(async move {
3217 // Use handler name as the key base (may be empty — fall back to "loop")
3218 let key_base = if config.handler.is_empty() {
3219 "loop".to_string()
3220 } else {
3221 config.handler.clone()
3222 };
3223 let iteration_key = format!("__loop_iteration__{}", key_base);
3224 let continue_key = format!("__loop_continue__{}", key_base);
3225
3226 let iteration = state
3227 .get(&iteration_key)
3228 .and_then(|v| v.as_u64())
3229 .unwrap_or(0) as usize
3230 + 1;
3231
3232 let break_now = config
3233 .break_condition
3234 .as_ref()
3235 .map(|cond| state.get(cond).and_then(|v| v.as_bool()).unwrap_or(false))
3236 .unwrap_or(false);
3237
3238 let should_continue = iteration < config.max_iterations && !break_now;
3239
3240 state.set(&iteration_key, json!(iteration));
3241 state.set(&continue_key, json!(should_continue));
3242 state.set(
3243 format!("__loop_meta__{}", key_base),
3244 json!({
3245 "iteration": iteration,
3246 "max_iterations": config.max_iterations,
3247 "should_continue": should_continue,
3248 "break_condition_met": break_now,
3249 }),
3250 );
3251
3252 tracing::info!(
3253 "Standalone loop '{}' iteration={}/{}, continue={}",
3254 key_base,
3255 iteration,
3256 config.max_iterations,
3257 should_continue
3258 );
3259
3260 Ok(state)
3261 })
3262 })
3263}
3264
3265// ── Subgraph ──────────────────────────────────────────────────────────────────
3266/// Creates a handler that loads and runs a nested agent from a YAML config file.
3267///
3268/// The subgraph receives the parent's current state, executes its own full graph,
3269/// and returns the final state back to the parent graph. This enables true
3270/// hierarchical multi-agent systems where each subgraph is an independent agent.
3271///
3272/// Handlers are auto-discovered from the shared inventory (#[register_handler] pool).
3273///
3274/// YAML:
3275/// ```yaml
3276/// - name: research_agent
3277/// type: subgraph # aliases: agent, agent_or_graph
3278/// config:
3279/// path: agents/research_agent.yaml
3280///
3281/// - name: coordinator
3282/// type: supervisor
3283/// config:
3284/// strategy: sequential
3285/// children: [research_agent, writer_agent]
3286/// ```
3287///
3288/// State: parent state flows in → subgraph executes fully → result flows back to parent
3289fn create_subgraph_handler(
3290 config: crate::core::node::agent_or_graph_node::SubgraphNodeConfig,
3291) -> Handler<SharedState> {
3292 Box::new(move |state| {
3293 let path = config.path.clone();
3294 let name = config.name.clone();
3295 Box::pin(async move {
3296 tracing::info!("SubgraphNode '{}': loading agent from '{}'", name, path);
3297
3298 // Compile the subgraph from its YAML using the shared handler inventory.
3299 // This is synchronous (YAML parse + hashmap build) — safe inside async.
3300 let mut sub_agent = from_config_path(&path).map_err(|e| {
3301 FlowgentraError::ConfigError(format!(
3302 "SubgraphNode '{}': failed to load '{}': {}",
3303 name, path, e
3304 ))
3305 })?;
3306
3307 // Inject parent state into the subgraph
3308 sub_agent.state = state;
3309
3310 // Execute the subgraph and return its final state to the parent graph
3311 let result = sub_agent.run().await?;
3312 tracing::info!("SubgraphNode '{}': completed", name);
3313 Ok(result)
3314 })
3315 })
3316}