Skip to main content

motosan_workflow_model/
node.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8use crate::workflow::Workflow;
9
/// Conversion into a list of upstream node IDs.
///
/// Lets `input_from()`-style builder methods accept either a single ID or a
/// collection of IDs.
///
/// # Examples
/// ```ignore
/// .input_from("single_node")      // single &str
/// .input_from(["a", "b"])         // array of &str
/// .input_from(vec!["a".into()])   // Vec<String>
/// ```
pub trait IntoInputIds {
    /// Consumes `self` and returns the IDs as owned strings, preserving order.
    fn into_input_ids(self) -> Vec<String>;
}

/// A single borrowed ID becomes a one-element list.
impl IntoInputIds for &str {
    fn into_input_ids(self) -> Vec<String> {
        vec![String::from(self)]
    }
}

/// A single owned ID is moved into a one-element list.
impl IntoInputIds for String {
    fn into_input_ids(self) -> Vec<String> {
        Vec::from([self])
    }
}

/// A borrowed `String` is copied into a one-element list.
impl IntoInputIds for &String {
    fn into_input_ids(self) -> Vec<String> {
        vec![String::from(self.as_str())]
    }
}

/// A fixed-size array of string slices; each element is copied.
impl<const N: usize> IntoInputIds for [&str; N] {
    fn into_input_ids(self) -> Vec<String> {
        self.iter().map(|id| String::from(*id)).collect()
    }
}

/// A fixed-size array of owned strings; elements are moved, not copied.
impl<const N: usize> IntoInputIds for [String; N] {
    fn into_input_ids(self) -> Vec<String> {
        Vec::from(self)
    }
}

/// Already in the target representation; returned unchanged.
impl IntoInputIds for Vec<String> {
    fn into_input_ids(self) -> Vec<String> {
        self
    }
}

/// A vector of string slices; each element is copied.
impl IntoInputIds for Vec<&str> {
    fn into_input_ids(self) -> Vec<String> {
        self.into_iter().map(String::from).collect()
    }
}
63
/// What to do when a node exhausts all retries.
///
/// Stored in [`RetryPolicy::on_failure`]; `RetryPolicy::default()` uses
/// [`FailureMode::Abort`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum FailureMode {
    /// Skip this node and continue the workflow (store null output).
    Skip,
    /// Abort the entire workflow with an error.
    Abort,
    /// Use the given fallback string as the node output and continue.
    Fallback(String),
}
74
/// Configurable retry behavior for a node.
///
/// Attached to a node through `Node::retry_policy`. The `Default`
/// implementation yields 2 retries, a 100 ms initial backoff, a 2.0
/// multiplier and [`FailureMode::Abort`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryPolicy {
    /// Maximum number of retries (0 means no retries, just the initial attempt).
    pub max_retries: u32,
    /// Initial backoff in milliseconds before the first retry.
    pub backoff_ms: u64,
    /// Multiplier applied to backoff after each retry (exponential backoff).
    pub backoff_multiplier: f64,
    /// What to do when all retries are exhausted.
    pub on_failure: FailureMode,
}
87
88impl Default for RetryPolicy {
89    fn default() -> Self {
90        Self {
91            max_retries: 2,
92            backoff_ms: 100,
93            backoff_multiplier: 2.0,
94            on_failure: FailureMode::Abort,
95        }
96    }
97}
98
/// Configuration for an Agent node.
///
/// The list fields and `max_turns` carry `#[serde(default)]`, so they fall
/// back to their empty/`None` defaults when absent from serialized input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentConfig {
    /// Agent name; `Node::display_name` returns this for agent nodes.
    pub name: String,
    /// System prompt text for the agent.
    pub system_prompt: String,
    /// Tool names for this agent (empty when omitted).
    #[serde(default)]
    pub tools: Vec<String>,
    /// IDs of upstream nodes used as input (empty when omitted).
    #[serde(default)]
    pub input_from: Vec<String>,
    /// Optional schema for the agent's output.
    /// NOTE(review): presumably a JSON Schema, as on `AcpNodeConfig::output_schema` — confirm.
    pub output_schema: Option<Value>,
    /// Skill names attached to this agent (empty when omitted).
    #[serde(default)]
    pub skills: Vec<String>,
    /// Tool definitions available to this agent node for multi-turn tool use.
    /// The primary mechanism is through `Runtime::with_tool_executor()`, but this
    /// field allows YAML-defined tool schemas.
    #[serde(default)]
    pub tool_definitions: Vec<crate::traits::ToolDefinition>,
    /// Maximum number of ReAct loop iterations when tools are present.
    /// Defaults to 20 if not specified.
    // NOTE(review): the fallback of 20 is applied outside this file; only `None` is stored here.
    #[serde(default)]
    pub max_turns: Option<u32>,
}
121
/// Configuration for a Human gate node.
///
/// Describes the prompt shown to a person, the choices offered, and what
/// happens if no answer arrives in time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HumanConfig {
    /// Prompt to show to the human.
    pub prompt: String,
    /// Timeout in seconds. If None, waits indefinitely.
    pub timeout_secs: Option<u64>,
    /// Available options (e.g., ["approve", "reject", "edit"]).
    /// Empty when omitted from serialized input.
    #[serde(default)]
    pub options: Vec<String>,
    /// Default action on timeout (e.g., "approve" or "reject").
    pub timeout_action: Option<String>,
}
135
/// A single branch in a ConditionNode: a JSON pointer path, comparison operator, and value.
///
/// Also reused as the `until` condition of a loop (`LoopConfig::until`).
/// Evaluated by [`evaluate_condition`], which ignores `goto`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConditionBranch {
    /// JSON pointer path to extract from upstream output (e.g., "/confidence").
    pub path: String,
    /// Comparison operator: "gt", "gte", "lt", "lte", "eq", "neq".
    pub op: ConditionOp,
    /// The value to compare against.
    pub value: Value,
    /// Target node ID to route to when this branch matches.
    pub goto: String,
}
148
149/// Comparison operators for condition evaluation.
150#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
151pub enum ConditionOp {
152    /// Greater than (>)
153    #[serde(rename = "gt")]
154    Gt,
155    /// Greater than or equal (>=)
156    #[serde(rename = "gte")]
157    Gte,
158    /// Less than (<)
159    #[serde(rename = "lt")]
160    Lt,
161    /// Less than or equal (<=)
162    #[serde(rename = "lte")]
163    Lte,
164    /// Equal (==)
165    #[serde(rename = "eq")]
166    Eq,
167    /// Not equal (!=)
168    #[serde(rename = "neq")]
169    Neq,
170}
171
172/// Evaluate a condition: extract `path` from `data`, compare with `op` against `value`.
173pub fn evaluate_condition(data: &Value, branch: &ConditionBranch) -> bool {
174    let extracted = data.pointer(&branch.path);
175    let extracted = match extracted {
176        Some(v) => v,
177        None => return false,
178    };
179
180    match &branch.op {
181        ConditionOp::Eq => extracted == &branch.value,
182        ConditionOp::Neq => extracted != &branch.value,
183        ConditionOp::Gt | ConditionOp::Gte | ConditionOp::Lt | ConditionOp::Lte => {
184            compare_numeric(extracted, &branch.value, &branch.op)
185        }
186    }
187}
188
189fn compare_numeric(lhs: &Value, rhs: &Value, op: &ConditionOp) -> bool {
190    let lhs_f = value_as_f64(lhs);
191    let rhs_f = value_as_f64(rhs);
192    match (lhs_f, rhs_f) {
193        (Some(l), Some(r)) => match op {
194            ConditionOp::Gt => l > r,
195            ConditionOp::Gte => l >= r,
196            ConditionOp::Lt => l < r,
197            ConditionOp::Lte => l <= r,
198            _ => false,
199        },
200        _ => false,
201    }
202}
203
204fn value_as_f64(v: &Value) -> Option<f64> {
205    v.as_f64().or_else(|| v.as_i64().map(|i| i as f64))
206}
207
/// Configuration for a Condition node.
///
/// Routes to the `goto` of the first matching branch, or to `default_goto`
/// when none match.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConditionConfig {
    /// IDs of upstream nodes whose output is used for evaluation.
    pub input_from: Vec<String>,
    /// Ordered list of branches; first match wins.
    pub branches: Vec<ConditionBranch>,
    /// Optional default target if no branch matches (instead of error).
    pub default_goto: Option<String>,
}
218
/// Configuration for a Loop node.
///
/// `Node::loop_node` starts its builder with an empty body, a limit of 10
/// iterations and no `until` condition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoopConfig {
    /// Node IDs that form the loop body, executed in order each iteration.
    pub body: Vec<String>,
    /// Maximum number of iterations (safety limit).
    pub max_iterations: usize,
    /// Until condition: JSON pointer path + op + value checked against the last body node's output.
    pub until: Option<ConditionBranch>,
}
229
/// The type of a synchronous transform function.
///
/// A shared (`Arc`), thread-safe (`Send + Sync`) callable that borrows the
/// input JSON and returns either the output JSON or an error message.
pub type TransformFn = Arc<dyn Fn(&Value) -> std::result::Result<Value, String> + Send + Sync>;
232
/// The type of an async transform function — supports HTTP calls, file I/O, etc.
///
/// A shared (`Arc`), thread-safe (`Send + Sync`) callable that takes the
/// input JSON by value and returns a boxed, pinned, `Send` future resolving
/// to either the output JSON or an error message.
pub type AsyncTransformFn = Arc<
    dyn Fn(
            Value,
        ) -> std::pin::Pin<
            Box<dyn std::future::Future<Output = std::result::Result<Value, String>> + Send>,
        > + Send
        + Sync,
