Skip to main content

flowgentra_ai/core/node/
mod.rs

/// Identifier used to refer to a node; an alias for an owned `String`.
pub type NodeId = String;
3use crate::core::error::{FlowgentraError, Result};
4use crate::core::node::nodes_trait::PluggableNode;
5use crate::core::state::State;
6use std::sync::Arc;
7
8/// Factory function to create a node from config.
9///
10/// Supports: `supervisor` (`orchestrator`), `subgraph` (`agent`, `agent_or_graph`), `evaluation`.
11///
12/// # Arguments
13/// * `config` - The node configuration (typically from YAML)
14/// * `node_map` - A map of all available nodes, indexed by name
15pub fn create_node_from_config<T: State>(
16    config: &serde_json::Value,
17    node_map: &std::collections::HashMap<String, Box<dyn PluggableNode<T>>>,
18) -> Result<Box<dyn PluggableNode<T>>> {
19    let node_type = config.get("type").and_then(|v| v.as_str()).unwrap_or("");
20    match node_type {
21        // ── Supervisor (alias: orchestrator) ─────────────────────────────────
22        "supervisor" | "orchestrator" => {
23            use crate::core::node::orchestrator_node::{SupervisorNode, SupervisorNodeConfig};
24            let cfg: SupervisorNodeConfig =
25                serde_json::from_value(config.clone()).map_err(|e| {
26                    FlowgentraError::ConfigError(format!(
27                        "Failed to parse supervisor node config: {}",
28                        e
29                    ))
30                })?;
31            let node_name = &cfg.name;
32
33            let mut children: Vec<Arc<dyn PluggableNode<T>>> = Vec::new();
34            let mut missing_children = Vec::new();
35
36            for child_name in &cfg.children {
37                match node_map.get(child_name) {
38                    Some(node) => {
39                        let cloned = node.clone_box();
40                        let arc_node: Arc<dyn PluggableNode<T>> = Arc::from(cloned);
41                        children.push(arc_node);
42                    }
43                    None => missing_children.push(child_name.clone()),
44                }
45            }
46
47            if !missing_children.is_empty() {
48                return Err(FlowgentraError::ConfigError(format!(
49                    "SupervisorNode '{}': missing children: {:?}",
50                    node_name, missing_children
51                )));
52            }
53
54            Ok(Box::new(SupervisorNode::new(cfg, children)?))
55        }
56
57        // ── Subgraph (aliases: agent, agent_or_graph) ──────────────────────
58        "subgraph" | "agent" | "agent_or_graph" => {
59            use crate::core::node::agent_or_graph_node::{SubgraphNode, SubgraphNodeConfig};
60            let cfg: SubgraphNodeConfig = serde_json::from_value(config.clone()).map_err(|e| {
61                FlowgentraError::ConfigError(format!("Failed to parse subgraph node config: {}", e))
62            })?;
63            let target_name = &cfg.path;
64
65            // For the PluggableNode path, look up an already-compiled inner node
66            match node_map.get(target_name) {
67                Some(node) => {
68                    let inner = node.clone_box();
69                    Ok(Box::new(SubgraphNode::new(cfg, inner)?))
70                }
71                None => Err(FlowgentraError::ConfigError(format!(
72                    "SubgraphNode '{}': inner node '{}' not found in node_map. \
73                     (In YAML config, set 'path' to the sub-agent YAML file.)",
74                    cfg.name, target_name
75                ))),
76            }
77        }
78
79        // ── Evaluation ────────────────────────────────────────────────────────
80        "evaluation" => {
81            use crate::core::node::evaluation_node::{EvaluationNode, EvaluationNodeConfig};
82            let eval_config: EvaluationNodeConfig = serde_json::from_value(config.clone())
83                .map_err(|e| {
84                    FlowgentraError::ConfigError(format!(
85                        "Failed to parse evaluation node config: {}",
86                        e
87                    ))
88                })?;
89            let handler_name = &eval_config.handler;
90            match node_map.get(handler_name) {
91                Some(inner_node) => {
92                    let inner = inner_node.clone_box();
93                    Ok(Box::new(EvaluationNode::new(eval_config, inner)?))
94                }
95                None => Err(FlowgentraError::ConfigError(format!(
96                    "EvaluationNode '{}': handler '{}' not found in node_map.",
97                    eval_config.name, handler_name
98                ))),
99            }
100        }
101
102        _ => Err(FlowgentraError::ConfigError(format!(
103            "Unknown node type: '{}'. Expected: supervisor, subgraph, evaluation \
104             (aliases: orchestrator, agent, agent_or_graph)",
105            node_type
106        ))),
107    }
108}
109
110pub mod agent_or_graph_node;
111pub mod evaluation_node;
112pub mod orchestrator_node;
113
114// ── Re-exports ────────────────────────────────────────────────────────────────
115
116// Supervisor (canonical) + backwards-compat aliases
117pub use orchestrator_node::{
118    ChildExecutionStats,
119    // supporting types
120    OrchestrationStrategy,
121    OrchestratorNode,
122    // aliases
123    OrchestratorNodeConfig,
124    ParallelAggregation,
125    ParallelMergeStrategy,
126    SupervisorNode,
127    SupervisorNodeConfig,
128};
129
130// Subgraph (canonical) + backwards-compat aliases
131pub use agent_or_graph_node::{
132    AgentOrGraphNode,
133    // aliases
134    AgentOrGraphNodeConfig,
135    // kept for existing imports
136    AgentOrGraphType,
137    SubgraphNode,
138    SubgraphNodeConfig,
139};
140
141pub use evaluation_node::{
142    default_scorer, scorer_combine, scorer_from_confidence, scorer_from_llm_grader,
143    scorer_from_node_scorer, scorer_from_sync, Attempt, EvaluationNode, EvaluationNodeConfig,
144    EvaluationResult, ExitReason, ScorerFn,
145};
146// # Node Configuration and Execution
147//
148// Nodes represent individual computational steps in your agent's workflow.
149// Each node has a handler function that processes state and produces new state.
150//
151// ## Node Concepts
152//
153// **Handlers** - The actual computation logic executed by a node.
154// Each handler receives the current state and returns updated state.
155//
156// **Conditions** - Decision points that determine which edges to take.
157// Used for branching logic based on the current state.
158//
159// **MCPs** - External tools and services available to the node.
160// Nodes can reference MCP clients to extend their capabilities.
161//
162// ## Example
163//
164// ```yaml
165// # Standard node
166// - name: process_data
167//   type: handler
168//   handler: my_handler
169//   mcps: [web_search]
170//
171// # Orchestrator node
172// - name: orchestrate_agents
173//   type: orchestrator
174//   strategy: parallel
175//   children: [agent1, agent2, agent3]
176//
177// # Agent or Graph node
178// - name: agent1
179//   type: agent
180//   target: some_agent
181// - name: subgraph1
182//   type: graph
183//   target: subgraph.yaml
// ```
185use crate::core::routing::Condition;
186use serde::{Deserialize, Deserializer, Serialize};
187use std::collections::HashMap;
188
189fn deserialize_to_one_or_many<'de, D>(deserializer: D) -> std::result::Result<Vec<String>, D::Error>
190where
191    D: Deserializer<'de>,
192{
193    #[derive(Deserialize)]
194    #[serde(untagged)]
195    enum OneOrMany {
196        One(String),
197        Many(Vec<String>),
198    }
199    match OneOrMany::deserialize(deserializer)? {
200        OneOrMany::One(s) => Ok(vec![s]),
201        OneOrMany::Many(v) => Ok(v),
202    }
203}
204
205fn serialize_to_one_or_many<S>(v: &[String], s: S) -> std::result::Result<S::Ok, S::Error>
206where
207    S: serde::Serializer,
208{
209    use serde::Serialize;
210    if v.len() == 1 {
211        v[0].serialize(s)
212    } else {
213        v.serialize(s)
214    }
215}
216
217// =============================================================================
218// Node Configuration (from YAML)
219// =============================================================================
220
221/// Configuration for a node as specified in the YAML config file.
222///
223/// This represents the node definition before it's compiled with actual handler code.
224#[derive(Debug, Clone, Serialize, Deserialize)]
225pub struct NodeConfig {
226    /// Unique identifier for this node
227    pub name: String,
228
229    /// Name of the handler function to execute.
230    /// Optional for node types that don't use a registered handler
231    /// (e.g., "planner", "human_in_the_loop", "memory").
232    #[serde(alias = "function", default)]
233    pub handler: String,
234
235    /// Optional node type (e.g., "standard", "evaluation", "orchestrator")
236    #[serde(rename = "type", default)]
237    pub node_type: Option<String>,
238
239    /// Optional MCP tools available to this node
240    #[serde(default)]
241    pub mcps: Vec<String>,
242
243    /// Optional execution timeout in milliseconds
244    /// If the node takes longer than this, execution will be interrupted
245    #[serde(default)]
246    pub timeout_ms: Option<u64>,
247
248    /// Optional node-specific configuration
249    /// (Useful for passing parameters to the handler)
250    #[serde(default)]
251    pub config: HashMap<String, serde_json::Value>,
252}
253
254impl NodeConfig {
255    /// Create a new node configuration with the given name and handler.
256    pub fn new(name: impl Into<String>, handler: impl Into<String>) -> Self {
257        NodeConfig {
258            name: name.into(),
259            handler: handler.into(),
260            node_type: None,
261            mcps: Vec::new(),
262            timeout_ms: None,
263            config: HashMap::new(),
264        }
265    }
266
267    /// Attach MCP tools to this node.
268    pub fn with_mcps(mut self, mcps: Vec<String>) -> Self {
269        self.mcps = mcps;
270        self
271    }
272
273    /// Set timeout for this node in milliseconds
274    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
275        self.timeout_ms = Some(timeout_ms);
276        self
277    }
278    /// Add custom configuration for this node.
279    pub fn with_config(mut self, config: HashMap<String, serde_json::Value>) -> Self {
280        self.config = config;
281        self
282    }
283
284    /// Validate the node configuration
285    ///
286    /// Checks:
287    /// - Timeout is in a reasonable range (if specified)
288    /// - Timeout is not exactly 0 (would timeout immediately)
289    ///
290    /// # Returns
291    /// Error if validation fails
292    pub fn validate(&self) -> crate::core::error::Result<()> {
293        if let Some(timeout_ms) = self.timeout_ms {
294            // Timeout of 0 doesn't make sense
295            if timeout_ms == 0 {
296                return Err(crate::core::error::FlowgentraError::ConfigError(
297                    format!(
298                        "Node '{}': timeout_ms cannot be 0 (would timeout immediately). Set it to a positive milliseconds value.",
299                        self.name
300                    ),
301                ));
302            }
303
304            // Warn if timeout is extremely large (more than 1 hour = 3,600,000ms)
305            const MAX_REASONABLE_TIMEOUT_MS: u64 = 3_600_000; // 1 hour
306            if timeout_ms > MAX_REASONABLE_TIMEOUT_MS {
307                eprintln!(
308                    "⚠️  Warning: Node '{}' has timeout_ms = {} (more than 1 hour). Is this intentional?",
309                    self.name, timeout_ms
310                );
311            }
312
313            // Warn if timeout is very small (less than 100ms)
314            const MIN_REASONABLE_TIMEOUT_MS: u64 = 100;
315            if timeout_ms < MIN_REASONABLE_TIMEOUT_MS {
316                eprintln!(
317                    "⚠️  Warning: Node '{}' has timeout_ms = {} (less than 100ms). This might be too short for I/O operations.",
318                    self.name, timeout_ms
319                );
320            }
321        }
322        Ok(())
323    }
324}
325
326// =============================================================================
327// Node Function Type
328// =============================================================================
329
330/// The function signature for node handlers.
331///
332/// A node handler is an async function that:
333/// 1. Receives the current state
334/// 2. Performs its computation (possibly using MCPs or LLM)
335/// 3. Returns the updated state or an error
336///
337/// # Example Handler
338/// ```no_run
339/// use flowgentra_ai::core::node::NodeFunction;
340/// use flowgentra_ai::core::state::SharedState;
341/// use serde_json::json;
342///
343/// fn my_handler() -> NodeFunction<SharedState> {
344///     Box::new(|state| {
345///         Box::pin(async move {
346///             // Your logic here
347///             state.set("processed", json!(true));
348///             Ok(state)
349///         })
350///     })
351/// }
352/// ```
353pub type NodeFunction<T> = Box<
354    dyn Fn(T) -> futures::future::BoxFuture<'static, crate::core::error::Result<T>> + Send + Sync,
355>;
356
357// =============================================================================
358// Compiled Node
359// =============================================================================
360
361/// A compiled node ready for execution.
362///
363/// This is created internally by the runtime when configuration is loaded.
364pub struct Node<T: State> {
365    /// Node name (identifier)
366    pub name: String,
367
368    /// The actual handler function to execute
369    pub function: NodeFunction<T>,
370
371    /// MCP tools available to this node
372    pub mcps: Vec<String>,
373
374    /// Custom configuration
375    pub config: HashMap<String, serde_json::Value>,
376
377    /// Whether this node is a planner node (LLM-driven dynamic routing).
378    pub(crate) is_planner: bool,
379
380    /// Whether this node still has a placeholder function (not yet registered).
381    /// If true, execution will return an error instead of silently passing through.
382    pub(crate) is_placeholder: bool,
383}
384
385impl<T: State> Node<T> {
386    /// Create a new compiled node.
387    pub fn new(
388        name: impl Into<String>,
389        function: NodeFunction<T>,
390        mcps: Vec<String>,
391        config: HashMap<String, serde_json::Value>,
392    ) -> Self {
393        Node {
394            name: name.into(),
395            function,
396            mcps,
397            config,
398            is_planner: false,
399            is_placeholder: false,
400        }
401    }
402
403    /// Create a placeholder node (identity function that will be replaced via `register_node`).
404    pub fn new_placeholder(
405        name: impl Into<String>,
406        function: NodeFunction<T>,
407        mcps: Vec<String>,
408        config: HashMap<String, serde_json::Value>,
409    ) -> Self {
410        Node {
411            name: name.into(),
412            function,
413            mcps,
414            config,
415            is_planner: false,
416            is_placeholder: true,
417        }
418    }
419
420    /// Create a planner node (LLM-driven dynamic next-node selection).
421    pub fn new_planner(
422        name: impl Into<String>,
423        function: NodeFunction<T>,
424        mcps: Vec<String>,
425        config: HashMap<String, serde_json::Value>,
426    ) -> Self {
427        Node {
428            name: name.into(),
429            function,
430            mcps,
431            config,
432            is_planner: true,
433            is_placeholder: false,
434        }
435    }
436
437    /// Execute this node with the given state.
438    ///
439    /// # Returns
440    /// The updated state after node execution, or an error.
441    /// Returns an error if the node handler was never registered (still a placeholder).
442    pub async fn execute(&self, state: T) -> crate::core::error::Result<T> {
443        if self.is_placeholder {
444            return Err(FlowgentraError::ExecutionError(format!(
445                "Node '{}' has no registered handler. Call register_node() before execution.",
446                self.name
447            )));
448        }
449        (self.function)(state).await
450    }
451}
452
453// =============================================================================
454// Edge Configuration (from YAML)
455// =============================================================================
456
457/// Configuration for an edge (connection between nodes) from the YAML config.
458/// Supports `to` as a single string or list (parallel targets): `to: [a, b, c]`
459#[derive(Debug, Clone, Serialize, Deserialize)]
460pub struct EdgeConfig {
461    /// Source node name
462    pub from: String,
463
464    /// Target node name, or list for parallel: `to: [analyze_logs, analyze_pcap]`
465    #[serde(
466        deserialize_with = "deserialize_to_one_or_many",
467        serialize_with = "serialize_to_one_or_many"
468    )]
469    pub to: Vec<String>,
470
471    /// Optional condition expression for conditional routing
472    /// If specified, the edge is only taken if this condition evaluates to true.
473    /// The condition name must correspond to a registered condition function.
474    #[serde(default)]
475    pub condition: Option<String>,
476}
477
478impl EdgeConfig {
479    /// Create a new edge from source to target node.
480    pub fn new(from: impl Into<String>, to: impl Into<String>) -> Self {
481        EdgeConfig {
482            from: from.into(),
483            to: vec![to.into()],
484            condition: None,
485        }
486    }
487
488    /// Iterate over target nodes (one or many for parallel)
489    pub fn to_targets(&self) -> impl Iterator<Item = &str> {
490        self.to.iter().map(String::as_str)
491    }
492
493    /// Add a condition that must be satisfied for this edge to be taken.
494    pub fn with_condition(mut self, condition: impl Into<String>) -> Self {
495        self.condition = Some(condition.into());
496        self
497    }
498}
499
500// =============================================================================
501// Edge Condition Type
502// =============================================================================
503
504/// A condition function that determines if an edge should be taken.
505///
506/// The condition function receives the current state and returns:
507/// - `Some(node_name)` to explicitly jump to that node
508/// - `None` to use the default target node
509/// - An error if the condition evaluation fails
510///
511/// # Example
512/// ```ignore
513/// use flowgentra_ai::core::state::State;
514/// use std::sync::Arc;
515///
516/// let is_complex = Arc::new(|state: &State| -> Result<Option<String>, _> {
517///     if let Some(score) = state.get("complexity_score") {
518///         if score.as_i64().unwrap_or(0) > 50 {
519///             return Ok(Some("process_complex".to_string()));
520///         }
521///     }
522///     Ok(Some("process_simple".to_string()))
523/// } as Box<dyn Fn(&State) -> _>);
524/// ```
525pub type EdgeCondition<T> =
526    std::sync::Arc<dyn Fn(&T) -> crate::core::error::Result<Option<String>> + Send + Sync>;
527
528// =============================================================================
529// Compiled Edge
530// =============================================================================
531
532/// A compiled edge (connection) between two nodes, with optional condition.
533#[derive(Clone)]
534pub struct Edge<T: State> {
535    /// Source node name
536    pub from: String,
537
538    /// Target node name
539    pub to: String,
540
541    /// Optional condition function (legacy string-based conditions)
542    pub condition: Option<EdgeCondition<T>>,
543
544    /// The original condition name from config (for reference)
545    pub condition_name: Option<String>,
546
547    /// Type-safe routing condition (new DSL-based conditions)
548    pub routing_condition: Option<Condition<T>>,
549}
550
551impl<T: State> std::fmt::Debug for Edge<T> {
552    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
553        f.debug_struct("Edge")
554            .field("from", &self.from)
555            .field("to", &self.to)
556            .field("condition", &self.condition.is_some())
557            .field("condition_name", &self.condition_name)
558            .field("routing_condition", &self.routing_condition)
559            .finish()
560    }
561}
562
563impl<T: State> Edge<T> {
564    /// Create a new edge from source to target.
565    pub fn new(
566        from: impl Into<String>,
567        to: impl Into<String>,
568        condition: Option<EdgeCondition<T>>,
569    ) -> Self {
570        Edge::<T> {
571            from: from.into(),
572            to: to.into(),
573            condition,
574            condition_name: None,
575            routing_condition: None,
576        }
577    }
578
579    /// Attach the original condition name (for reference).
580    pub fn with_condition_name(mut self, name: impl Into<String>) -> Self {
581        self.condition_name = Some(name.into());
582        self
583    }
584
585    /// Set the type-safe routing condition for this edge
586    pub fn with_routing_condition(
587        mut self,
588        condition: crate::core::graph::routing::Condition<T>,
589    ) -> Self {
590        self.routing_condition = Some(condition);
591        self
592    }
593
594    /// Check if this edge should be taken given the current state.
595    ///
596    /// Evaluates both legacy conditions (EdgeCondition) and new conditions (Condition DSL).
597    /// Returns true if the edge is unconditional or if the condition evaluates to true.
598    pub async fn should_take(&self, state: &T) -> crate::core::error::Result<bool> {
599        // First check legacy condition function if present
600        if let Some(cond) = &self.condition {
601            match cond(state)? {
602                Some(_) => return Ok(true),
603                None => return Ok(false),
604            }
605        }
606
607        // Then check new routing condition if present
608        if let Some(routing_cond) = &self.routing_condition {
609            return Ok(routing_cond.evaluate(state));
610        }
611
612        // No condition = always traversable
613        Ok(true)
614    }
615
616    /// Get the next node if the condition specifies one.
617    ///
618    /// Returns the explicitly specified next node, or None to use the default target.
619    pub async fn get_next_node(&self, state: &T) -> crate::core::error::Result<Option<String>> {
620        match &self.condition {
621            Some(cond) => cond(state),
622            None => Ok(None),
623        }
624    }
625}
626
627// Sub-modules for node organization
628pub mod advanced_nodes;
629pub mod builtin_nodes;
630pub mod memory_handlers;
631pub mod nodes_trait;
632pub mod planner;