flowgentra_ai/core/node/mod.rs
1/// NodeId type alias for node identification
2pub type NodeId = String;
3use crate::core::error::{FlowgentraError, Result};
4use crate::core::node::nodes_trait::PluggableNode;
5use crate::core::state::State;
6use std::sync::Arc;
7
8/// Factory function to create a node from config.
9///
10/// Supports: `supervisor` (`orchestrator`), `subgraph` (`agent`, `agent_or_graph`), `evaluation`.
11///
12/// # Arguments
13/// * `config` - The node configuration (typically from YAML)
14/// * `node_map` - A map of all available nodes, indexed by name
15pub fn create_node_from_config<T: State>(
16 config: &serde_json::Value,
17 node_map: &std::collections::HashMap<String, Box<dyn PluggableNode<T>>>,
18) -> Result<Box<dyn PluggableNode<T>>> {
19 let node_type = config.get("type").and_then(|v| v.as_str()).unwrap_or("");
20 match node_type {
21 // ── Supervisor (alias: orchestrator) ─────────────────────────────────
22 "supervisor" | "orchestrator" => {
23 use crate::core::node::orchestrator_node::{SupervisorNode, SupervisorNodeConfig};
24 let cfg: SupervisorNodeConfig =
25 serde_json::from_value(config.clone()).map_err(|e| {
26 FlowgentraError::ConfigError(format!(
27 "Failed to parse supervisor node config: {}",
28 e
29 ))
30 })?;
31 let node_name = &cfg.name;
32
33 let mut children: Vec<Arc<dyn PluggableNode<T>>> = Vec::new();
34 let mut missing_children = Vec::new();
35
36 for child_name in &cfg.children {
37 match node_map.get(child_name) {
38 Some(node) => {
39 let cloned = node.clone_box();
40 let arc_node: Arc<dyn PluggableNode<T>> = Arc::from(cloned);
41 children.push(arc_node);
42 }
43 None => missing_children.push(child_name.clone()),
44 }
45 }
46
47 if !missing_children.is_empty() {
48 return Err(FlowgentraError::ConfigError(format!(
49 "SupervisorNode '{}': missing children: {:?}",
50 node_name, missing_children
51 )));
52 }
53
54 Ok(Box::new(SupervisorNode::new(cfg, children)?))
55 }
56
57 // ── Subgraph (aliases: agent, agent_or_graph) ──────────────────────
58 "subgraph" | "agent" | "agent_or_graph" => {
59 use crate::core::node::agent_or_graph_node::{SubgraphNode, SubgraphNodeConfig};
60 let cfg: SubgraphNodeConfig = serde_json::from_value(config.clone()).map_err(|e| {
61 FlowgentraError::ConfigError(format!("Failed to parse subgraph node config: {}", e))
62 })?;
63 let target_name = &cfg.path;
64
65 // For the PluggableNode path, look up an already-compiled inner node
66 match node_map.get(target_name) {
67 Some(node) => {
68 let inner = node.clone_box();
69 Ok(Box::new(SubgraphNode::new(cfg, inner)?))
70 }
71 None => Err(FlowgentraError::ConfigError(format!(
72 "SubgraphNode '{}': inner node '{}' not found in node_map. \
73 (In YAML config, set 'path' to the sub-agent YAML file.)",
74 cfg.name, target_name
75 ))),
76 }
77 }
78
79 // ── Evaluation ────────────────────────────────────────────────────────
80 "evaluation" => {
81 use crate::core::node::evaluation_node::{EvaluationNode, EvaluationNodeConfig};
82 let eval_config: EvaluationNodeConfig = serde_json::from_value(config.clone())
83 .map_err(|e| {
84 FlowgentraError::ConfigError(format!(
85 "Failed to parse evaluation node config: {}",
86 e
87 ))
88 })?;
89 let handler_name = &eval_config.handler;
90 match node_map.get(handler_name) {
91 Some(inner_node) => {
92 let inner = inner_node.clone_box();
93 Ok(Box::new(EvaluationNode::new(eval_config, inner)?))
94 }
95 None => Err(FlowgentraError::ConfigError(format!(
96 "EvaluationNode '{}': handler '{}' not found in node_map.",
97 eval_config.name, handler_name
98 ))),
99 }
100 }
101
102 _ => Err(FlowgentraError::ConfigError(format!(
103 "Unknown node type: '{}'. Expected: supervisor, subgraph, evaluation \
104 (aliases: orchestrator, agent, agent_or_graph)",
105 node_type
106 ))),
107 }
108}
109
110pub mod agent_or_graph_node;
111pub mod evaluation_node;
112pub mod orchestrator_node;
113
114// ── Re-exports ────────────────────────────────────────────────────────────────
115
116// Supervisor (canonical) + backwards-compat aliases
117pub use orchestrator_node::{
118 ChildExecutionStats,
119 // supporting types
120 OrchestrationStrategy,
121 OrchestratorNode,
122 // aliases
123 OrchestratorNodeConfig,
124 ParallelAggregation,
125 ParallelMergeStrategy,
126 SupervisorNode,
127 SupervisorNodeConfig,
128};
129
130// Subgraph (canonical) + backwards-compat aliases
131pub use agent_or_graph_node::{
132 AgentOrGraphNode,
133 // aliases
134 AgentOrGraphNodeConfig,
135 // kept for existing imports
136 AgentOrGraphType,
137 SubgraphNode,
138 SubgraphNodeConfig,
139};
140
141pub use evaluation_node::{
142 default_scorer, scorer_combine, scorer_from_confidence, scorer_from_llm_grader,
143 scorer_from_node_scorer, scorer_from_sync, Attempt, EvaluationNode, EvaluationNodeConfig,
144 EvaluationResult, ExitReason, ScorerFn,
145};
146// # Node Configuration and Execution
147//
148// Nodes represent individual computational steps in your agent's workflow.
149// Each node has a handler function that processes state and produces new state.
150//
151// ## Node Concepts
152//
153// **Handlers** - The actual computation logic executed by a node.
154// Each handler receives the current state and returns updated state.
155//
156// **Conditions** - Decision points that determine which edges to take.
157// Used for branching logic based on the current state.
158//
159// **MCPs** - External tools and services available to the node.
160// Nodes can reference MCP clients to extend their capabilities.
161//
162// ## Example
163//
164// ```yaml
165// # Standard node
166// - name: process_data
167// type: handler
168// handler: my_handler
169// mcps: [web_search]
170//
171// # Orchestrator node
172// - name: orchestrate_agents
173// type: orchestrator
174// strategy: parallel
175// children: [agent1, agent2, agent3]
176//
177// # Agent or Graph node
178// - name: agent1
179// type: agent
180// target: some_agent
181// - name: subgraph1
182// type: graph
183// target: subgraph.yaml
184/// ```
185use crate::core::routing::Condition;
186use serde::{Deserialize, Deserializer, Serialize};
187use std::collections::HashMap;
188
189fn deserialize_to_one_or_many<'de, D>(deserializer: D) -> std::result::Result<Vec<String>, D::Error>
190where
191 D: Deserializer<'de>,
192{
193 #[derive(Deserialize)]
194 #[serde(untagged)]
195 enum OneOrMany {
196 One(String),
197 Many(Vec<String>),
198 }
199 match OneOrMany::deserialize(deserializer)? {
200 OneOrMany::One(s) => Ok(vec![s]),
201 OneOrMany::Many(v) => Ok(v),
202 }
203}
204
205fn serialize_to_one_or_many<S>(v: &[String], s: S) -> std::result::Result<S::Ok, S::Error>
206where
207 S: serde::Serializer,
208{
209 use serde::Serialize;
210 if v.len() == 1 {
211 v[0].serialize(s)
212 } else {
213 v.serialize(s)
214 }
215}
216
217// =============================================================================
218// Node Configuration (from YAML)
219// =============================================================================
220
221/// Configuration for a node as specified in the YAML config file.
222///
223/// This represents the node definition before it's compiled with actual handler code.
224#[derive(Debug, Clone, Serialize, Deserialize)]
225pub struct NodeConfig {
226 /// Unique identifier for this node
227 pub name: String,
228
229 /// Name of the handler function to execute.
230 /// Optional for node types that don't use a registered handler
231 /// (e.g., "planner", "human_in_the_loop", "memory").
232 #[serde(alias = "function", default)]
233 pub handler: String,
234
235 /// Optional node type (e.g., "standard", "evaluation", "orchestrator")
236 #[serde(rename = "type", default)]
237 pub node_type: Option<String>,
238
239 /// Optional MCP tools available to this node
240 #[serde(default)]
241 pub mcps: Vec<String>,
242
243 /// Optional execution timeout in milliseconds
244 /// If the node takes longer than this, execution will be interrupted
245 #[serde(default)]
246 pub timeout_ms: Option<u64>,
247
248 /// Optional node-specific configuration
249 /// (Useful for passing parameters to the handler)
250 #[serde(default)]
251 pub config: HashMap<String, serde_json::Value>,
252}
253
254impl NodeConfig {
255 /// Create a new node configuration with the given name and handler.
256 pub fn new(name: impl Into<String>, handler: impl Into<String>) -> Self {
257 NodeConfig {
258 name: name.into(),
259 handler: handler.into(),
260 node_type: None,
261 mcps: Vec::new(),
262 timeout_ms: None,
263 config: HashMap::new(),
264 }
265 }
266
267 /// Attach MCP tools to this node.
268 pub fn with_mcps(mut self, mcps: Vec<String>) -> Self {
269 self.mcps = mcps;
270 self
271 }
272
273 /// Set timeout for this node in milliseconds
274 pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
275 self.timeout_ms = Some(timeout_ms);
276 self
277 }
278 /// Add custom configuration for this node.
279 pub fn with_config(mut self, config: HashMap<String, serde_json::Value>) -> Self {
280 self.config = config;
281 self
282 }
283
284 /// Validate the node configuration
285 ///
286 /// Checks:
287 /// - Timeout is in a reasonable range (if specified)
288 /// - Timeout is not exactly 0 (would timeout immediately)
289 ///
290 /// # Returns
291 /// Error if validation fails
292 pub fn validate(&self) -> crate::core::error::Result<()> {
293 if let Some(timeout_ms) = self.timeout_ms {
294 // Timeout of 0 doesn't make sense
295 if timeout_ms == 0 {
296 return Err(crate::core::error::FlowgentraError::ConfigError(
297 format!(
298 "Node '{}': timeout_ms cannot be 0 (would timeout immediately). Set it to a positive milliseconds value.",
299 self.name
300 ),
301 ));
302 }
303
304 // Warn if timeout is extremely large (more than 1 hour = 3,600,000ms)
305 const MAX_REASONABLE_TIMEOUT_MS: u64 = 3_600_000; // 1 hour
306 if timeout_ms > MAX_REASONABLE_TIMEOUT_MS {
307 eprintln!(
308 "⚠️ Warning: Node '{}' has timeout_ms = {} (more than 1 hour). Is this intentional?",
309 self.name, timeout_ms
310 );
311 }
312
313 // Warn if timeout is very small (less than 100ms)
314 const MIN_REASONABLE_TIMEOUT_MS: u64 = 100;
315 if timeout_ms < MIN_REASONABLE_TIMEOUT_MS {
316 eprintln!(
317 "⚠️ Warning: Node '{}' has timeout_ms = {} (less than 100ms). This might be too short for I/O operations.",
318 self.name, timeout_ms
319 );
320 }
321 }
322 Ok(())
323 }
324}
325
326// =============================================================================
327// Node Function Type
328// =============================================================================
329
330/// The function signature for node handlers.
331///
332/// A node handler is an async function that:
333/// 1. Receives the current state
334/// 2. Performs its computation (possibly using MCPs or LLM)
335/// 3. Returns the updated state or an error
336///
337/// # Example Handler
338/// ```no_run
339/// use flowgentra_ai::core::node::NodeFunction;
340/// use flowgentra_ai::core::state::SharedState;
341/// use serde_json::json;
342///
343/// fn my_handler() -> NodeFunction<SharedState> {
344/// Box::new(|state| {
345/// Box::pin(async move {
346/// // Your logic here
347/// state.set("processed", json!(true));
348/// Ok(state)
349/// })
350/// })
351/// }
352/// ```
353pub type NodeFunction<T> = Box<
354 dyn Fn(T) -> futures::future::BoxFuture<'static, crate::core::error::Result<T>> + Send + Sync,
355>;
356
357// =============================================================================
358// Compiled Node
359// =============================================================================
360
361/// A compiled node ready for execution.
362///
363/// This is created internally by the runtime when configuration is loaded.
364pub struct Node<T: State> {
365 /// Node name (identifier)
366 pub name: String,
367
368 /// The actual handler function to execute
369 pub function: NodeFunction<T>,
370
371 /// MCP tools available to this node
372 pub mcps: Vec<String>,
373
374 /// Custom configuration
375 pub config: HashMap<String, serde_json::Value>,
376
377 /// Whether this node is a planner node (LLM-driven dynamic routing).
378 pub(crate) is_planner: bool,
379
380 /// Whether this node still has a placeholder function (not yet registered).
381 /// If true, execution will return an error instead of silently passing through.
382 pub(crate) is_placeholder: bool,
383}
384
385impl<T: State> Node<T> {
386 /// Create a new compiled node.
387 pub fn new(
388 name: impl Into<String>,
389 function: NodeFunction<T>,
390 mcps: Vec<String>,
391 config: HashMap<String, serde_json::Value>,
392 ) -> Self {
393 Node {
394 name: name.into(),
395 function,
396 mcps,
397 config,
398 is_planner: false,
399 is_placeholder: false,
400 }
401 }
402
403 /// Create a placeholder node (identity function that will be replaced via `register_node`).
404 pub fn new_placeholder(
405 name: impl Into<String>,
406 function: NodeFunction<T>,
407 mcps: Vec<String>,
408 config: HashMap<String, serde_json::Value>,
409 ) -> Self {
410 Node {
411 name: name.into(),
412 function,
413 mcps,
414 config,
415 is_planner: false,
416 is_placeholder: true,
417 }
418 }
419
420 /// Create a planner node (LLM-driven dynamic next-node selection).
421 pub fn new_planner(
422 name: impl Into<String>,
423 function: NodeFunction<T>,
424 mcps: Vec<String>,
425 config: HashMap<String, serde_json::Value>,
426 ) -> Self {
427 Node {
428 name: name.into(),
429 function,
430 mcps,
431 config,
432 is_planner: true,
433 is_placeholder: false,
434 }
435 }
436
437 /// Execute this node with the given state.
438 ///
439 /// # Returns
440 /// The updated state after node execution, or an error.
441 /// Returns an error if the node handler was never registered (still a placeholder).
442 pub async fn execute(&self, state: T) -> crate::core::error::Result<T> {
443 if self.is_placeholder {
444 return Err(FlowgentraError::ExecutionError(format!(
445 "Node '{}' has no registered handler. Call register_node() before execution.",
446 self.name
447 )));
448 }
449 (self.function)(state).await
450 }
451}
452
453// =============================================================================
454// Edge Configuration (from YAML)
455// =============================================================================
456
457/// Configuration for an edge (connection between nodes) from the YAML config.
458/// Supports `to` as a single string or list (parallel targets): `to: [a, b, c]`
459#[derive(Debug, Clone, Serialize, Deserialize)]
460pub struct EdgeConfig {
461 /// Source node name
462 pub from: String,
463
464 /// Target node name, or list for parallel: `to: [analyze_logs, analyze_pcap]`
465 #[serde(
466 deserialize_with = "deserialize_to_one_or_many",
467 serialize_with = "serialize_to_one_or_many"
468 )]
469 pub to: Vec<String>,
470
471 /// Optional condition expression for conditional routing
472 /// If specified, the edge is only taken if this condition evaluates to true.
473 /// The condition name must correspond to a registered condition function.
474 #[serde(default)]
475 pub condition: Option<String>,
476}
477
478impl EdgeConfig {
479 /// Create a new edge from source to target node.
480 pub fn new(from: impl Into<String>, to: impl Into<String>) -> Self {
481 EdgeConfig {
482 from: from.into(),
483 to: vec![to.into()],
484 condition: None,
485 }
486 }
487
488 /// Iterate over target nodes (one or many for parallel)
489 pub fn to_targets(&self) -> impl Iterator<Item = &str> {
490 self.to.iter().map(String::as_str)
491 }
492
493 /// Add a condition that must be satisfied for this edge to be taken.
494 pub fn with_condition(mut self, condition: impl Into<String>) -> Self {
495 self.condition = Some(condition.into());
496 self
497 }
498}
499
500// =============================================================================
501// Edge Condition Type
502// =============================================================================
503
504/// A condition function that determines if an edge should be taken.
505///
506/// The condition function receives the current state and returns:
507/// - `Some(node_name)` to explicitly jump to that node
508/// - `None` to use the default target node
509/// - An error if the condition evaluation fails
510///
511/// # Example
512/// ```ignore
513/// use flowgentra_ai::core::state::State;
514/// use std::sync::Arc;
515///
516/// let is_complex = Arc::new(|state: &State| -> Result<Option<String>, _> {
517/// if let Some(score) = state.get("complexity_score") {
518/// if score.as_i64().unwrap_or(0) > 50 {
519/// return Ok(Some("process_complex".to_string()));
520/// }
521/// }
522/// Ok(Some("process_simple".to_string()))
523/// } as Box<dyn Fn(&State) -> _>);
524/// ```
525pub type EdgeCondition<T> =
526 std::sync::Arc<dyn Fn(&T) -> crate::core::error::Result<Option<String>> + Send + Sync>;
527
528// =============================================================================
529// Compiled Edge
530// =============================================================================
531
532/// A compiled edge (connection) between two nodes, with optional condition.
533#[derive(Clone)]
534pub struct Edge<T: State> {
535 /// Source node name
536 pub from: String,
537
538 /// Target node name
539 pub to: String,
540
541 /// Optional condition function (legacy string-based conditions)
542 pub condition: Option<EdgeCondition<T>>,
543
544 /// The original condition name from config (for reference)
545 pub condition_name: Option<String>,
546
547 /// Type-safe routing condition (new DSL-based conditions)
548 pub routing_condition: Option<Condition<T>>,
549}
550
551impl<T: State> std::fmt::Debug for Edge<T> {
552 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
553 f.debug_struct("Edge")
554 .field("from", &self.from)
555 .field("to", &self.to)
556 .field("condition", &self.condition.is_some())
557 .field("condition_name", &self.condition_name)
558 .field("routing_condition", &self.routing_condition)
559 .finish()
560 }
561}
562
563impl<T: State> Edge<T> {
564 /// Create a new edge from source to target.
565 pub fn new(
566 from: impl Into<String>,
567 to: impl Into<String>,
568 condition: Option<EdgeCondition<T>>,
569 ) -> Self {
570 Edge::<T> {
571 from: from.into(),
572 to: to.into(),
573 condition,
574 condition_name: None,
575 routing_condition: None,
576 }
577 }
578
579 /// Attach the original condition name (for reference).
580 pub fn with_condition_name(mut self, name: impl Into<String>) -> Self {
581 self.condition_name = Some(name.into());
582 self
583 }
584
585 /// Set the type-safe routing condition for this edge
586 pub fn with_routing_condition(
587 mut self,
588 condition: crate::core::graph::routing::Condition<T>,
589 ) -> Self {
590 self.routing_condition = Some(condition);
591 self
592 }
593
594 /// Check if this edge should be taken given the current state.
595 ///
596 /// Evaluates both legacy conditions (EdgeCondition) and new conditions (Condition DSL).
597 /// Returns true if the edge is unconditional or if the condition evaluates to true.
598 pub async fn should_take(&self, state: &T) -> crate::core::error::Result<bool> {
599 // First check legacy condition function if present
600 if let Some(cond) = &self.condition {
601 match cond(state)? {
602 Some(_) => return Ok(true),
603 None => return Ok(false),
604 }
605 }
606
607 // Then check new routing condition if present
608 if let Some(routing_cond) = &self.routing_condition {
609 return Ok(routing_cond.evaluate(state));
610 }
611
612 // No condition = always traversable
613 Ok(true)
614 }
615
616 /// Get the next node if the condition specifies one.
617 ///
618 /// Returns the explicitly specified next node, or None to use the default target.
619 pub async fn get_next_node(&self, state: &T) -> crate::core::error::Result<Option<String>> {
620 match &self.condition {
621 Some(cond) => cond(state),
622 None => Ok(None),
623 }
624 }
625}
626
627// Sub-modules for node organization
628pub mod advanced_nodes;
629pub mod builtin_nodes;
630pub mod memory_handlers;
631pub mod nodes_trait;
632pub mod planner;