>;
242
/// Configuration for a Transform node (pure function, async function, or external script).
///
/// Execution priority: `script` > `async_fn` > `transform_fn`.
///
/// Only `Clone` is derived; `Debug` is implemented by hand because the
/// function fields are trait objects. There is no serde support on this type.
#[derive(Clone)]
pub struct TransformConfig {
    /// IDs of upstream nodes whose output is used as input.
    pub input_from: Vec<String>,
    /// The sync transform function (lowest priority fallback).
    /// Always present, unlike the optional alternatives below.
    pub transform_fn: TransformFn,
    /// Optional async transform function.
    pub async_fn: Option<AsyncTransformFn>,
    /// Optional external script path. When set, the runtime spawns the script as a subprocess,
    /// pipes input JSON via stdin, and captures output JSON from stdout.
    /// Supports: .mjs, .js (node), .py (python3), .sh (bash).
    /// `Node::kind_str` reports `"script"` instead of `"transform"` when this is set.
    pub script: Option<String>,
}
259
260impl std::fmt::Debug for TransformConfig {
261    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262        let has_async = self.async_fn.is_some();
263        f.debug_struct("TransformConfig")
264            .field("input_from", &self.input_from)
265            .field("transform_fn", &"<fn>")
266            .field("async_fn", &if has_async { "<async fn>" } else { "<none>" })
267            .field("script", &self.script)
268            .finish()
269    }
270}
271
/// How to handle errors in individual foreach items.
///
/// Serialized in snake_case (`"abort"`, `"skip"`); the default is `Abort`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "snake_case")]
pub enum ForeachErrorMode {
    /// Abort the entire foreach on the first error (current/default behavior).
    #[default]
    Abort,
    /// Skip failed items, record the error as a JSON object, and continue.
    Skip,
}
282
/// Configuration for a SubWorkflow node (nested workflow composition).
///
/// Without `foreach` the nested workflow is described as running once; with
/// it, once per element of the extracted array, optionally in parallel.
/// This type has no serde derives.
#[derive(Debug, Clone)]
pub struct SubWorkflowConfig {
    /// The nested workflow to execute.
    pub workflow: Workflow,
    /// IDs of upstream nodes whose output is used as input.
    pub input_from: Vec<String>,
    /// JSON pointer path to extract an array from upstream output (e.g. "/issues").
    /// When set, the sub-workflow is executed once per array element.
    pub foreach: Option<String>,
    /// Whether to run foreach iterations in parallel. Default: false.
    pub parallel: bool,
    /// Maximum number of concurrent iterations when parallel=true.
    /// Default: unbounded (all items at once).
    pub max_parallel: Option<usize>,
    /// How to handle errors in individual foreach items.
    pub on_item_error: ForeachErrorMode,
}
301
/// Strategy for automatic skill selection.
///
/// Used by the `AutoSelect` variant of [`AgencySkill`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SkillSelectStrategy {
    /// Use an LLM to pick the best skill.
    Llm,
    /// Use keyword matching to pick the best skill.
    Keyword,
}
310
/// A skill that an ACP agent can use.
///
/// Serialized adjacently tagged: the variant name (snake_case) goes under
/// `"type"` and its payload under `"value"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", content = "value", rename_all = "snake_case")]
pub enum AgencySkill {
    /// A skill referenced by name (resolved via skill system).
    Named(String),
    /// A skill loaded from a file path.
    File(PathBuf),
    /// An inline skill definition.
    Inline(String),
    /// Automatically select a skill from candidates.
    AutoSelect {
        /// Optional explicit list of candidate skills.
        candidates: Option<Vec<String>>,
        /// Optional directory to draw candidates from.
        from_dir: Option<PathBuf>,
        /// How the skill is chosen among the candidates.
        strategy: SkillSelectStrategy,
    },
}
328
/// Session mode for an ACP agent node.
///
/// Serialized adjacently tagged: the variant name (snake_case) goes under
/// `"mode"` and its payload under `"value"`. `Node::acp_agent` starts its
/// builder with `New`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "mode", content = "value", rename_all = "snake_case")]
pub enum AcpSessionMode {
    /// Start a new session each time.
    New,
    /// Resume a previously started session by ID.
    Resume(String),
    /// Maintain a persistent session identified by key.
    Persistent { key: String },
}
340
/// Fallback behavior when an ACP agent is unavailable.
///
/// `Node::acp_agent` starts its builder with `Fail`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum AcpFallback {
    /// Fail the node with an error.
    Fail,
    /// Skip the node and continue.
    Skip,
    /// Use a local LLM as fallback.
    UseLlm,
}
351
/// File access configuration for an ACP agent.
///
/// Every field carries `#[serde(default)]`, and the derived `Default` denies
/// both read and write with no path list.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct FileAccessConfig {
    /// Whether read access is allowed.
    #[serde(default)]
    pub read: bool,
    /// Whether write access is allowed.
    #[serde(default)]
    pub write: bool,
    /// Restrict access to these paths only. Empty means no restriction.
    #[serde(default)]
    pub allowed_paths: Vec<PathBuf>,
}
365
/// Capabilities exposed by an ACP client handler.
///
/// The derived `Default` has no auto-approved tools, no file access
/// configuration and terminal access disabled.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct AcpCapabilities {
    /// Tools that are automatically approved without human confirmation.
    #[serde(default)]
    pub auto_approve_tools: Vec<String>,
    /// File access restrictions.
    pub file_access: Option<FileAccessConfig>,
    /// Whether terminal access is allowed.
    #[serde(default)]
    pub terminal: bool,
}
378
/// Configuration for an MCP server to inject into the ACP agent.
///
/// Also used for swarm workers (`SwarmWorkerConfig::mcp_servers`,
/// `SwarmConfig::shared_mcp_servers`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct McpServerConfig {
    /// Server name; the swarm docs describe de-duplicating servers by name.
    pub name: String,
    /// Command for the server.
    /// NOTE(review): presumably the executable that launches it — confirm.
    pub command: String,
    /// Arguments for `command` (empty when omitted).
    #[serde(default)]
    pub args: Vec<String>,
    /// Environment variables for the server (empty when omitted).
    #[serde(default)]
    pub env: HashMap<String, String>,
}
389
/// Configuration for an ACP (Agent Coding Protocol) agent node.
///
/// Fields marked `#[serde(default)]` fall back to their empty/`None`/`false`
/// defaults when absent from serialized input.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct AcpNodeConfig {
    /// The command to invoke the coding agent (e.g., "claude", "aider").
    /// `Node::display_name` returns this for ACP agent nodes.
    pub agent_command: String,
    /// Optional skill to load for the agent.
    pub skill: Option<AgencySkill>,
    /// Files to include in the agent's context window.
    #[serde(default)]
    pub context_files: Vec<PathBuf>,
    /// Template string for the task prompt (supports `{{variable}}` interpolation).
    pub task_template: String,
    /// Session management mode.
    pub session: AcpSessionMode,
    /// What to do when the agent is unavailable.
    pub on_unavailable: AcpFallback,
    /// Optional JSON Schema to validate the agent's output.
    pub output_schema: Option<Value>,
    /// Maximum number of agentic turns before forcing completion.
    pub max_turns: Option<u32>,
    /// Timeout in seconds for the entire agent invocation.
    pub timeout_secs: Option<u64>,
    /// IDs of upstream nodes whose output is piped as input.
    #[serde(default)]
    pub input_from: Vec<String>,
    /// Capabilities exposed by the ACP client handler.
    #[serde(default)]
    pub capabilities: Option<AcpCapabilities>,
    /// Working directory for the agent. If None, uses process cwd.
    #[serde(default)]
    pub working_dir: Option<PathBuf>,
    /// MCP servers to inject into the ACP agent subprocess.
    #[serde(default)]
    pub mcp_servers: Vec<McpServerConfig>,
    /// Enable web search capability for the agent.
    /// For Claude: adds Brave Search MCP server automatically.
    /// For Codex: adds --search flag.
    #[serde(default)]
    pub enable_web_search: bool,
}
430
/// Configuration for a worker in a swarm node.
///
/// Listed in `SwarmConfig::workers`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmWorkerConfig {
    /// Skill used by this worker.
    /// NOTE(review): presumably a skill name — confirm how it is resolved.
    pub skill: String,
    /// Agent used by this worker.
    /// NOTE(review): presumably an agent command such as "claude" — confirm.
    pub agent: String,
    /// Whether web search is enabled for this worker (false when omitted).
    #[serde(default)]
    pub enable_web_search: bool,
    /// Per-worker MCP servers (merged with SwarmConfig::shared_mcp_servers at runtime).
    #[serde(default)]
    pub mcp_servers: Vec<McpServerConfig>,
}
442
/// Completion criteria for a swarm node.
///
/// Serialized in snake_case (`"queue_empty"`, `"llm_decides"`); the default
/// is `QueueEmpty`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum SwarmCompletionCriteria {
    /// NOTE(review): presumably "complete once the task queue is empty" — confirm.
    #[default]
    QueueEmpty,
    /// NOTE(review): presumably "an LLM decides when the swarm is done" — confirm.
    LlmDecides,
}
451
/// Configuration for a Swarm node in the DAG.
///
/// When absent from serialized input, `max_parallel` falls back to
/// `default_max_parallel()` (4) and the other `#[serde(default)]` fields to
/// their empty/`None`/default values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmConfig {
    /// Template for the goal passed to the planner (supports `{{variable}}` interpolation).
    pub goal_template: String,
    /// Maximum number of concurrent workers.
    #[serde(default = "default_max_parallel")]
    pub max_parallel: u32,
    /// Maximum total tasks (prevents runaway planning).
    pub max_tasks: Option<u32>,
    /// Maximum task depth (prevents infinite subtask spawning).
    pub max_depth: Option<u32>,
    /// Worker configurations.
    #[serde(default)]
    pub workers: Vec<SwarmWorkerConfig>,
    /// When to consider the swarm complete.
    #[serde(default)]
    pub completion: SwarmCompletionCriteria,
    /// IDs of upstream nodes whose output is piped as input.
    #[serde(default)]
    pub input_from: Vec<String>,
    /// Budget cap in USD. When cumulative task cost reaches this limit the swarm
    /// stops scheduling new tasks and fails any remaining pending work.
    /// `None` means unlimited.
    #[serde(default)]
    pub budget_usd: Option<f64>,
    /// MCP servers injected into **all** workers in this swarm.
    ///
    /// At runtime, when spawning a worker the effective MCP server list is:
    ///   `shared_mcp_servers ++ worker.mcp_servers`
    /// (shared first, then per-worker; duplicates by name should be deduped,
    /// with the per-worker entry winning).
    #[serde(default)]
    pub shared_mcp_servers: Vec<McpServerConfig>,
}
487
/// Serde default for `SwarmConfig::max_parallel`: four concurrent workers.
fn default_max_parallel() -> u32 {
    const DEFAULT_MAX_PARALLEL: u32 = 4;
    DEFAULT_MAX_PARALLEL
}
491
/// The kind of a workflow node.
///
/// Each variant wraps the configuration struct for that kind.
/// `Node::kind_str` maps each variant to a short string label.
#[derive(Debug, Clone)]
pub enum NodeKind {
    /// Agent node.
    Agent(AgentConfig),
    /// Human gate node.
    Human(HumanConfig),
    /// Condition (branching) node.
    Condition(ConditionConfig),
    /// Loop node.
    Loop(LoopConfig),
    /// Transform node (sync function, async function, or external script).
    Transform(TransformConfig),
    /// Nested workflow node.
    SubWorkflow(SubWorkflowConfig),
    /// ACP agent node.
    AcpAgent(AcpNodeConfig),
    /// Swarm node.
    Swarm(SwarmConfig),
}
504
/// A single node in a workflow DAG.
///
/// Usually created through the builder entry points on this type
/// (`Node::agent`, `Node::human`, `Node::condition`, ...).
#[derive(Debug, Clone)]
pub struct Node {
    /// Identifier of this node; other nodes refer to it by this ID
    /// (e.g. in `input_from`, `goto`, or a loop `body`).
    pub id: String,
    /// The node's kind together with its kind-specific configuration.
    pub kind: NodeKind,
    /// Optional retry policy for this node.
    pub retry_policy: Option<RetryPolicy>,
}
513
514impl Node {
515    /// Returns a human-readable display name for this node.
516    /// For agent nodes, returns the agent name; for others, returns the node ID.
517    pub fn display_name(&self) -> &str {
518        match &self.kind {
519            NodeKind::Agent(c) => &c.name,
520            NodeKind::AcpAgent(c) => &c.agent_command,
521            _ => &self.id,
522        }
523    }
524
525    /// Returns a string label for the node kind.
526    pub fn kind_str(&self) -> &str {
527        match &self.kind {
528            NodeKind::Agent(_) => "agent",
529            NodeKind::Human(_) => "human",
530            NodeKind::Transform(c) if c.script.is_some() => "script",
531            NodeKind::Transform(_) => "transform",
532            NodeKind::Condition(_) => "condition",
533            NodeKind::Loop(_) => "loop",
534            NodeKind::SubWorkflow(_) => "sub_workflow",
535            NodeKind::AcpAgent(_) => "acp_agent",
536            NodeKind::Swarm(_) => "swarm",
537        }
538    }
539
540    /// Create a new Agent node builder.
541    pub fn agent(id: impl Into<String>) -> NodeBuilder {
542        NodeBuilder {
543            id: id.into(),
544            kind: NodeBuilderKind::Agent {
545                name: None,
546                system_prompt: None,
547                tools: vec![],
548                input_from: vec![],
549                output_schema: None,
550                skills: vec![],
551                max_turns: None,
552            },
553            retry_policy: None,
554        }
555    }
556
557    /// Create a new Human node builder.
558    pub fn human(id: impl Into<String>) -> NodeBuilder {
559        NodeBuilder {
560            id: id.into(),
561            kind: NodeBuilderKind::Human {
562                prompt: None,
563                timeout_secs: None,
564                options: vec![],
565                timeout_action: None,
566            },
567            retry_policy: None,
568        }
569    }
570
571    /// Create a new Condition node builder.
572    pub fn condition(id: impl Into<String>) -> NodeBuilder {
573        NodeBuilder {
574            id: id.into(),
575            kind: NodeBuilderKind::Condition {
576                input_from: vec![],
577                branches: vec![],
578                default_goto: None,
579            },
580            retry_policy: None,
581        }
582    }
583
584    /// Create a new Loop node builder.
585    pub fn loop_node(id: impl Into<String>) -> NodeBuilder {
586        NodeBuilder {
587            id: id.into(),
588            kind: NodeBuilderKind::Loop {
589                body: vec![],
590                max_iterations: 10,
591                until: None,
592            },
593            retry_policy: None,
594        }
595    }
596
597    /// Create a new Transform node builder.
598    pub fn transform(id: impl Into<String>) -> NodeBuilder {
599        NodeBuilder {
600            id: id.into(),
601            kind: NodeBuilderKind::Transform {
602                input_from: vec![],
603                transform_fn: None,
604                async_fn: None,
605            },
606            retry_policy: None,
607        }
608    }
609
610    /// Create a new AcpAgent node builder.
611    pub fn acp_agent(id: impl Into<String>) -> NodeBuilder {
612        NodeBuilder {
613            id: id.into(),
614            kind: NodeBuilderKind::AcpAgent {
615                agent_command: None,
616                skill: None,
617                context_files: vec![],
618                task_template: None,
619                session: AcpSessionMode::New,
620                on_unavailable: AcpFallback::Fail,
621                output_schema: None,
622                max_turns: None,
623                timeout_secs: None,
624                input_from: vec![],
625                capabilities: None,
626                working_dir: None,
627                mcp_servers: vec![],
628                enable_web_search: false,
629            },
630            retry_policy: None,
631        }
632    }
633
634    /// Create a new SubWorkflow node builder.
635    pub fn sub_workflow(id: impl Into<String>) -> NodeBuilder {
636        NodeBuilder {
637            id: id.into(),
638            kind: NodeBuilderKind::SubWorkflow {
639                workflow: None,
640                input_from: vec![],
641                foreach: None,
642                parallel: false,
643                max_parallel: None,
644                on_item_error: ForeachErrorMode::default(),
645            },
646            retry_policy: None,
647        }
648    }
649}
650
/// Partially-filled, kind-specific state held by a `NodeBuilder`.
///
/// Mirrors the `NodeKind` configuration structs, except that values which
/// must be supplied through builder methods start out as `None`.
/// There is no Swarm variant here, and the Transform variant carries no
/// `script` field.
enum NodeBuilderKind {
    /// State for an Agent node under construction.
    Agent {
        name: Option<String>,
        system_prompt: Option<String>,
        tools: Vec<String>,
        input_from: Vec<String>,
        output_schema: Option<Value>,
        skills: Vec<String>,
        max_turns: Option<u32>,
    },
    /// State for a Human gate node under construction.
    Human {
        prompt: Option<String>,
        timeout_secs: Option<u64>,
        options: Vec<String>,
        timeout_action: Option<String>,
    },
    /// State for a Condition node under construction.
    Condition {
        input_from: Vec<String>,
        branches: Vec<ConditionBranch>,
        default_goto: Option<String>,
    },
    /// State for a Loop node under construction.
    Loop {
        body: Vec<String>,
        max_iterations: usize,
        until: Option<ConditionBranch>,
    },
    /// State for a Transform node under construction.
    Transform {
        input_from: Vec<String>,
        transform_fn: Option<TransformFn>,
        async_fn: Option<AsyncTransformFn>,
    },
    /// State for a SubWorkflow node under construction.
    SubWorkflow {
        workflow: Option<Workflow>,
        input_from: Vec<String>,
        foreach: Option<String>,
        parallel: bool,
        max_parallel: Option<usize>,
        on_item_error: ForeachErrorMode,
    },
    /// State for an ACP agent node under construction.
    AcpAgent {
        agent_command: Option<String>,
        skill: Option<AgencySkill>,
        context_files: Vec<PathBuf>,
        task_template: Option<String>,
        session: AcpSessionMode,
        on_unavailable: AcpFallback,
        output_schema: Option<Value>,
        max_turns: Option<u32>,
        timeout_secs: Option<u64>,
        input_from: Vec<String>,
        capabilities: Option<AcpCapabilities>,
        working_dir: Option<PathBuf>,
        mcp_servers: Vec<McpServerConfig>,
        enable_web_search: bool,
    },
}
707
/// Fluent builder for a `Node`, obtained from `Node::agent`, `Node::human`,
/// `Node::condition`, `Node::loop_node`, `Node::transform`,
/// `Node::acp_agent` or `Node::sub_workflow`.
///
/// Setter methods take and return the builder by value. A setter that does
/// not apply to the builder's node kind leaves the builder unchanged rather
/// than reporting an error.
pub struct NodeBuilder {
    /// ID the finished node will carry.
    id: String,
    /// Kind-specific settings collected so far.
    kind: NodeBuilderKind,
    /// Retry policy for the node, if one has been set.
    retry_policy: Option<RetryPolicy>,
}
713
714impl NodeBuilder {
715    // --- Agent methods ---
716
717    pub fn name(mut self, name: impl Into<String>) -> Self {
718        if let NodeBuilderKind::Agent {
719            name: ref mut n, ..
720        } = self.kind
721        {
722            *n = Some(name.into());
723        }
724        self
725    }
726
727    pub fn system_prompt(mut self, prompt: impl Into<String>) -> Self {
728        if let NodeBuilderKind::Agent {
729            system_prompt: ref mut sp,
730            ..
731        } = self.kind
732        {
733            *sp = Some(prompt.into());
734        }
735        self
736    }
737
738    pub fn tools(mut self, tools: impl IntoIterator<Item = impl Into<String>>) -> Self {
739        if let NodeBuilderKind::Agent {
740            tools: ref mut t, ..
741        } = self.kind
742        {
743            *t = tools.into_iter().map(|s| s.into()).collect();
744        }
745        self
746    }
747
748    pub fn input_from(mut self, inputs: impl IntoInputIds) -> Self {
749        let collected: Vec<String> = inputs.into_input_ids();
750        match self.kind {
751            NodeBuilderKind::Agent {
752                ref mut input_from, ..
753            } => {
754                *input_from = collected;
755            }
756            NodeBuilderKind::Transform {
757                ref mut input_from, ..
758            } => {
759                *input_from = collected;
760            }
761            NodeBuilderKind::SubWorkflow {
762                ref mut input_from, ..
763            } => {
764                *input_from = collected;
765            }
766            NodeBuilderKind::AcpAgent {
767                ref mut input_from, ..
768            } => {
769                *input_from = collected;
770            }
771            _ => {}
772        }
773        self
774    }
775
776    pub fn output_schema(mut self, schema: Value) -> Self {
777        match &mut self.kind {
778            NodeBuilderKind::Agent { output_schema, .. }
779            | NodeBuilderKind::AcpAgent { output_schema, .. } => {
780                *output_schema = Some(schema);
781            }
782            _ => {}
783        }
784        self
785    }
786
787    /// Add a single skill (pushes to the skills vec). Can be called multiple times.
788    pub fn skill(mut self, skill: impl Into<String>) -> Self {
789        if let NodeBuilderKind::Agent {
790            skills: ref mut s, ..
791        } = self.kind
792        {
793            s.push(skill.into());
794        }
795        self
796    }
797
798    /// Set multiple skills at once (replaces any previously added skills).
799    pub fn skills(mut self, skills: impl IntoIterator<Item = impl Into<String>>) -> Self {
800        if let NodeBuilderKind::Agent {
801            skills: ref mut s, ..
802        } = self.kind
803        {
804            *s = skills.into_iter().map(|v| v.into()).collect();
805        }
806        self
807    }
808
809    // --- Human methods ---
810
811    pub fn prompt(mut self, prompt: impl Into<String>) -> Self {
812        if let NodeBuilderKind::Human {
813            prompt: ref mut p, ..
814        } = self.kind
815        {
816            *p = Some(prompt.into());
817        }
818        self
819    }
820
821    pub fn timeout_secs(mut self, secs: u64) -> Self {
822        if let NodeBuilderKind::Human {
823            timeout_secs: ref mut ts,
824            ..
825        } = self.kind
826        {
827            *ts = Some(secs);
828        }
829        self
830    }
831
832    pub fn options(mut self, options: impl IntoIterator<Item = impl Into<String>>) -> Self {
833        if let NodeBuilderKind::Human {
834            options: ref mut o, ..
835        } = self.kind
836        {
837            *o = options.into_iter().map(|s| s.into()).collect();
838        }
839        self
840    }
841
842    pub fn timeout_action(mut self, action: impl Into<String>) -> Self {
843        if let NodeBuilderKind::Human {
844            timeout_action: ref mut ta,
845            ..
846        } = self.kind
847        {
848            *ta = Some(action.into());
849        }
850        self
851    }
852
853    // --- Condition methods ---
854
855    /// Add an upstream input source for condition evaluation.
856    pub fn condition_input_from(mut self, inputs: impl IntoInputIds) -> Self {
857        if let NodeBuilderKind::Condition {
858            input_from: ref mut i,
859            ..
860        } = self.kind
861        {
862            *i = inputs.into_input_ids();
863        }
864        self
865    }
866
867    /// Add a condition branch.
868    pub fn branch(mut self, branch: ConditionBranch) -> Self {
869        if let NodeBuilderKind::Condition {
870            branches: ref mut b,
871            ..
872        } = self.kind
873        {
874            b.push(branch);
875        }
876        self
877    }
878
879    /// Set the default goto target when no branch matches.
880    pub fn default_goto(mut self, target: impl Into<String>) -> Self {
881        if let NodeBuilderKind::Condition {
882            default_goto: ref mut d,
883            ..
884        } = self.kind
885        {
886            *d = Some(target.into());
887        }
888        self
889    }
890
891    // --- Loop methods ---
892
893    /// Set the body node IDs for the loop.
894    pub fn body(mut self, nodes: impl IntoIterator<Item = impl Into<String>>) -> Self {
895        if let NodeBuilderKind::Loop {
896            body: ref mut b, ..
897        } = self.kind
898        {
899            *b = nodes.into_iter().map(|s| s.into()).collect();
900        }
901        self
902    }
903
904    /// Set the maximum iterations for the loop.
905    pub fn max_iterations(mut self, max: usize) -> Self {
906        if let NodeBuilderKind::Loop {
907            max_iterations: ref mut m,
908            ..
909        } = self.kind
910        {
911            *m = max;
912        }
913        self
914    }
915
916    /// Set the until condition for the loop.
917    pub fn until(mut self, condition: ConditionBranch) -> Self {
918        if let NodeBuilderKind::Loop {
919            until: ref mut u, ..
920        } = self.kind
921        {
922            *u = Some(condition);
923        }
924        self
925    }
926
927    // --- Transform methods ---
928
929    /// Set the input sources for a Transform node.
930    pub fn transform_input_from(mut self, inputs: impl IntoInputIds) -> Self {
931        if let NodeBuilderKind::Transform {
932            input_from: ref mut i,
933            ..
934        } = self.kind
935        {
936            *i = inputs.into_input_ids();
937        }
938        self
939    }
940
941    /// Set the transform function for a Transform node.
942    pub fn transform_fn(
943        mut self,
944        f: impl Fn(&Value) -> std::result::Result<Value, String> + Send + Sync + 'static,
945    ) -> Self {
946        if let NodeBuilderKind::Transform {
947            transform_fn: ref mut tf,
948            ..
949        } = self.kind
950        {
951            *tf = Some(Arc::new(f));
952        }
953        self
954    }
955
956    /// Set an async transform function. When set, the runtime uses this instead of the sync transform_fn.
957    /// Useful for HTTP calls, file I/O, or any async operation.
958    ///
959    /// # Example
960    ///
961    /// ```rust,ignore
962    /// Node::transform("fetch")
963    ///     .transform_input_from(["upstream"])
964    ///     .async_transform_fn(|input| async move {
965    ///         Ok(serde_json::json!({"result": "ok"}))
966    ///     })
967    ///     .build()
968    /// ```
969    pub fn async_transform_fn<F, Fut>(mut self, f: F) -> Self
970    where
971        F: Fn(Value) -> Fut + Send + Sync + 'static,
972        Fut: std::future::Future<Output = std::result::Result<Value, String>> + Send + 'static,
973    {
974        if let NodeBuilderKind::Transform {
975            ref mut async_fn, ..
976        } = self.kind
977        {
978            *async_fn = Some(Arc::new(move |input: Value| {
979                Box::pin(f(input))
980                    as std::pin::Pin<
981                        Box<
982                            dyn std::future::Future<Output = std::result::Result<Value, String>>
983                                + Send,
984                        >,
985                    >
986            }));
987        }
988        self
989    }
990
991    // --- AcpAgent methods ---
992
993    /// Set the agent command for an AcpAgent node (e.g., "claude", "aider").
994    pub fn agent_command(mut self, cmd: impl Into<String>) -> Self {
995        if let NodeBuilderKind::AcpAgent {
996            agent_command: ref mut ac,
997            ..
998        } = self.kind
999        {
1000            *ac = Some(cmd.into());
1001        }
1002        self
1003    }
1004
1005    /// Set the skill for an AcpAgent node.
1006    pub fn acp_skill(mut self, s: AgencySkill) -> Self {
1007        if let NodeBuilderKind::AcpAgent {
1008            skill: ref mut sk, ..
1009        } = self.kind
1010        {
1011            *sk = Some(s);
1012        }
1013        self
1014    }
1015
1016    /// Set context files for an AcpAgent node.
1017    pub fn context_files(mut self, files: impl IntoIterator<Item = impl Into<PathBuf>>) -> Self {
1018        if let NodeBuilderKind::AcpAgent {
1019            context_files: ref mut cf,
1020            ..
1021        } = self.kind
1022        {
1023            *cf = files.into_iter().map(|f| f.into()).collect();
1024        }
1025        self
1026    }
1027
1028    /// Set the task template for an AcpAgent node.
1029    pub fn task_template(mut self, template: impl Into<String>) -> Self {
1030        if let NodeBuilderKind::AcpAgent {
1031            task_template: ref mut tt,
1032            ..
1033        } = self.kind
1034        {
1035            *tt = Some(template.into());
1036        }
1037        self
1038    }
1039
1040    /// Set the session mode for an AcpAgent node.
1041    pub fn session(mut self, mode: AcpSessionMode) -> Self {
1042        if let NodeBuilderKind::AcpAgent {
1043            session: ref mut s, ..
1044        } = self.kind
1045        {
1046            *s = mode;
1047        }
1048        self
1049    }
1050
1051    /// Set the fallback behavior when the ACP agent is unavailable.
1052    pub fn on_unavailable(mut self, fallback: AcpFallback) -> Self {
1053        if let NodeBuilderKind::AcpAgent {
1054            on_unavailable: ref mut ou,
1055            ..
1056        } = self.kind
1057        {
1058            *ou = fallback;
1059        }
1060        self
1061    }
1062
1063    /// Set the maximum turns for an Agent or AcpAgent node.
1064    pub fn max_turns(mut self, turns: u32) -> Self {
1065        match self.kind {
1066            NodeBuilderKind::Agent {
1067                ref mut max_turns, ..
1068            } => {
1069                *max_turns = Some(turns);
1070            }
1071            NodeBuilderKind::AcpAgent {
1072                max_turns: ref mut mt,
1073                ..
1074            } => {
1075                *mt = Some(turns);
1076            }
1077            _ => {}
1078        }
1079        self
1080    }
1081
1082    /// Set the capabilities for an AcpAgent node.
1083    pub fn capabilities(mut self, caps: AcpCapabilities) -> Self {
1084        if let NodeBuilderKind::AcpAgent {
1085            capabilities: ref mut c,
1086            ..
1087        } = self.kind
1088        {
1089            *c = Some(caps);
1090        }
1091        self
1092    }
1093
1094    /// Set the working directory for an AcpAgent node.
1095    pub fn working_dir(mut self, path: impl Into<PathBuf>) -> Self {
1096        if let NodeBuilderKind::AcpAgent {
1097            working_dir: ref mut wd,
1098            ..
1099        } = self.kind
1100        {
1101            *wd = Some(path.into());
1102        }
1103        self
1104    }
1105
1106    /// Set MCP servers for an AcpAgent node.
1107    pub fn mcp_servers(mut self, servers: Vec<McpServerConfig>) -> Self {
1108        if let NodeBuilderKind::AcpAgent {
1109            mcp_servers: ref mut ms,
1110            ..
1111        } = self.kind
1112        {
1113            *ms = servers;
1114        }
1115        self
1116    }
1117
1118    /// Enable web search capability for an AcpAgent node.
1119    pub fn enable_web_search(mut self, enable: bool) -> Self {
1120        if let NodeBuilderKind::AcpAgent {
1121            enable_web_search: ref mut ews,
1122            ..
1123        } = self.kind
1124        {
1125            *ews = enable;
1126        }
1127        self
1128    }
1129
1130    /// Set the timeout in seconds for an AcpAgent node.
1131    pub fn acp_timeout_secs(mut self, secs: u64) -> Self {
1132        if let NodeBuilderKind::AcpAgent {
1133            timeout_secs: ref mut ts,
1134            ..
1135        } = self.kind
1136        {
1137            *ts = Some(secs);
1138        }
1139        self
1140    }
1141
1142    // --- SubWorkflow methods ---
1143
1144    /// Set the nested workflow for a SubWorkflow node.
1145    pub fn workflow(mut self, workflow: Workflow) -> Self {
1146        if let NodeBuilderKind::SubWorkflow {
1147            workflow: ref mut w,
1148            ..
1149        } = self.kind
1150        {
1151            *w = Some(workflow);
1152        }
1153        self
1154    }
1155
1156    /// Set the foreach JSON pointer path for a SubWorkflow node.
1157    /// When set, the sub-workflow is executed once per element in the extracted array.
1158    pub fn foreach(mut self, path: impl Into<String>) -> Self {
1159        if let NodeBuilderKind::SubWorkflow {
1160            foreach: ref mut f, ..
1161        } = self.kind
1162        {
1163            *f = Some(path.into());
1164        }
1165        self
1166    }
1167
1168    /// Set whether foreach iterations run in parallel for a SubWorkflow node.
1169    pub fn parallel(mut self, parallel: bool) -> Self {
1170        if let NodeBuilderKind::SubWorkflow {
1171            parallel: ref mut p,
1172            ..
1173        } = self.kind
1174        {
1175            *p = parallel;
1176        }
1177        self
1178    }
1179
1180    /// Set the maximum number of concurrent foreach iterations for a SubWorkflow node.
1181    pub fn max_parallel(mut self, max: usize) -> Self {
1182        if let NodeBuilderKind::SubWorkflow {
1183            max_parallel: ref mut mp,
1184            ..
1185        } = self.kind
1186        {
1187            *mp = Some(max);
1188        }
1189        self
1190    }
1191
1192    /// Set the error handling mode for individual foreach items.
1193    pub fn on_item_error(mut self, mode: ForeachErrorMode) -> Self {
1194        if let NodeBuilderKind::SubWorkflow {
1195            on_item_error: ref mut oie,
1196            ..
1197        } = self.kind
1198        {
1199            *oie = mode;
1200        }
1201        self
1202    }
1203
1204    /// Set a retry policy for this node.
1205    pub fn retry_policy(mut self, policy: RetryPolicy) -> Self {
1206        self.retry_policy = Some(policy);
1207        self
1208    }
1209
1210    /// Set retry parameters directly (convenience method).
1211    pub fn retry(
1212        mut self,
1213        max_retries: u32,
1214        backoff_ms: u64,
1215        backoff_multiplier: f64,
1216        on_failure: FailureMode,
1217    ) -> Self {
1218        self.retry_policy = Some(RetryPolicy {
1219            max_retries,
1220            backoff_ms,
1221            backoff_multiplier,
1222            on_failure,
1223        });
1224        self
1225    }
1226
1227    pub fn build(self) -> Node {
1228        let kind = match self.kind {
1229            NodeBuilderKind::Agent {
1230                name,
1231                system_prompt,
1232                tools,
1233                input_from,
1234                output_schema,
1235                skills,
1236                max_turns,
1237            } => NodeKind::Agent(AgentConfig {
1238                name: name.unwrap_or_else(|| self.id.clone()),
1239                system_prompt: system_prompt.unwrap_or_default(),
1240                tools,
1241                input_from,
1242                output_schema,
1243                skills,
1244                tool_definitions: Vec::new(),
1245                max_turns,
1246            }),
1247            NodeBuilderKind::Human {
1248                prompt,
1249                timeout_secs,
1250                options,
1251                timeout_action,
1252            } => NodeKind::Human(HumanConfig {
1253                prompt: prompt.unwrap_or_else(|| "Awaiting human input".to_string()),
1254                timeout_secs,
1255                options,
1256                timeout_action,
1257            }),
1258            NodeBuilderKind::Condition {
1259                input_from,
1260                branches,
1261                default_goto,
1262            } => NodeKind::Condition(ConditionConfig {
1263                input_from,
1264                branches,
1265                default_goto,
1266            }),
1267            NodeBuilderKind::Loop {
1268                body,
1269                max_iterations,
1270                until,
1271            } => NodeKind::Loop(LoopConfig {
1272                body,
1273                max_iterations,
1274                until,
1275            }),
1276            NodeBuilderKind::Transform {
1277                input_from,
1278                transform_fn,
1279                async_fn,
1280            } => {
1281                let noop_fn: TransformFn = Arc::new(|_| Ok(Value::Null));
1282                NodeKind::Transform(TransformConfig {
1283                    input_from,
1284                    transform_fn: transform_fn.unwrap_or(noop_fn),
1285                    async_fn,
1286                    script: None, // Script is set via YAML loader, not builder
1287                })
1288            }
1289            NodeBuilderKind::SubWorkflow {
1290                workflow,
1291                input_from,
1292                foreach,
1293                parallel,
1294                max_parallel,
1295                on_item_error,
1296            } => NodeKind::SubWorkflow(SubWorkflowConfig {
1297                workflow: workflow.expect("sub_workflow node requires a workflow"),
1298                input_from,
1299                foreach,
1300                parallel,
1301                max_parallel,
1302                on_item_error,
1303            }),
1304            NodeBuilderKind::AcpAgent {
1305                agent_command,
1306                skill,
1307                context_files,
1308                task_template,
1309                session,
1310                on_unavailable,
1311                output_schema,
1312                max_turns,
1313                timeout_secs,
1314                input_from,
1315                capabilities,
1316                working_dir,
1317                mcp_servers,
1318                enable_web_search,
1319            } => NodeKind::AcpAgent(AcpNodeConfig {
1320                agent_command: agent_command.unwrap_or_else(|| "claude".to_string()),
1321                skill,
1322                context_files,
1323                task_template: task_template.unwrap_or_else(|| "{{input}}".to_string()),
1324                session,
1325                on_unavailable,
1326                output_schema,
1327                max_turns,
1328                timeout_secs,
1329                input_from,
1330                capabilities,
1331                working_dir,
1332                mcp_servers,
1333                enable_web_search,
1334            }),
1335        };
1336
1337        Node {
1338            id: self.id,
1339            kind,
1340            retry_policy: self.retry_policy,
1341        }
1342    }
1343}