//! `simple_agents_workflow/runtime.rs` — deterministic workflow runtime:
//! node execution adapters, retry/timeout policies, trace recording, and
//! replay validation.
1use std::collections::{BTreeMap, HashMap};
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use async_trait::async_trait;
6use serde_json::{json, Map, Value};
7use simple_agent_type::message::Message;
8use simple_agent_type::request::CompletionRequest;
9use simple_agents_core::{CompletionOptions, CompletionOutcome, SimpleAgentsClient};
10use thiserror::Error;
11use tokio::time::timeout;
12
13use crate::checkpoint::WorkflowCheckpoint;
14use crate::expressions;
15use crate::ir::{MergePolicy, Node, NodeKind, ReduceOperation, WorkflowDefinition};
16use crate::recorder::{TraceRecordError, TraceRecorder};
17use crate::replay::{replay_trace, ReplayError, ReplayReport};
18use crate::scheduler::DagScheduler;
19use crate::trace::{TraceTerminalStatus, WorkflowTrace, WorkflowTraceMetadata};
20use crate::validation::{validate_and_normalize, ValidationErrors};
21
/// Runtime configuration for workflow execution.
#[derive(Debug, Clone)]
pub struct WorkflowRuntimeOptions {
    /// Maximum number of node steps before aborting execution.
    pub max_steps: usize,
    /// Validate and normalize workflow before every run.
    pub validate_before_run: bool,
    /// Retry and timeout policy for LLM nodes.
    pub llm_node_policy: NodeExecutionPolicy,
    /// Retry and timeout policy for tool nodes.
    pub tool_node_policy: NodeExecutionPolicy,
    /// Enable deterministic trace recording for runtime events.
    pub enable_trace_recording: bool,
    /// Optional replay validation mode for recorded traces.
    ///
    /// Setting this to [`WorkflowReplayMode::ValidateRecordedTrace`] while
    /// `enable_trace_recording` is `false` is rejected at run time with
    /// `WorkflowRuntimeError::ReplayRequiresTraceRecording`.
    pub replay_mode: WorkflowReplayMode,
    /// Global scheduler bound for node-level concurrent fan-out.
    pub scheduler_max_in_flight: usize,
    /// Named subgraph registry used by `subgraph` nodes.
    pub subgraph_registry: BTreeMap<String, WorkflowDefinition>,
    /// Runtime guardrails for expression and fan-out resource usage.
    pub security_limits: RuntimeSecurityLimits,
}
44
/// Runtime-enforced security and resource limits.
///
/// Exceeding any of these limits aborts the run with the corresponding
/// `*LimitExceeded` variant of `WorkflowRuntimeError`.
#[derive(Debug, Clone)]
pub struct RuntimeSecurityLimits {
    /// Maximum serialized bytes allowed for one expression evaluation scope.
    pub max_expression_scope_bytes: usize,
    /// Maximum array size accepted by `map` nodes.
    pub max_map_items: usize,
    /// Maximum branch count accepted by `parallel` nodes.
    pub max_parallel_branches: usize,
    /// Maximum array size accepted by `filter` nodes.
    pub max_filter_items: usize,
}
57
/// Runtime replay behavior for deterministic runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkflowReplayMode {
    /// Trace replay validation is disabled.
    Disabled,
    /// Validate the recorded trace via `replay_trace` before success return.
    /// Requires `WorkflowRuntimeOptions::enable_trace_recording`.
    ValidateRecordedTrace,
}
66
/// Retry and timeout policy for runtime-owned node execution.
///
/// The derived `Default` disables timeouts (`None`) and retries (`0`),
/// i.e. a single attempt with no deadline.
#[derive(Debug, Clone, Default)]
pub struct NodeExecutionPolicy {
    /// Per-attempt timeout. `None` disables timeout enforcement.
    pub timeout: Option<Duration>,
    /// Number of retries after the first failed attempt.
    pub max_retries: usize,
}
75
76impl Default for WorkflowRuntimeOptions {
77    fn default() -> Self {
78        Self {
79            max_steps: 256,
80            validate_before_run: true,
81            llm_node_policy: NodeExecutionPolicy::default(),
82            tool_node_policy: NodeExecutionPolicy::default(),
83            enable_trace_recording: true,
84            replay_mode: WorkflowReplayMode::Disabled,
85            scheduler_max_in_flight: 8,
86            subgraph_registry: BTreeMap::new(),
87            security_limits: RuntimeSecurityLimits::default(),
88        }
89    }
90}
91
92impl Default for RuntimeSecurityLimits {
93    fn default() -> Self {
94        Self {
95            max_expression_scope_bytes: 128 * 1024,
96            max_map_items: 4096,
97            max_parallel_branches: 128,
98            max_filter_items: 8192,
99        }
100    }
101}
102
/// Cooperative cancellation interface for workflow runs.
pub trait CancellationSignal: Send + Sync {
    /// Returns true if execution should stop.
    fn is_cancelled(&self) -> bool;
}

impl CancellationSignal for AtomicBool {
    /// Reads the flag with relaxed ordering; the runtime only needs the
    /// flag's value, not synchronization with other memory operations.
    fn is_cancelled(&self) -> bool {
        AtomicBool::load(self, Ordering::Relaxed)
    }
}

impl CancellationSignal for bool {
    /// A plain `bool` acts as a fixed signal that cannot change mid-run.
    fn is_cancelled(&self) -> bool {
        *self
    }
}
120
/// Input payload passed to the LLM executor adapter.
#[derive(Debug, Clone, PartialEq)]
pub struct LlmExecutionInput {
    /// Current workflow node id.
    pub node_id: String,
    /// Requested model.
    pub model: String,
    /// Prompt configured on the workflow node.
    pub prompt: String,
    /// Deterministic scoped context for this node execution; merged into the
    /// user prompt by `build_prompt_with_scope`.
    pub scoped_input: Value,
}
133
/// LLM output returned to the runtime.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LlmExecutionOutput {
    /// Assistant text returned by the model.
    pub content: String,
}
140
/// Typed LLM adapter errors.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum LlmExecutionError {
    /// CompletionRequest build/validation failure.
    #[error("invalid completion request: {0}")]
    InvalidRequest(String),
    /// Core client call failed.
    #[error("llm client execution failed: {0}")]
    Client(String),
    /// Unexpected completion mode returned by client. Payload names the
    /// offending mode (e.g. "stream", "healed_json", "coerced_schema").
    #[error("unexpected completion outcome: {0}")]
    UnexpectedOutcome(&'static str),
    /// Response did not include assistant content.
    #[error("llm response had no content")]
    EmptyResponse,
}
157
/// Async runtime adapter for LLM calls.
///
/// Implemented for [`SimpleAgentsClient`] below; hosts may provide their own
/// implementation for testing or alternative backends.
#[async_trait]
pub trait LlmExecutor: Send + Sync {
    /// Executes one workflow LLM node.
    async fn execute(
        &self,
        input: LlmExecutionInput,
    ) -> Result<LlmExecutionOutput, LlmExecutionError>;
}
167
168#[async_trait]
169impl LlmExecutor for SimpleAgentsClient {
170    async fn execute(
171        &self,
172        input: LlmExecutionInput,
173    ) -> Result<LlmExecutionOutput, LlmExecutionError> {
174        let user_prompt = build_prompt_with_scope(&input.prompt, &input.scoped_input);
175        let request = CompletionRequest::builder()
176            .model(input.model)
177            .message(Message::user(user_prompt))
178            .build()
179            .map_err(|error| LlmExecutionError::InvalidRequest(error.to_string()))?;
180
181        let outcome = self
182            .complete(&request, CompletionOptions::default())
183            .await
184            .map_err(|error| LlmExecutionError::Client(error.to_string()))?;
185
186        let response = match outcome {
187            CompletionOutcome::Response(response) => response,
188            CompletionOutcome::Stream(_) => {
189                return Err(LlmExecutionError::UnexpectedOutcome("stream"));
190            }
191            CompletionOutcome::HealedJson(_) => {
192                return Err(LlmExecutionError::UnexpectedOutcome("healed_json"));
193            }
194            CompletionOutcome::CoercedSchema(_) => {
195                return Err(LlmExecutionError::UnexpectedOutcome("coerced_schema"));
196            }
197        };
198
199        let content = response
200            .content()
201            .ok_or(LlmExecutionError::EmptyResponse)?
202            .to_string();
203
204        Ok(LlmExecutionOutput { content })
205    }
206}
207
/// Input payload for host-provided tool execution.
#[derive(Debug, Clone, PartialEq)]
pub struct ToolExecutionInput {
    /// Current workflow node id.
    pub node_id: String,
    /// Tool name declared by the workflow.
    pub tool: String,
    /// Static node input payload.
    pub input: Value,
    /// Deterministic scoped context for this node execution.
    pub scoped_input: Value,
}
220
/// Typed tool execution errors.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ToolExecutionError {
    /// Tool implementation is unavailable in the host runtime.
    #[error("tool handler not found: {tool}")]
    NotFound {
        /// Missing tool name.
        tool: String,
    },
    /// Tool returned an execution failure.
    #[error("tool execution failed: {0}")]
    Failed(String),
}
234
/// Async host tool executor surface.
///
/// Optional at runtime; reaching a tool node without an executor fails with
/// `WorkflowRuntimeError::MissingToolExecutor`.
#[async_trait]
pub trait ToolExecutor: Send + Sync {
    /// Executes one workflow tool node.
    async fn execute_tool(&self, input: ToolExecutionInput) -> Result<Value, ToolExecutionError>;
}
241
/// Node-level execution result.
#[derive(Debug, Clone, PartialEq)]
pub struct NodeExecution {
    /// Zero-based step index.
    pub step: usize,
    /// Executed node id.
    pub node_id: String,
    /// Structured execution payload.
    pub data: NodeExecutionData,
}
252
/// Structured result data emitted for each node.
///
/// Variants mirror the runtime's node kinds; each non-terminal variant
/// carries the `next` node id chosen during execution.
#[derive(Debug, Clone, PartialEq)]
pub enum NodeExecutionData {
    /// Start node transition.
    Start {
        /// Next node id.
        next: String,
    },
    /// LLM node output.
    Llm {
        /// Model used.
        model: String,
        /// Assistant output text.
        output: String,
        /// Next node id.
        next: String,
    },
    /// Tool node output.
    Tool {
        /// Tool name.
        tool: String,
        /// Tool output JSON payload.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// Condition node decision.
    Condition {
        /// Original expression.
        expression: String,
        /// Evaluated bool.
        evaluated: bool,
        /// Chosen next node id.
        next: String,
    },
    /// Debounce gate decision.
    Debounce {
        /// Resolved debounce key.
        key: String,
        /// Whether the event was suppressed.
        suppressed: bool,
        /// Chosen next node id.
        next: String,
    },
    /// Throttle gate decision.
    Throttle {
        /// Resolved throttle key.
        key: String,
        /// Whether the event was throttled.
        throttled: bool,
        /// Chosen next node id.
        next: String,
    },
    /// Explicit retry/compensation execution result.
    RetryCompensate {
        /// Primary tool name.
        tool: String,
        /// Total primary attempts executed.
        attempts: usize,
        /// Whether compensation path was used.
        compensated: bool,
        /// Node output payload.
        output: Value,
        /// Chosen next node id.
        next: String,
    },
    /// Human decision result.
    HumanInTheLoop {
        /// True when approved.
        approved: bool,
        /// Optional human payload routed through the node.
        response: Value,
        /// Chosen next node id.
        next: String,
    },
    /// Cache read result.
    CacheRead {
        /// Cache key.
        key: String,
        /// True when key was found.
        hit: bool,
        /// Returned value (null on miss).
        value: Value,
        /// Chosen next node id.
        next: String,
    },
    /// Cache write result.
    CacheWrite {
        /// Cache key.
        key: String,
        /// Stored value.
        value: Value,
        /// Next node id.
        next: String,
    },
    /// Event trigger evaluation result.
    EventTrigger {
        /// Expected event name.
        event: String,
        /// True when input event matched.
        matched: bool,
        /// Chosen next node id.
        next: String,
    },
    /// Router/selector decision.
    Router {
        /// Selected route node id.
        selected: String,
        /// Chosen next node id.
        next: String,
    },
    /// Transform node output.
    Transform {
        /// Original transform expression.
        expression: String,
        /// Evaluated output payload.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// Loop node decision.
    Loop {
        /// Original loop condition.
        condition: String,
        /// Evaluated bool.
        evaluated: bool,
        /// Current iteration when entering body. Zero when exiting loop.
        iteration: u32,
        /// Chosen next node id.
        next: String,
    },
    /// Subgraph execution result.
    Subgraph {
        /// Subgraph registry key.
        graph: String,
        /// Subgraph terminal node id.
        terminal_node_id: String,
        /// Aggregated subgraph node outputs.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// Batch extraction result.
    Batch {
        /// Source path used for extraction.
        items_path: String,
        /// Number of extracted items.
        item_count: usize,
        /// Next node id.
        next: String,
    },
    /// Filter evaluation result.
    Filter {
        /// Source path used for extraction.
        items_path: String,
        /// Filter predicate expression.
        expression: String,
        /// Number of retained items.
        kept: usize,
        /// Next node id.
        next: String,
    },
    /// Parallel node aggregate result.
    Parallel {
        /// Branch node ids executed by this node.
        branches: Vec<String>,
        /// Branch outputs keyed by branch node id. `BTreeMap` keeps the
        /// key ordering deterministic.
        outputs: BTreeMap<String, Value>,
        /// Next node id.
        next: String,
    },
    /// Merge node aggregate output.
    Merge {
        /// Merge policy used.
        policy: MergePolicy,
        /// Source node ids consumed.
        sources: Vec<String>,
        /// Merged value.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// Map node output.
    Map {
        /// Number of input items mapped.
        item_count: usize,
        /// Collected mapped values in source order.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// Reduce node output.
    Reduce {
        /// Reduce operation executed.
        operation: ReduceOperation,
        /// Reduced result.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// End node reached. Terminal: carries no `next` edge.
    End,
}
456
/// Runtime event stream payload for trace consumers.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowEvent {
    /// Zero-based step index.
    pub step: usize,
    /// Node id associated with this event.
    pub node_id: String,
    /// Event payload.
    pub kind: WorkflowEventKind,
}
467
/// Event kinds emitted by the runtime.
#[derive(Debug, Clone, PartialEq)]
pub enum WorkflowEventKind {
    /// Node execution started. Emitted before the node body runs.
    NodeStarted,
    /// Node execution completed successfully.
    NodeCompleted {
        /// Node execution payload.
        data: NodeExecutionData,
    },
    /// Node execution failed.
    NodeFailed {
        /// Error message.
        message: String,
    },
}
484
/// Final workflow execution report.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowRunResult {
    /// Workflow name.
    pub workflow_name: String,
    /// Final terminal node id.
    pub terminal_node_id: String,
    /// Ordered node execution records.
    pub node_executions: Vec<NodeExecution>,
    /// Ordered runtime events.
    pub events: Vec<WorkflowEvent>,
    /// Retry diagnostics captured during node execution.
    pub retry_events: Vec<WorkflowRetryEvent>,
    /// Node output map keyed by node id.
    pub node_outputs: BTreeMap<String, Value>,
    /// Optional deterministic trace captured during this run. `Some` only
    /// when `WorkflowRuntimeOptions::enable_trace_recording` is set.
    pub trace: Option<WorkflowTrace>,
    /// Replay validation report when replay mode is enabled.
    pub replay_report: Option<ReplayReport>,
}
505
/// Captures one retry decision and the reason it occurred.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkflowRetryEvent {
    /// Zero-based step index for the active node.
    pub step: usize,
    /// Node id that will be retried.
    pub node_id: String,
    /// Operation category (`llm` or `tool`).
    pub operation: String,
    /// Attempt that failed and triggered the retry.
    pub failed_attempt: usize,
    /// Human-readable retry reason.
    pub reason: String,
}
520
/// Scope access capabilities used by runtime nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ScopeCapability {
    LlmRead,
    ToolRead,
    ConditionRead,
    LlmWrite,
    ToolWrite,
    ConditionWrite,
    MapRead,
    MapWrite,
    ReduceWrite,
}

impl ScopeCapability {
    /// Stable snake_case name used in scope-access diagnostics.
    fn as_str(self) -> &'static str {
        match self {
            // Read capabilities.
            ScopeCapability::LlmRead => "llm_read",
            ScopeCapability::ToolRead => "tool_read",
            ScopeCapability::ConditionRead => "condition_read",
            ScopeCapability::MapRead => "map_read",
            // Write capabilities.
            ScopeCapability::LlmWrite => "llm_write",
            ScopeCapability::ToolWrite => "tool_write",
            ScopeCapability::ConditionWrite => "condition_write",
            ScopeCapability::MapWrite => "map_write",
            ScopeCapability::ReduceWrite => "reduce_write",
        }
    }
}
550
/// Typed read/write boundary failures for scoped runtime state.
///
/// Capability names come from `ScopeCapability::as_str`.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum ScopeAccessError {
    /// Node attempted to read scope with an invalid capability.
    #[error("scope read denied for capability '{capability}'")]
    ReadDenied {
        /// Capability used for the read.
        capability: &'static str,
    },
    /// Node attempted to write scope with an invalid capability.
    #[error("scope write denied for capability '{capability}'")]
    WriteDenied {
        /// Capability used for the write.
        capability: &'static str,
    },
}
567
/// Runtime failures.
#[derive(Debug, Error)]
pub enum WorkflowRuntimeError {
    /// Workflow failed structural validation.
    #[error(transparent)]
    Validation(#[from] ValidationErrors),
    /// Workflow has no start node.
    #[error("workflow has no start node")]
    MissingStartNode,
    /// Current node id not found in node index.
    #[error("node not found: {node_id}")]
    NodeNotFound {
        /// Missing node id.
        node_id: String,
    },
    /// Loop protection guard triggered.
    #[error("workflow exceeded max step limit ({max_steps})")]
    StepLimitExceeded {
        /// Configured max steps.
        max_steps: usize,
    },
    /// Execution cancelled by caller.
    #[error("workflow execution cancelled")]
    Cancelled,
    /// LLM node failed.
    #[error("llm node '{node_id}' failed: {source}")]
    Llm {
        /// Failing node id.
        node_id: String,
        /// Source error.
        source: LlmExecutionError,
    },
    /// Tool node failed.
    #[error("tool node '{node_id}' failed: {source}")]
    Tool {
        /// Failing node id.
        node_id: String,
        /// Source error.
        source: ToolExecutionError,
    },
    /// LLM node exhausted all retry attempts.
    #[error("llm node '{node_id}' exhausted {attempts} attempt(s): {last_error}")]
    LlmRetryExhausted {
        /// Failing node id.
        node_id: String,
        /// Number of attempts made.
        attempts: usize,
        /// Last attempt error.
        last_error: LlmExecutionError,
    },
    /// Tool node exhausted all retry attempts.
    #[error("tool node '{node_id}' exhausted {attempts} attempt(s): {last_error}")]
    ToolRetryExhausted {
        /// Failing node id.
        node_id: String,
        /// Number of attempts made.
        attempts: usize,
        /// Last attempt error.
        last_error: ToolExecutionError,
    },
    /// LLM node timed out across all attempts.
    #[error(
        "llm node '{node_id}' timed out after {attempts} attempt(s) (timeout: {timeout_ms} ms)"
    )]
    LlmTimeout {
        /// Failing node id.
        node_id: String,
        /// Per-attempt timeout in milliseconds.
        timeout_ms: u128,
        /// Number of attempts made.
        attempts: usize,
    },
    /// Tool node timed out across all attempts.
    #[error(
        "tool node '{node_id}' timed out after {attempts} attempt(s) (timeout: {timeout_ms} ms)"
    )]
    ToolTimeout {
        /// Failing node id.
        node_id: String,
        /// Per-attempt timeout in milliseconds.
        timeout_ms: u128,
        /// Number of attempts made.
        attempts: usize,
    },
    /// Tool node reached without an executor.
    #[error("tool node '{node_id}' requires a tool executor")]
    MissingToolExecutor {
        /// Failing node id.
        node_id: String,
    },
    /// LLM/tool node missing its `next` edge.
    #[error("node '{node_id}' is missing required next edge")]
    MissingNextEdge {
        /// Failing node id.
        node_id: String,
    },
    /// Condition expression could not be evaluated.
    #[error("condition node '{node_id}' has invalid expression '{expression}': {reason}")]
    InvalidCondition {
        /// Failing node id.
        node_id: String,
        /// Condition expression.
        expression: String,
        /// Detailed reason.
        reason: String,
    },
    /// Loop condition expression could not be evaluated.
    #[error("loop node '{node_id}' has invalid condition '{expression}': {reason}")]
    InvalidLoopCondition {
        /// Failing node id.
        node_id: String,
        /// Loop condition expression.
        expression: String,
        /// Detailed reason.
        reason: String,
    },
    /// Loop exceeded configured max iterations.
    #[error("loop node '{node_id}' exceeded max iterations ({max_iterations})")]
    LoopIterationLimitExceeded {
        /// Failing node id.
        node_id: String,
        /// Configured max iterations.
        max_iterations: u32,
    },
    /// Non-terminal node did not return a next transition.
    #[error("node '{node_id}' did not provide a next transition")]
    MissingNextTransition {
        /// Failing node id.
        node_id: String,
    },
    /// Scoped state boundary check failed.
    #[error("scope access failed on node '{node_id}': {source}")]
    ScopeAccess {
        /// Failing node id.
        node_id: String,
        /// Source access error.
        source: ScopeAccessError,
    },
    /// Trace recorder operation failed.
    #[error(transparent)]
    TraceRecording(#[from] TraceRecordError),
    /// Replay trace validation failed.
    #[error(transparent)]
    Replay(#[from] ReplayError),
    /// Replay mode requested without trace recording.
    #[error("replay validation requires trace recording to be enabled")]
    ReplayRequiresTraceRecording,
    /// Parallel branch references unsupported node kind.
    #[error("parallel node '{node_id}' cannot execute branch '{branch_id}': {reason}")]
    ParallelBranchUnsupported {
        /// Parallel node id.
        node_id: String,
        /// Branch node id.
        branch_id: String,
        /// Reason for rejection.
        reason: String,
    },
    /// Merge node cannot resolve one or more sources.
    #[error("merge node '{node_id}' missing source output from '{source_id}'")]
    MissingMergeSource {
        /// Merge node id.
        node_id: String,
        /// Missing source id.
        source_id: String,
    },
    /// Merge quorum policy could not be satisfied.
    #[error("merge node '{node_id}' quorum not met: required {required}, resolved {resolved}")]
    MergeQuorumNotMet {
        /// Merge node id.
        node_id: String,
        /// Required number of sources.
        required: usize,
        /// Number of resolved sources.
        resolved: usize,
    },
    /// Map node items path did not resolve to an array.
    #[error("map node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    MapItemsNotArray {
        /// Map node id.
        node_id: String,
        /// Configured path.
        items_path: String,
    },
    /// Reduce node source value is not reducible.
    #[error("reduce node '{node_id}' source '{source_node}' is not reducible: {reason}")]
    InvalidReduceInput {
        /// Reduce node id.
        node_id: String,
        /// Source node id.
        source_node: String,
        /// Detailed reason.
        reason: String,
    },
    /// Subgraph registry key not found.
    #[error("subgraph node '{node_id}' references unknown graph '{graph}'")]
    SubgraphNotFound {
        /// Subgraph node id.
        node_id: String,
        /// Unknown registry key.
        graph: String,
    },
    /// Batch source path did not resolve to an array.
    #[error("batch node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    BatchItemsNotArray {
        /// Batch node id.
        node_id: String,
        /// Configured path.
        items_path: String,
    },
    /// Filter source path did not resolve to an array.
    #[error("filter node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    FilterItemsNotArray {
        /// Filter node id.
        node_id: String,
        /// Configured path.
        items_path: String,
    },
    /// Filter expression failed while evaluating one item.
    #[error("filter node '{node_id}' expression '{expression}' failed: {reason}")]
    InvalidFilterExpression {
        /// Filter node id.
        node_id: String,
        /// Failing predicate expression.
        expression: String,
        /// Detailed reason.
        reason: String,
    },
    /// Serialized expression scope exceeded configured runtime limit.
    #[error(
        "expression scope too large on node '{node_id}': {actual_bytes} bytes exceeds limit {limit_bytes}"
    )]
    ExpressionScopeLimitExceeded {
        /// Failing node id.
        node_id: String,
        /// Serialized scope size in bytes.
        actual_bytes: usize,
        /// Configured byte limit.
        limit_bytes: usize,
    },
    /// Parallel branch fan-out exceeded configured runtime limit.
    #[error(
        "parallel node '{node_id}' has {actual_branches} branches, exceeding limit {max_branches}"
    )]
    ParallelBranchLimitExceeded {
        /// Parallel node id.
        node_id: String,
        /// Declared branch count.
        actual_branches: usize,
        /// Configured branch limit.
        max_branches: usize,
    },
    /// Map item count exceeded configured runtime limit.
    #[error("map node '{node_id}' has {actual_items} items, exceeding limit {max_items}")]
    MapItemLimitExceeded {
        /// Map node id.
        node_id: String,
        /// Resolved item count.
        actual_items: usize,
        /// Configured item limit.
        max_items: usize,
    },
    /// Filter item count exceeded configured runtime limit.
    #[error("filter node '{node_id}' has {actual_items} items, exceeding limit {max_items}")]
    FilterItemLimitExceeded {
        /// Filter node id.
        node_id: String,
        /// Resolved item count.
        actual_items: usize,
        /// Configured item limit.
        max_items: usize,
    },
    /// A node-required path could not be resolved.
    #[error("node '{node_id}' could not resolve required path '{path}'")]
    MissingPath {
        /// Failing node id.
        node_id: String,
        /// Unresolvable path.
        path: String,
    },
    /// A cache key path did not resolve to a string.
    #[error("node '{node_id}' path '{path}' did not resolve to a string cache key")]
    CacheKeyNotString {
        /// Failing node id.
        node_id: String,
        /// Configured key path.
        path: String,
    },
    /// Human decision value is unsupported.
    #[error("human node '{node_id}' has unsupported decision value at '{path}': {value}")]
    InvalidHumanDecision {
        /// Failing node id.
        node_id: String,
        /// Decision value path.
        path: String,
        /// Offending value rendered as text.
        value: String,
    },
    /// Event value did not resolve to a string.
    #[error("event trigger node '{node_id}' path '{path}' did not resolve to an event string")]
    InvalidEventValue {
        /// Failing node id.
        node_id: String,
        /// Configured event path.
        path: String,
    },
    /// Router expression evaluation failed.
    #[error("router node '{node_id}' route expression '{expression}' failed: {reason}")]
    InvalidRouterExpression {
        /// Router node id.
        node_id: String,
        /// Failing route expression.
        expression: String,
        /// Detailed reason.
        reason: String,
    },
    /// Transform expression evaluation failed.
    #[error("transform node '{node_id}' expression '{expression}' failed: {reason}")]
    InvalidTransformExpression {
        /// Transform node id.
        node_id: String,
        /// Failing transform expression.
        expression: String,
        /// Detailed reason.
        reason: String,
    },
    /// Explicit retry/compensate node exhausted primary and compensation failed.
    #[error(
        "retry_compensate node '{node_id}' failed after {attempts} primary attempt(s) and compensation error: {compensation_error}"
    )]
    RetryCompensateFailed {
        /// Failing node id.
        node_id: String,
        /// Primary attempts executed before compensation.
        attempts: usize,
        /// Error returned by the compensation path.
        compensation_error: ToolExecutionError,
    },
}
849
/// Deterministic minimal runtime for workflow execution.
pub struct WorkflowRuntime<'a> {
    /// Workflow to execute; normalized (and optionally validated) per run.
    definition: WorkflowDefinition,
    /// Host adapter used for LLM nodes.
    llm_executor: &'a dyn LlmExecutor,
    /// Optional host adapter for tool nodes; tool nodes fail with
    /// `MissingToolExecutor` when absent.
    tool_executor: Option<&'a dyn ToolExecutor>,
    /// Execution policies, resource limits, and trace/replay configuration.
    options: WorkflowRuntimeOptions,
}
857
858impl<'a> WorkflowRuntime<'a> {
859    /// Creates a runtime with host-provided LLM and tool adapters.
860    pub fn new(
861        definition: WorkflowDefinition,
862        llm_executor: &'a dyn LlmExecutor,
863        tool_executor: Option<&'a dyn ToolExecutor>,
864        options: WorkflowRuntimeOptions,
865    ) -> Self {
866        Self {
867            definition,
868            llm_executor,
869            tool_executor,
870            options,
871        }
872    }
873
874    /// Executes a workflow deterministically from `start` to `end`.
875    pub async fn execute(
876        &self,
877        input: Value,
878        cancellation: Option<&dyn CancellationSignal>,
879    ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
880        let workflow = if self.options.validate_before_run {
881            validate_and_normalize(&self.definition)?
882        } else {
883            self.definition.normalized()
884        };
885
886        let start_id = find_start_node_id(&workflow)?;
887        self.execute_from_node(
888            workflow,
889            RuntimeScope::new(input),
890            start_id,
891            0,
892            cancellation,
893        )
894        .await
895    }
896
897    /// Resumes execution from a checkpoint created after a node failure.
898    pub async fn execute_resume_from_failure(
899        &self,
900        checkpoint: &WorkflowCheckpoint,
901        cancellation: Option<&dyn CancellationSignal>,
902    ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
903        let workflow = if self.options.validate_before_run {
904            validate_and_normalize(&self.definition)?
905        } else {
906            self.definition.normalized()
907        };
908
909        let scope_input = checkpoint
910            .scope_snapshot
911            .get("input")
912            .cloned()
913            .unwrap_or_else(|| checkpoint.scope_snapshot.clone());
914
915        self.execute_from_node(
916            workflow,
917            RuntimeScope::new(scope_input),
918            checkpoint.next_node_id.clone(),
919            checkpoint.step,
920            cancellation,
921        )
922        .await
923    }
924
    /// Core step loop shared by fresh runs and checkpoint resumes.
    ///
    /// Walks the graph from `start_node_id`, executing one node per step
    /// until an `End` node is reached, `max_steps` is exhausted, a node
    /// fails, or `cancellation` fires. When trace recording is enabled,
    /// node enter/exit/error events and the terminal status are recorded
    /// against a monotonically increasing logical clock (`trace_clock`),
    /// not wall time, keeping traces deterministic across runs.
    ///
    /// # Errors
    /// Returns the first node error encountered, `NodeNotFound` /
    /// `MissingNextTransition` for broken graph links, `StepLimitExceeded`
    /// when the budget runs out, and `ReplayRequiresTraceRecording` when
    /// replay validation is requested without trace recording.
    async fn execute_from_node(
        &self,
        workflow: WorkflowDefinition,
        mut scope: RuntimeScope,
        start_node_id: String,
        starting_step: usize,
        cancellation: Option<&dyn CancellationSignal>,
    ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
        let node_index = build_node_index(&workflow);

        // Replay validation re-reads the recorded trace, so it cannot work
        // unless trace recording is also enabled; reject the combination
        // up front instead of failing at the terminal node.
        if matches!(
            self.options.replay_mode,
            WorkflowReplayMode::ValidateRecordedTrace
        ) && !self.options.enable_trace_recording
        {
            return Err(WorkflowRuntimeError::ReplayRequiresTraceRecording);
        }

        // `started_at_unix_ms` is fixed at 0 — trace timestamps come from
        // the logical clock below, keeping recorded traces reproducible.
        let trace_recorder = self.options.enable_trace_recording.then(|| {
            TraceRecorder::new(WorkflowTraceMetadata {
                trace_id: format!("{}-{}-trace", workflow.name, workflow.version),
                workflow_name: workflow.name.clone(),
                workflow_version: workflow.version.clone(),
                started_at_unix_ms: 0,
                finished_at_unix_ms: None,
            })
        });
        let mut trace_clock = 0u64;
        let mut events = Vec::new();
        let mut retry_events = Vec::new();
        let mut node_executions = Vec::new();
        let mut current_id = start_node_id;

        // One loop iteration == one node execution ("step"). Resumed runs
        // start partway through the budget via `starting_step`; if
        // `starting_step >= max_steps` the loop body never runs and the
        // step-limit error below is returned immediately.
        for step in starting_step..self.options.max_steps {
            check_cancelled(cancellation)?;
            let node = node_index.get(current_id.as_str()).ok_or_else(|| {
                WorkflowRuntimeError::NodeNotFound {
                    node_id: current_id.clone(),
                }
            })?;

            events.push(WorkflowEvent {
                step,
                node_id: current_id.clone(),
                kind: WorkflowEventKind::NodeStarted,
            });

            if let Some(recorder) = &trace_recorder {
                recorder.record_node_enter(next_trace_timestamp(&mut trace_clock), &current_id)?;
            }

            let execution_result = self
                .execute_node(
                    node,
                    &node_index,
                    step,
                    &mut scope,
                    cancellation,
                    &mut retry_events,
                )
                .await;
            let execution = match execution_result {
                Ok(execution) => execution,
                Err(error) => {
                    // Failure path: record error + Failed terminal, then
                    // finalize. NOTE(review): the finalized trace is
                    // discarded here (`let _ =`) and the collected `events`
                    // are dropped with the early return — confirm callers
                    // don't need them for post-mortem on failed runs.
                    if let Some(recorder) = &trace_recorder {
                        recorder.record_node_error(
                            next_trace_timestamp(&mut trace_clock),
                            &current_id,
                            error.to_string(),
                        )?;
                        recorder.record_terminal(
                            next_trace_timestamp(&mut trace_clock),
                            TraceTerminalStatus::Failed,
                        )?;
                        let _ = recorder.finalize(next_trace_timestamp(&mut trace_clock))?;
                    }
                    events.push(WorkflowEvent {
                        step,
                        node_id: current_id,
                        kind: WorkflowEventKind::NodeFailed {
                            message: error.to_string(),
                        },
                    });
                    return Err(error);
                }
            };

            events.push(WorkflowEvent {
                step,
                node_id: execution.node_id.clone(),
                kind: WorkflowEventKind::NodeCompleted {
                    data: execution.data.clone(),
                },
            });

            if let Some(recorder) = &trace_recorder {
                recorder
                    .record_node_exit(next_trace_timestamp(&mut trace_clock), &execution.node_id)?;
            }

            // Extract routing info before `execution` is moved into the
            // per-run history.
            let is_terminal = matches!(execution.data, NodeExecutionData::End);
            let next_node = next_node_id(&execution.data);
            let executed_node_id = execution.node_id.clone();
            node_executions.push(execution);

            if is_terminal {
                // Successful completion: record the Completed terminal,
                // finalize the trace, and optionally replay-validate it
                // before returning it with the run result.
                let (trace, replay_report) = if let Some(recorder) = &trace_recorder {
                    recorder.record_terminal(
                        next_trace_timestamp(&mut trace_clock),
                        TraceTerminalStatus::Completed,
                    )?;
                    let finalized_trace =
                        recorder.finalize(next_trace_timestamp(&mut trace_clock))?;
                    let replay_report = match self.options.replay_mode {
                        WorkflowReplayMode::Disabled => None,
                        WorkflowReplayMode::ValidateRecordedTrace => {
                            Some(replay_trace(&finalized_trace)?)
                        }
                    };
                    (Some(finalized_trace), replay_report)
                } else {
                    (None, None)
                };

                return Ok(WorkflowRunResult {
                    workflow_name: workflow.name,
                    terminal_node_id: executed_node_id,
                    node_executions,
                    events,
                    retry_events,
                    node_outputs: scope.node_outputs,
                    trace,
                    replay_report,
                });
            }

            // Non-terminal nodes must name a successor; a missing edge is a
            // graph defect surfaced as a runtime error.
            current_id = next_node.ok_or(WorkflowRuntimeError::MissingNextTransition {
                node_id: executed_node_id,
            })?;
        }

        Err(WorkflowRuntimeError::StepLimitExceeded {
            max_steps: self.options.max_steps,
        })
    }
1070
1071    async fn execute_node(
1072        &self,
1073        node: &Node,
1074        node_index: &HashMap<&str, &Node>,
1075        step: usize,
1076        scope: &mut RuntimeScope,
1077        cancellation: Option<&dyn CancellationSignal>,
1078        retry_events: &mut Vec<WorkflowRetryEvent>,
1079    ) -> Result<NodeExecution, WorkflowRuntimeError> {
1080        match &node.kind {
1081            NodeKind::Start { next } => Ok(self.execute_start_node(step, node, next)),
1082            NodeKind::Llm {
1083                model,
1084                prompt,
1085                next,
1086            } => {
1087                self.execute_llm_node(
1088                    step,
1089                    node,
1090                    LlmNodeSpec {
1091                        model,
1092                        prompt,
1093                        next,
1094                    },
1095                    scope,
1096                    cancellation,
1097                    retry_events,
1098                )
1099                .await
1100            }
1101            NodeKind::Tool { tool, input, next } => {
1102                self.execute_tool_node(
1103                    step,
1104                    node,
1105                    ToolNodeSpec { tool, input, next },
1106                    scope,
1107                    cancellation,
1108                    retry_events,
1109                )
1110                .await
1111            }
1112            NodeKind::Condition {
1113                expression,
1114                on_true,
1115                on_false,
1116            } => self.execute_condition_node(
1117                step,
1118                node,
1119                ConditionNodeSpec {
1120                    expression,
1121                    on_true,
1122                    on_false,
1123                },
1124                scope,
1125                cancellation,
1126            ),
1127            NodeKind::Debounce {
1128                key_path,
1129                window_steps,
1130                next,
1131                on_suppressed,
1132            } => {
1133                let scoped_input =
1134                    scope
1135                        .scoped_input(ScopeCapability::ConditionRead)
1136                        .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1137                            node_id: node.id.clone(),
1138                            source,
1139                        })?;
1140                let key = resolve_string_path(&scoped_input, key_path).ok_or_else(|| {
1141                    WorkflowRuntimeError::CacheKeyNotString {
1142                        node_id: node.id.clone(),
1143                        path: key_path.clone(),
1144                    }
1145                })?;
1146                let suppressed = scope.debounce(&node.id, &key, step, *window_steps);
1147                let chosen_next = if suppressed {
1148                    on_suppressed.clone().unwrap_or_else(|| next.clone())
1149                } else {
1150                    next.clone()
1151                };
1152
1153                scope
1154                    .record_node_output(
1155                        &node.id,
1156                        json!({"key": key.clone(), "suppressed": suppressed}),
1157                        ScopeCapability::MapWrite,
1158                    )
1159                    .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1160                        node_id: node.id.clone(),
1161                        source,
1162                    })?;
1163
1164                Ok(NodeExecution {
1165                    step,
1166                    node_id: node.id.clone(),
1167                    data: NodeExecutionData::Debounce {
1168                        key,
1169                        suppressed,
1170                        next: chosen_next,
1171                    },
1172                })
1173            }
1174            NodeKind::Throttle {
1175                key_path,
1176                window_steps,
1177                next,
1178                on_throttled,
1179            } => {
1180                let scoped_input =
1181                    scope
1182                        .scoped_input(ScopeCapability::ConditionRead)
1183                        .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1184                            node_id: node.id.clone(),
1185                            source,
1186                        })?;
1187                let key = resolve_string_path(&scoped_input, key_path).ok_or_else(|| {
1188                    WorkflowRuntimeError::CacheKeyNotString {
1189                        node_id: node.id.clone(),
1190                        path: key_path.clone(),
1191                    }
1192                })?;
1193                let throttled = scope.throttle(&node.id, &key, step, *window_steps);
1194                let chosen_next = if throttled {
1195                    on_throttled.clone().unwrap_or_else(|| next.clone())
1196                } else {
1197                    next.clone()
1198                };
1199
1200                scope
1201                    .record_node_output(
1202                        &node.id,
1203                        json!({"key": key.clone(), "throttled": throttled}),
1204                        ScopeCapability::MapWrite,
1205                    )
1206                    .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1207                        node_id: node.id.clone(),
1208                        source,
1209                    })?;
1210
1211                Ok(NodeExecution {
1212                    step,
1213                    node_id: node.id.clone(),
1214                    data: NodeExecutionData::Throttle {
1215                        key,
1216                        throttled,
1217                        next: chosen_next,
1218                    },
1219                })
1220            }
1221            NodeKind::RetryCompensate {
1222                tool,
1223                input,
1224                max_retries,
1225                compensate_tool,
1226                compensate_input,
1227                next,
1228                on_compensated,
1229            } => {
1230                let executor = self.tool_executor.ok_or_else(|| {
1231                    WorkflowRuntimeError::MissingToolExecutor {
1232                        node_id: node.id.clone(),
1233                    }
1234                })?;
1235                let scoped_input =
1236                    scope
1237                        .scoped_input(ScopeCapability::ToolRead)
1238                        .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1239                            node_id: node.id.clone(),
1240                            source,
1241                        })?;
1242                let total_attempts = max_retries.saturating_add(1);
1243                let mut last_error = None;
1244                let mut output = Value::Null;
1245                let mut compensated = false;
1246
1247                for attempt in 1..=total_attempts {
1248                    check_cancelled(cancellation)?;
1249                    match executor
1250                        .execute_tool(ToolExecutionInput {
1251                            node_id: node.id.clone(),
1252                            tool: tool.clone(),
1253                            input: input.clone(),
1254                            scoped_input: scoped_input.clone(),
1255                        })
1256                        .await
1257                    {
1258                        Ok(value) => {
1259                            output = json!({"status": "ok", "attempt": attempt, "value": value});
1260                            break;
1261                        }
1262                        Err(error) => {
1263                            last_error = Some(error.clone());
1264                            if attempt < total_attempts {
1265                                retry_events.push(WorkflowRetryEvent {
1266                                    step,
1267                                    node_id: node.id.clone(),
1268                                    operation: "retry_compensate".to_string(),
1269                                    failed_attempt: attempt,
1270                                    reason: error.to_string(),
1271                                });
1272                            }
1273                        }
1274                    }
1275                }
1276
1277                if output.is_null() {
1278                    compensated = true;
1279                    let compensation = executor
1280                        .execute_tool(ToolExecutionInput {
1281                            node_id: node.id.clone(),
1282                            tool: compensate_tool.clone(),
1283                            input: compensate_input.clone(),
1284                            scoped_input,
1285                        })
1286                        .await
1287                        .map_err(|compensation_error| {
1288                            WorkflowRuntimeError::RetryCompensateFailed {
1289                                node_id: node.id.clone(),
1290                                attempts: total_attempts,
1291                                compensation_error,
1292                            }
1293                        })?;
1294                    output = json!({
1295                        "status": "compensated",
1296                        "attempts": total_attempts,
1297                        "last_error": last_error.map(|error| error.to_string()).unwrap_or_default(),
1298                        "compensation": compensation
1299                    });
1300                }
1301
1302                let chosen_next = if compensated {
1303                    on_compensated.clone().unwrap_or_else(|| next.clone())
1304                } else {
1305                    next.clone()
1306                };
1307
1308                scope
1309                    .record_node_output(&node.id, output.clone(), ScopeCapability::MapWrite)
1310                    .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1311                        node_id: node.id.clone(),
1312                        source,
1313                    })?;
1314
1315                Ok(NodeExecution {
1316                    step,
1317                    node_id: node.id.clone(),
1318                    data: NodeExecutionData::RetryCompensate {
1319                        tool: tool.clone(),
1320                        attempts: total_attempts,
1321                        compensated,
1322                        output,
1323                        next: chosen_next,
1324                    },
1325                })
1326            }
1327            NodeKind::HumanInTheLoop {
1328                decision_path,
1329                response_path,
1330                on_approve,
1331                on_reject,
1332            } => {
1333                let scoped_input =
1334                    scope
1335                        .scoped_input(ScopeCapability::ConditionRead)
1336                        .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1337                            node_id: node.id.clone(),
1338                            source,
1339                        })?;
1340                let decision_value =
1341                    resolve_path(&scoped_input, decision_path).ok_or_else(|| {
1342                        WorkflowRuntimeError::MissingPath {
1343                            node_id: node.id.clone(),
1344                            path: decision_path.clone(),
1345                        }
1346                    })?;
1347                let approved = evaluate_human_decision(decision_value).ok_or_else(|| {
1348                    WorkflowRuntimeError::InvalidHumanDecision {
1349                        node_id: node.id.clone(),
1350                        path: decision_path.clone(),
1351                        value: decision_value.to_string(),
1352                    }
1353                })?;
1354                let response = if let Some(path) = response_path {
1355                    resolve_path(&scoped_input, path).cloned().ok_or_else(|| {
1356                        WorkflowRuntimeError::MissingPath {
1357                            node_id: node.id.clone(),
1358                            path: path.clone(),
1359                        }
1360                    })?
1361                } else {
1362                    Value::Null
1363                };
1364                let chosen_next = if approved {
1365                    on_approve.clone()
1366                } else {
1367                    on_reject.clone()
1368                };
1369
1370                scope
1371                    .record_node_output(
1372                        &node.id,
1373                        json!({"approved": approved, "response": response.clone()}),
1374                        ScopeCapability::MapWrite,
1375                    )
1376                    .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1377                        node_id: node.id.clone(),
1378                        source,
1379                    })?;
1380
1381                Ok(NodeExecution {
1382                    step,
1383                    node_id: node.id.clone(),
1384                    data: NodeExecutionData::HumanInTheLoop {
1385                        approved,
1386                        response,
1387                        next: chosen_next,
1388                    },
1389                })
1390            }
1391            NodeKind::CacheWrite {
1392                key_path,
1393                value_path,
1394                next,
1395            } => {
1396                let scoped_input =
1397                    scope
1398                        .scoped_input(ScopeCapability::ConditionRead)
1399                        .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1400                            node_id: node.id.clone(),
1401                            source,
1402                        })?;
1403                let key = resolve_string_path(&scoped_input, key_path).ok_or_else(|| {
1404                    WorkflowRuntimeError::CacheKeyNotString {
1405                        node_id: node.id.clone(),
1406                        path: key_path.clone(),
1407                    }
1408                })?;
1409                let value = resolve_path(&scoped_input, value_path)
1410                    .cloned()
1411                    .ok_or_else(|| WorkflowRuntimeError::MissingPath {
1412                        node_id: node.id.clone(),
1413                        path: value_path.clone(),
1414                    })?;
1415
1416                scope.put_cache(&key, value.clone());
1417                scope
1418                    .record_node_output(
1419                        &node.id,
1420                        json!({"key": key.clone(), "value": value.clone()}),
1421                        ScopeCapability::MapWrite,
1422                    )
1423                    .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1424                        node_id: node.id.clone(),
1425                        source,
1426                    })?;
1427
1428                Ok(NodeExecution {
1429                    step,
1430                    node_id: node.id.clone(),
1431                    data: NodeExecutionData::CacheWrite {
1432                        key,
1433                        value,
1434                        next: next.clone(),
1435                    },
1436                })
1437            }
1438            NodeKind::CacheRead {
1439                key_path,
1440                next,
1441                on_miss,
1442            } => {
1443                let scoped_input =
1444                    scope
1445                        .scoped_input(ScopeCapability::ConditionRead)
1446                        .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1447                            node_id: node.id.clone(),
1448                            source,
1449                        })?;
1450                let key = resolve_string_path(&scoped_input, key_path).ok_or_else(|| {
1451                    WorkflowRuntimeError::CacheKeyNotString {
1452                        node_id: node.id.clone(),
1453                        path: key_path.clone(),
1454                    }
1455                })?;
1456                let value = scope.cache_value(&key).cloned().unwrap_or(Value::Null);
1457                let hit = !value.is_null();
1458                let chosen_next = if hit {
1459                    next.clone()
1460                } else {
1461                    on_miss.clone().unwrap_or_else(|| next.clone())
1462                };
1463
1464                scope
1465                    .record_node_output(
1466                        &node.id,
1467                        json!({"key": key.clone(), "hit": hit, "value": value.clone()}),
1468                        ScopeCapability::MapWrite,
1469                    )
1470                    .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1471                        node_id: node.id.clone(),
1472                        source,
1473                    })?;
1474
1475                Ok(NodeExecution {
1476                    step,
1477                    node_id: node.id.clone(),
1478                    data: NodeExecutionData::CacheRead {
1479                        key,
1480                        hit,
1481                        value,
1482                        next: chosen_next,
1483                    },
1484                })
1485            }
1486            NodeKind::EventTrigger {
1487                event,
1488                event_path,
1489                next,
1490                on_mismatch,
1491            } => {
1492                let scoped_input =
1493                    scope
1494                        .scoped_input(ScopeCapability::ConditionRead)
1495                        .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1496                            node_id: node.id.clone(),
1497                            source,
1498                        })?;
1499                let actual = resolve_path(&scoped_input, event_path)
1500                    .and_then(Value::as_str)
1501                    .ok_or_else(|| WorkflowRuntimeError::InvalidEventValue {
1502                        node_id: node.id.clone(),
1503                        path: event_path.clone(),
1504                    })?;
1505                let matched = actual == event;
1506                let chosen_next = if matched {
1507                    next.clone()
1508                } else {
1509                    on_mismatch.clone().unwrap_or_else(|| next.clone())
1510                };
1511
1512                scope
1513                    .record_node_output(
1514                        &node.id,
1515                        json!({"event": event.clone(), "matched": matched, "actual": actual}),
1516                        ScopeCapability::MapWrite,
1517                    )
1518                    .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1519                        node_id: node.id.clone(),
1520                        source,
1521                    })?;
1522
1523                Ok(NodeExecution {
1524                    step,
1525                    node_id: node.id.clone(),
1526                    data: NodeExecutionData::EventTrigger {
1527                        event: event.clone(),
1528                        matched,
1529                        next: chosen_next,
1530                    },
1531                })
1532            }
1533            NodeKind::Router { routes, default } => {
1534                let scoped_input =
1535                    scope
1536                        .scoped_input(ScopeCapability::ConditionRead)
1537                        .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1538                            node_id: node.id.clone(),
1539                            source,
1540                        })?;
1541                enforce_expression_scope_budget(
1542                    &node.id,
1543                    &scoped_input,
1544                    self.options.security_limits.max_expression_scope_bytes,
1545                )?;
1546                let mut selected = default.clone();
1547                for route in routes {
1548                    let matched = expressions::evaluate_bool(&route.when, &scoped_input).map_err(
1549                        |reason| WorkflowRuntimeError::InvalidRouterExpression {
1550                            node_id: node.id.clone(),
1551                            expression: route.when.clone(),
1552                            reason: reason.to_string(),
1553                        },
1554                    )?;
1555                    if matched {
1556                        selected = route.next.clone();
1557                        break;
1558                    }
1559                }
1560
1561                scope
1562                    .record_node_output(
1563                        &node.id,
1564                        json!({"selected": selected.clone()}),
1565                        ScopeCapability::MapWrite,
1566                    )
1567                    .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1568                        node_id: node.id.clone(),
1569                        source,
1570                    })?;
1571
1572                Ok(NodeExecution {
1573                    step,
1574                    node_id: node.id.clone(),
1575                    data: NodeExecutionData::Router {
1576                        selected: selected.clone(),
1577                        next: selected,
1578                    },
1579                })
1580            }
1581            NodeKind::Transform { expression, next } => {
1582                let scoped_input =
1583                    scope
1584                        .scoped_input(ScopeCapability::ConditionRead)
1585                        .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1586                            node_id: node.id.clone(),
1587                            source,
1588                        })?;
1589                let output =
1590                    evaluate_transform_expression(expression, &scoped_input).map_err(|reason| {
1591                        WorkflowRuntimeError::InvalidTransformExpression {
1592                            node_id: node.id.clone(),
1593                            expression: expression.clone(),
1594                            reason,
1595                        }
1596                    })?;
1597
1598                scope
1599                    .record_node_output(&node.id, output.clone(), ScopeCapability::MapWrite)
1600                    .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1601                        node_id: node.id.clone(),
1602                        source,
1603                    })?;
1604
1605                Ok(NodeExecution {
1606                    step,
1607                    node_id: node.id.clone(),
1608                    data: NodeExecutionData::Transform {
1609                        expression: expression.clone(),
1610                        output,
1611                        next: next.clone(),
1612                    },
1613                })
1614            }
1615            NodeKind::Loop {
1616                condition,
1617                body,
1618                next,
1619                max_iterations,
1620            } => {
1621                check_cancelled(cancellation)?;
1622                let scoped_input =
1623                    scope
1624                        .scoped_input(ScopeCapability::ConditionRead)
1625                        .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1626                            node_id: node.id.clone(),
1627                            source,
1628                        })?;
1629                enforce_expression_scope_budget(
1630                    &node.id,
1631                    &scoped_input,
1632                    self.options.security_limits.max_expression_scope_bytes,
1633                )?;
1634                let evaluated =
1635                    expressions::evaluate_bool(condition, &scoped_input).map_err(|reason| {
1636                        WorkflowRuntimeError::InvalidLoopCondition {
1637                            node_id: node.id.clone(),
1638                            expression: condition.clone(),
1639                            reason: reason.to_string(),
1640                        }
1641                    })?;
1642
1643                let (iteration, chosen_next) = if evaluated {
1644                    let iteration = scope.loop_iteration(&node.id).saturating_add(1);
1645                    if let Some(limit) = max_iterations {
1646                        if iteration > *limit {
1647                            return Err(WorkflowRuntimeError::LoopIterationLimitExceeded {
1648                                node_id: node.id.clone(),
1649                                max_iterations: *limit,
1650                            });
1651                        }
1652                    }
1653                    scope.set_loop_iteration(&node.id, iteration);
1654                    (iteration, body.clone())
1655                } else {
1656                    scope.clear_loop_iteration(&node.id);
1657                    (0, next.clone())
1658                };
1659
1660                scope
1661                    .record_condition_output(&node.id, evaluated, ScopeCapability::ConditionWrite)
1662                    .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1663                        node_id: node.id.clone(),
1664                        source,
1665                    })?;
1666
1667                Ok(NodeExecution {
1668                    step,
1669                    node_id: node.id.clone(),
1670                    data: NodeExecutionData::Loop {
1671                        condition: condition.clone(),
1672                        evaluated,
1673                        iteration,
1674                        next: chosen_next,
1675                    },
1676                })
1677            }
1678            NodeKind::Parallel {
1679                branches,
1680                next,
1681                max_in_flight,
1682            } => {
1683                self.execute_parallel_node(
1684                    step,
1685                    node,
1686                    ParallelNodeSpec {
1687                        node_index,
1688                        branches,
1689                        next,
1690                        max_in_flight: *max_in_flight,
1691                    },
1692                    scope,
1693                    cancellation,
1694                    retry_events,
1695                )
1696                .await
1697            }
1698            NodeKind::Merge {
1699                sources,
1700                policy,
1701                quorum,
1702                next,
1703            } => self.execute_merge_node(
1704                step,
1705                node,
1706                MergeNodeSpec {
1707                    sources,
1708                    policy,
1709                    quorum: *quorum,
1710                    next,
1711                },
1712                scope,
1713            ),
1714            NodeKind::Map {
1715                tool,
1716                items_path,
1717                next,
1718                max_in_flight,
1719            } => {
1720                self.execute_map_node(
1721                    step,
1722                    node,
1723                    MapNodeSpec {
1724                        tool,
1725                        items_path,
1726                        next,
1727                        max_in_flight: *max_in_flight,
1728                    },
1729                    scope,
1730                    cancellation,
1731                    retry_events,
1732                )
1733                .await
1734            }
1735            NodeKind::Reduce {
1736                source,
1737                operation,
1738                next,
1739            } => self.execute_reduce_node(step, node, source, operation, next, scope),
1740            NodeKind::Subgraph { graph, next } => {
1741                check_cancelled(cancellation)?;
1742                let next_node =
1743                    next.clone()
1744                        .ok_or_else(|| WorkflowRuntimeError::MissingNextEdge {
1745                            node_id: node.id.clone(),
1746                        })?;
1747                let subgraph = self.options.subgraph_registry.get(graph).ok_or_else(|| {
1748                    WorkflowRuntimeError::SubgraphNotFound {
1749                        node_id: node.id.clone(),
1750                        graph: graph.clone(),
1751                    }
1752                })?;
1753
1754                let subgraph_runtime = WorkflowRuntime::new(
1755                    subgraph.clone(),
1756                    self.llm_executor,
1757                    self.tool_executor,
1758                    WorkflowRuntimeOptions {
1759                        replay_mode: WorkflowReplayMode::Disabled,
1760                        enable_trace_recording: false,
1761                        ..self.options.clone()
1762                    },
1763                );
1764                let subgraph_result = Box::pin(
1765                    subgraph_runtime.execute(
1766                        scope
1767                            .scoped_input(ScopeCapability::ConditionRead)
1768                            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1769                                node_id: node.id.clone(),
1770                                source,
1771                            })?,
1772                        cancellation,
1773                    ),
1774                )
1775                .await?;
1776
1777                let subgraph_output = json!({
1778                    "terminal_node_id": subgraph_result.terminal_node_id,
1779                    "node_outputs": subgraph_result.node_outputs,
1780                });
1781                scope
1782                    .record_node_output(
1783                        &node.id,
1784                        subgraph_output.clone(),
1785                        ScopeCapability::MapWrite,
1786                    )
1787                    .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1788                        node_id: node.id.clone(),
1789                        source,
1790                    })?;
1791
1792                Ok(NodeExecution {
1793                    step,
1794                    node_id: node.id.clone(),
1795                    data: NodeExecutionData::Subgraph {
1796                        graph: graph.clone(),
1797                        terminal_node_id: subgraph_result.terminal_node_id,
1798                        output: subgraph_output,
1799                        next: next_node,
1800                    },
1801                })
1802            }
1803            NodeKind::Batch { items_path, next } => {
1804                self.execute_batch_node(step, node, items_path, next, scope)
1805            }
1806            NodeKind::Filter {
1807                items_path,
1808                expression,
1809                next,
1810            } => self.execute_filter_node(step, node, items_path, expression, next, scope),
1811            NodeKind::End => Ok(NodeExecution {
1812                step,
1813                node_id: node.id.clone(),
1814                data: NodeExecutionData::End,
1815            }),
1816        }
1817    }
1818
1819    fn execute_start_node(&self, step: usize, node: &Node, next: &str) -> NodeExecution {
1820        NodeExecution {
1821            step,
1822            node_id: node.id.clone(),
1823            data: NodeExecutionData::Start {
1824                next: next.to_string(),
1825            },
1826        }
1827    }
1828
1829    async fn execute_llm_node(
1830        &self,
1831        step: usize,
1832        node: &Node,
1833        spec: LlmNodeSpec<'_>,
1834        scope: &mut RuntimeScope,
1835        cancellation: Option<&dyn CancellationSignal>,
1836        retry_events: &mut Vec<WorkflowRetryEvent>,
1837    ) -> Result<NodeExecution, WorkflowRuntimeError> {
1838        let next_node = spec
1839            .next
1840            .clone()
1841            .ok_or_else(|| WorkflowRuntimeError::MissingNextEdge {
1842                node_id: node.id.clone(),
1843            })?;
1844
1845        let (output, llm_retries) = self
1846            .execute_llm_with_policy(step, node, spec.model, spec.prompt, scope, cancellation)
1847            .await?;
1848        retry_events.extend(llm_retries);
1849
1850        scope
1851            .record_llm_output(&node.id, output.content.clone(), ScopeCapability::LlmWrite)
1852            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1853                node_id: node.id.clone(),
1854                source,
1855            })?;
1856
1857        Ok(NodeExecution {
1858            step,
1859            node_id: node.id.clone(),
1860            data: NodeExecutionData::Llm {
1861                model: spec.model.to_string(),
1862                output: output.content,
1863                next: next_node,
1864            },
1865        })
1866    }
1867
1868    async fn execute_tool_node(
1869        &self,
1870        step: usize,
1871        node: &Node,
1872        spec: ToolNodeSpec<'_>,
1873        scope: &mut RuntimeScope,
1874        cancellation: Option<&dyn CancellationSignal>,
1875        retry_events: &mut Vec<WorkflowRetryEvent>,
1876    ) -> Result<NodeExecution, WorkflowRuntimeError> {
1877        let next_node = spec
1878            .next
1879            .clone()
1880            .ok_or_else(|| WorkflowRuntimeError::MissingNextEdge {
1881                node_id: node.id.clone(),
1882            })?;
1883
1884        let executor =
1885            self.tool_executor
1886                .ok_or_else(|| WorkflowRuntimeError::MissingToolExecutor {
1887                    node_id: node.id.clone(),
1888                })?;
1889
1890        let scoped_input = scope
1891            .scoped_input(ScopeCapability::ToolRead)
1892            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1893                node_id: node.id.clone(),
1894                source,
1895            })?;
1896
1897        let (tool_output, tool_retries) = self
1898            .execute_tool_with_policy_for_scope(ToolPolicyRequest {
1899                step,
1900                node,
1901                tool: spec.tool,
1902                input: spec.input,
1903                executor,
1904                scoped_input,
1905                cancellation,
1906            })
1907            .await?;
1908        retry_events.extend(tool_retries);
1909
1910        scope
1911            .record_tool_output(&node.id, tool_output.clone(), ScopeCapability::ToolWrite)
1912            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1913                node_id: node.id.clone(),
1914                source,
1915            })?;
1916
1917        Ok(NodeExecution {
1918            step,
1919            node_id: node.id.clone(),
1920            data: NodeExecutionData::Tool {
1921                tool: spec.tool.to_string(),
1922                output: tool_output,
1923                next: next_node,
1924            },
1925        })
1926    }
1927
1928    fn execute_condition_node(
1929        &self,
1930        step: usize,
1931        node: &Node,
1932        spec: ConditionNodeSpec<'_>,
1933        scope: &mut RuntimeScope,
1934        cancellation: Option<&dyn CancellationSignal>,
1935    ) -> Result<NodeExecution, WorkflowRuntimeError> {
1936        check_cancelled(cancellation)?;
1937        let scoped_input =
1938            scope
1939                .scoped_input(ScopeCapability::ConditionRead)
1940                .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1941                    node_id: node.id.clone(),
1942                    source,
1943                })?;
1944        enforce_expression_scope_budget(
1945            &node.id,
1946            &scoped_input,
1947            self.options.security_limits.max_expression_scope_bytes,
1948        )?;
1949        let evaluated =
1950            expressions::evaluate_bool(spec.expression, &scoped_input).map_err(|reason| {
1951                WorkflowRuntimeError::InvalidCondition {
1952                    node_id: node.id.clone(),
1953                    expression: spec.expression.to_string(),
1954                    reason: reason.to_string(),
1955                }
1956            })?;
1957        let next = if evaluated {
1958            spec.on_true.to_string()
1959        } else {
1960            spec.on_false.to_string()
1961        };
1962
1963        scope
1964            .record_condition_output(&node.id, evaluated, ScopeCapability::ConditionWrite)
1965            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1966                node_id: node.id.clone(),
1967                source,
1968            })?;
1969
1970        Ok(NodeExecution {
1971            step,
1972            node_id: node.id.clone(),
1973            data: NodeExecutionData::Condition {
1974                expression: spec.expression.to_string(),
1975                evaluated,
1976                next,
1977            },
1978        })
1979    }
1980
    /// Executes a `parallel` node: fans each declared branch out through a
    /// bounded scheduler against a shared read-only snapshot of the scope,
    /// records every branch output (plus a combined object under the
    /// parallel node's own id), and advances to `next`.
    ///
    /// Branch-level retry events are folded into `retry_events`. Any branch
    /// failure aborts the whole fan-out via `run_bounded`'s `?`.
    async fn execute_parallel_node(
        &self,
        step: usize,
        node: &Node,
        spec: ParallelNodeSpec<'_>,
        scope: &mut RuntimeScope,
        cancellation: Option<&dyn CancellationSignal>,
        retry_events: &mut Vec<WorkflowRetryEvent>,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        check_cancelled(cancellation)?;
        // Guardrail: refuse fan-outs wider than the configured branch limit.
        if spec.branches.len() > self.options.security_limits.max_parallel_branches {
            return Err(WorkflowRuntimeError::ParallelBranchLimitExceeded {
                node_id: node.id.clone(),
                actual_branches: spec.branches.len(),
                max_branches: self.options.security_limits.max_parallel_branches,
            });
        }
        // One snapshot taken up front; every branch sees the same input
        // regardless of completion order.
        let base_scope = scope
            .scoped_input(ScopeCapability::MapRead)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;
        // Node-level max_in_flight overrides the global scheduler bound.
        let scheduler = DagScheduler::new(
            spec.max_in_flight
                .unwrap_or(self.options.scheduler_max_in_flight),
        );
        let parallel_node_id = node.id.clone();

        let branch_outputs: Vec<(String, Value, Vec<WorkflowRetryEvent>)> = scheduler
            .run_bounded(spec.branches.iter().cloned(), |branch_id| {
                // Clone per-branch so each future owns its captures.
                let parallel_node_id = parallel_node_id.clone();
                let base_scope = base_scope.clone();
                async move {
                    // Branch ids must resolve to nodes in the workflow index.
                    let branch_node = spec.node_index.get(branch_id.as_str()).ok_or_else(|| {
                        WorkflowRuntimeError::NodeNotFound {
                            node_id: branch_id.clone(),
                        }
                    })?;
                    self.execute_parallel_branch(
                        step,
                        &parallel_node_id,
                        branch_node,
                        base_scope,
                        cancellation,
                    )
                    .await
                }
            })
            .await?;

        // BTreeMap keeps the combined output deterministic (sorted by id).
        let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
        for (branch_id, output, branch_retry_events) in branch_outputs {
            retry_events.extend(branch_retry_events);
            // Each branch output is recorded under the branch's own id;
            // scope errors are attributed to the parallel node.
            scope
                .record_node_output(&branch_id, output.clone(), ScopeCapability::MapWrite)
                .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                    node_id: node.id.clone(),
                    source,
                })?;
            outputs.insert(branch_id, output);
        }

        // Also record the combined { branch_id: output } object under the
        // parallel node's id so downstream nodes can merge on it.
        scope
            .record_node_output(
                &node.id,
                Value::Object(
                    outputs
                        .iter()
                        .map(|(key, value)| (key.clone(), value.clone()))
                        .collect(),
                ),
                ScopeCapability::MapWrite,
            )
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        Ok(NodeExecution {
            step,
            node_id: node.id.clone(),
            data: NodeExecutionData::Parallel {
                branches: spec.branches.to_vec(),
                outputs,
                next: spec.next.to_string(),
            },
        })
    }
2070
2071    fn execute_merge_node(
2072        &self,
2073        step: usize,
2074        node: &Node,
2075        spec: MergeNodeSpec<'_>,
2076        scope: &mut RuntimeScope,
2077    ) -> Result<NodeExecution, WorkflowRuntimeError> {
2078        let mut resolved = Vec::with_capacity(spec.sources.len());
2079        for source in spec.sources {
2080            let Some(value) = scope.node_output(source).cloned() else {
2081                return Err(WorkflowRuntimeError::MissingMergeSource {
2082                    node_id: node.id.clone(),
2083                    source_id: source.clone(),
2084                });
2085            };
2086            resolved.push((source.clone(), value));
2087        }
2088
2089        let output = match spec.policy {
2090            MergePolicy::First => resolved
2091                .first()
2092                .map(|(_, value)| value.clone())
2093                .unwrap_or(Value::Null),
2094            MergePolicy::All => Value::Array(
2095                resolved
2096                    .iter()
2097                    .map(|(_, value)| value.clone())
2098                    .collect::<Vec<_>>(),
2099            ),
2100            MergePolicy::Quorum => {
2101                let required = spec.quorum.unwrap_or_default();
2102                let resolved_count = resolved.len();
2103                if resolved_count < required {
2104                    return Err(WorkflowRuntimeError::MergeQuorumNotMet {
2105                        node_id: node.id.clone(),
2106                        required,
2107                        resolved: resolved_count,
2108                    });
2109                }
2110                Value::Array(
2111                    resolved
2112                        .iter()
2113                        .take(required)
2114                        .map(|(_, value)| value.clone())
2115                        .collect::<Vec<_>>(),
2116                )
2117            }
2118        };
2119
2120        scope
2121            .record_node_output(&node.id, output.clone(), ScopeCapability::ReduceWrite)
2122            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
2123                node_id: node.id.clone(),
2124                source,
2125            })?;
2126
2127        Ok(NodeExecution {
2128            step,
2129            node_id: node.id.clone(),
2130            data: NodeExecutionData::Merge {
2131                policy: spec.policy.clone(),
2132                sources: spec.sources.to_vec(),
2133                output,
2134                next: spec.next.to_string(),
2135            },
2136        })
2137    }
2138
    /// Executes a `map` node: resolves the array at `items_path` from the
    /// map-scoped input, then invokes `tool` once per element through a
    /// bounded scheduler, collecting the per-item outputs into an array
    /// recorded under this node's id.
    ///
    /// Per-item retry events are folded into `retry_events`. Any item
    /// failure aborts the whole map via `run_bounded`'s `?`.
    async fn execute_map_node(
        &self,
        step: usize,
        node: &Node,
        spec: MapNodeSpec<'_>,
        scope: &mut RuntimeScope,
        cancellation: Option<&dyn CancellationSignal>,
        retry_events: &mut Vec<WorkflowRetryEvent>,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        // Map nodes require a configured tool executor.
        let executor =
            self.tool_executor
                .ok_or_else(|| WorkflowRuntimeError::MissingToolExecutor {
                    node_id: node.id.clone(),
                })?;

        let scoped_input = scope
            .scoped_input(ScopeCapability::MapRead)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;
        // A missing path and a non-array value both surface as MapItemsNotArray.
        let items = resolve_path(&scoped_input, spec.items_path)
            .and_then(Value::as_array)
            .ok_or_else(|| WorkflowRuntimeError::MapItemsNotArray {
                node_id: node.id.clone(),
                items_path: spec.items_path.to_string(),
            })?
            .clone();
        // Guardrail: cap the fan-out width before scheduling anything.
        if items.len() > self.options.security_limits.max_map_items {
            return Err(WorkflowRuntimeError::MapItemLimitExceeded {
                node_id: node.id.clone(),
                actual_items: items.len(),
                max_items: self.options.security_limits.max_map_items,
            });
        }

        // Node-level max_in_flight overrides the global scheduler bound.
        let scheduler = DagScheduler::new(
            spec.max_in_flight
                .unwrap_or(self.options.scheduler_max_in_flight),
        );
        let map_node = node.clone();
        let mapped: Vec<(Value, Vec<WorkflowRetryEvent>)> = scheduler
            .run_bounded(items.into_iter().enumerate(), |(index, item)| {
                // Clone per-item so each future owns its captures.
                let scoped_input = scoped_input.clone();
                let map_node = map_node.clone();
                async move {
                    // Each item evaluates against the base scope augmented
                    // with the current element and its index.
                    let item_scope = map_item_scoped_input(&scoped_input, &item, index);
                    let (output, retries) = self
                        .execute_tool_with_policy_for_scope(ToolPolicyRequest {
                            step,
                            node: &map_node,
                            tool: spec.tool,
                            input: &item,
                            executor,
                            scoped_input: item_scope,
                            cancellation,
                        })
                        .await?;
                    Ok::<(Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError>((output, retries))
                }
            })
            .await?;

        // Split outputs from their retry events, preserving item order.
        let mut outputs = Vec::with_capacity(mapped.len());
        for (output, local_retries) in mapped {
            outputs.push(output);
            retry_events.extend(local_retries);
        }

        let output = Value::Array(outputs);
        scope
            .record_node_output(&node.id, output.clone(), ScopeCapability::MapWrite)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        Ok(NodeExecution {
            step,
            node_id: node.id.clone(),
            data: NodeExecutionData::Map {
                item_count: output.as_array().map_or(0, Vec::len),
                output,
                next: spec.next.to_string(),
            },
        })
    }
2226
2227    fn execute_reduce_node(
2228        &self,
2229        step: usize,
2230        node: &Node,
2231        source: &str,
2232        operation: &ReduceOperation,
2233        next: &str,
2234        scope: &mut RuntimeScope,
2235    ) -> Result<NodeExecution, WorkflowRuntimeError> {
2236        let source_value = scope.node_output(source).cloned().ok_or_else(|| {
2237            WorkflowRuntimeError::MissingMergeSource {
2238                node_id: node.id.clone(),
2239                source_id: source.to_string(),
2240            }
2241        })?;
2242
2243        let reduced = reduce_value(&node.id, source, operation, source_value)?;
2244        scope
2245            .record_node_output(&node.id, reduced.clone(), ScopeCapability::ReduceWrite)
2246            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
2247                node_id: node.id.clone(),
2248                source,
2249            })?;
2250
2251        Ok(NodeExecution {
2252            step,
2253            node_id: node.id.clone(),
2254            data: NodeExecutionData::Reduce {
2255                operation: operation.clone(),
2256                output: reduced,
2257                next: next.to_string(),
2258            },
2259        })
2260    }
2261
2262    fn execute_batch_node(
2263        &self,
2264        step: usize,
2265        node: &Node,
2266        items_path: &str,
2267        next: &str,
2268        scope: &mut RuntimeScope,
2269    ) -> Result<NodeExecution, WorkflowRuntimeError> {
2270        let scoped = scope
2271            .scoped_input(ScopeCapability::MapRead)
2272            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
2273                node_id: node.id.clone(),
2274                source,
2275            })?;
2276        let items = resolve_path(&scoped, items_path).ok_or_else(|| {
2277            WorkflowRuntimeError::BatchItemsNotArray {
2278                node_id: node.id.clone(),
2279                items_path: items_path.to_string(),
2280            }
2281        })?;
2282        let array = items
2283            .as_array()
2284            .ok_or_else(|| WorkflowRuntimeError::BatchItemsNotArray {
2285                node_id: node.id.clone(),
2286                items_path: items_path.to_string(),
2287            })?;
2288
2289        let output = Value::Array(array.clone());
2290        scope
2291            .record_node_output(&node.id, output.clone(), ScopeCapability::MapWrite)
2292            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
2293                node_id: node.id.clone(),
2294                source,
2295            })?;
2296
2297        Ok(NodeExecution {
2298            step,
2299            node_id: node.id.clone(),
2300            data: NodeExecutionData::Batch {
2301                items_path: items_path.to_string(),
2302                item_count: array.len(),
2303                next: next.to_string(),
2304            },
2305        })
2306    }
2307
2308    fn execute_filter_node(
2309        &self,
2310        step: usize,
2311        node: &Node,
2312        items_path: &str,
2313        expression: &str,
2314        next: &str,
2315        scope: &mut RuntimeScope,
2316    ) -> Result<NodeExecution, WorkflowRuntimeError> {
2317        let scoped = scope
2318            .scoped_input(ScopeCapability::MapRead)
2319            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
2320                node_id: node.id.clone(),
2321                source,
2322            })?;
2323        let items_value = resolve_path(&scoped, items_path).ok_or_else(|| {
2324            WorkflowRuntimeError::FilterItemsNotArray {
2325                node_id: node.id.clone(),
2326                items_path: items_path.to_string(),
2327            }
2328        })?;
2329        let array =
2330            items_value
2331                .as_array()
2332                .ok_or_else(|| WorkflowRuntimeError::FilterItemsNotArray {
2333                    node_id: node.id.clone(),
2334                    items_path: items_path.to_string(),
2335                })?;
2336        if array.len() > self.options.security_limits.max_filter_items {
2337            return Err(WorkflowRuntimeError::FilterItemLimitExceeded {
2338                node_id: node.id.clone(),
2339                actual_items: array.len(),
2340                max_items: self.options.security_limits.max_filter_items,
2341            });
2342        }
2343
2344        let mut kept = Vec::new();
2345        for (index, item) in array.iter().enumerate() {
2346            let mut eval_scope = scoped.clone();
2347            if let Some(object) = eval_scope.as_object_mut() {
2348                object.insert("item".to_string(), item.clone());
2349                object.insert("item_index".to_string(), Value::from(index as u64));
2350            }
2351            enforce_expression_scope_budget(
2352                &node.id,
2353                &eval_scope,
2354                self.options.security_limits.max_expression_scope_bytes,
2355            )?;
2356            let include =
2357                expressions::evaluate_bool(expression, &eval_scope).map_err(|reason| {
2358                    WorkflowRuntimeError::InvalidFilterExpression {
2359                        node_id: node.id.clone(),
2360                        expression: expression.to_string(),
2361                        reason: reason.to_string(),
2362                    }
2363                })?;
2364            if include {
2365                kept.push(item.clone());
2366            }
2367        }
2368        let output = Value::Array(kept.clone());
2369        scope
2370            .record_node_output(&node.id, output, ScopeCapability::MapWrite)
2371            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
2372                node_id: node.id.clone(),
2373                source,
2374            })?;
2375
2376        Ok(NodeExecution {
2377            step,
2378            node_id: node.id.clone(),
2379            data: NodeExecutionData::Filter {
2380                items_path: items_path.to_string(),
2381                expression: expression.to_string(),
2382                kept: kept.len(),
2383                next: next.to_string(),
2384            },
2385        })
2386    }
2387
2388    async fn execute_llm_with_policy(
2389        &self,
2390        step: usize,
2391        node: &Node,
2392        model: &str,
2393        prompt: &str,
2394        scope: &RuntimeScope,
2395        cancellation: Option<&dyn CancellationSignal>,
2396    ) -> Result<(LlmExecutionOutput, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
2397        let scoped_input = scope
2398            .scoped_input(ScopeCapability::LlmRead)
2399            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
2400                node_id: node.id.clone(),
2401                source,
2402            })?;
2403
2404        self.execute_llm_with_policy_for_scope(
2405            step,
2406            node,
2407            model,
2408            prompt,
2409            scoped_input,
2410            cancellation,
2411        )
2412        .await
2413    }
2414
2415    async fn execute_parallel_branch(
2416        &self,
2417        step: usize,
2418        parallel_node_id: &str,
2419        branch_node: &Node,
2420        scoped_input: Value,
2421        cancellation: Option<&dyn CancellationSignal>,
2422    ) -> Result<(String, Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
2423        match &branch_node.kind {
2424            NodeKind::Llm {
2425                model,
2426                prompt,
2427                next: _,
2428            } => {
2429                let (output, retries) = self
2430                    .execute_llm_with_policy_for_scope(
2431                        step,
2432                        branch_node,
2433                        model,
2434                        prompt,
2435                        scoped_input,
2436                        cancellation,
2437                    )
2438                    .await?;
2439                Ok((
2440                    branch_node.id.clone(),
2441                    Value::String(output.content),
2442                    retries,
2443                ))
2444            }
2445            NodeKind::Tool { tool, input, .. } => {
2446                let executor = self.tool_executor.ok_or_else(|| {
2447                    WorkflowRuntimeError::MissingToolExecutor {
2448                        node_id: branch_node.id.clone(),
2449                    }
2450                })?;
2451                let (output, retries) = self
2452                    .execute_tool_with_policy_for_scope(ToolPolicyRequest {
2453                        step,
2454                        node: branch_node,
2455                        tool,
2456                        input,
2457                        executor,
2458                        scoped_input,
2459                        cancellation,
2460                    })
2461                    .await?;
2462                Ok((branch_node.id.clone(), output, retries))
2463            }
2464            _ => Err(WorkflowRuntimeError::ParallelBranchUnsupported {
2465                node_id: parallel_node_id.to_string(),
2466                branch_id: branch_node.id.clone(),
2467                reason: "only llm/tool branches are supported".to_string(),
2468            }),
2469        }
2470    }
2471
    /// Runs the LLM executor for `node` under the configured LLM node policy.
    ///
    /// Makes up to `max_retries + 1` attempts. Each attempt is optionally
    /// bounded by the policy timeout; a timed-out or failed attempt is
    /// recorded as a `WorkflowRetryEvent` and retried until attempts are
    /// exhausted, at which point the matching terminal error is returned.
    /// Cancellation is checked before every attempt and again after each
    /// recorded retry, so a cancelled run never starts another attempt.
    async fn execute_llm_with_policy_for_scope(
        &self,
        step: usize,
        node: &Node,
        model: &str,
        prompt: &str,
        scoped_input: Value,
        cancellation: Option<&dyn CancellationSignal>,
    ) -> Result<(LlmExecutionOutput, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
        // `max_retries` counts *re*-tries, so total attempts is one more.
        let max_attempts = self.options.llm_node_policy.max_retries.saturating_add(1);
        let mut retry_events = Vec::new();

        for attempt in 1..=max_attempts {
            check_cancelled(cancellation)?;

            let execution = self.llm_executor.execute(LlmExecutionInput {
                node_id: node.id.clone(),
                model: model.to_string(),
                prompt: prompt.to_string(),
                scoped_input: scoped_input.clone(),
            });

            // Apply a per-attempt timeout only when the policy configures one.
            let outcome = if let Some(timeout_duration) = self.options.llm_node_policy.timeout {
                match timeout(timeout_duration, execution).await {
                    Ok(result) => result,
                    Err(_) => {
                        // A timeout on the final attempt is terminal.
                        if attempt == max_attempts {
                            return Err(WorkflowRuntimeError::LlmTimeout {
                                node_id: node.id.clone(),
                                timeout_ms: timeout_duration.as_millis(),
                                attempts: attempt,
                            });
                        }
                        retry_events.push(WorkflowRetryEvent {
                            step,
                            node_id: node.id.clone(),
                            operation: "llm".to_string(),
                            failed_attempt: attempt,
                            reason: format!(
                                "attempt {} timed out after {} ms",
                                attempt,
                                timeout_duration.as_millis()
                            ),
                        });
                        check_cancelled(cancellation)?;
                        continue;
                    }
                }
            } else {
                execution.await
            };

            match outcome {
                Ok(output) => return Ok((output, retry_events)),
                Err(last_error) => {
                    // Executor failure on the final attempt is terminal.
                    if attempt == max_attempts {
                        return Err(WorkflowRuntimeError::LlmRetryExhausted {
                            node_id: node.id.clone(),
                            attempts: attempt,
                            last_error,
                        });
                    }
                    retry_events.push(WorkflowRetryEvent {
                        step,
                        node_id: node.id.clone(),
                        operation: "llm".to_string(),
                        failed_attempt: attempt,
                        reason: last_error.to_string(),
                    });
                    check_cancelled(cancellation)?;
                }
            }
        }

        // Every iteration either returns or continues, and the final attempt
        // always returns, so control can never fall out of the loop.
        unreachable!("llm attempts loop always returns")
    }
2548
    /// Runs a tool executor under the configured tool node policy.
    ///
    /// Mirrors `execute_llm_with_policy_for_scope`: up to `max_retries + 1`
    /// attempts, each optionally bounded by the policy timeout. Failed or
    /// timed-out attempts become `WorkflowRetryEvent`s; exhausting attempts
    /// yields `ToolTimeout`/`ToolRetryExhausted`. Cancellation is checked
    /// before each attempt and after each recorded retry.
    async fn execute_tool_with_policy_for_scope(
        &self,
        request: ToolPolicyRequest<'_>,
    ) -> Result<(Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
        let ToolPolicyRequest {
            step,
            node,
            tool,
            input,
            executor,
            scoped_input,
            cancellation,
        } = request;
        // `max_retries` counts *re*-tries, so total attempts is one more.
        let max_attempts = self.options.tool_node_policy.max_retries.saturating_add(1);
        let mut retry_events = Vec::new();

        for attempt in 1..=max_attempts {
            check_cancelled(cancellation)?;

            let execution = executor.execute_tool(ToolExecutionInput {
                node_id: node.id.clone(),
                tool: tool.to_string(),
                input: input.clone(),
                scoped_input: scoped_input.clone(),
            });

            // Apply a per-attempt timeout only when the policy configures one.
            let outcome = if let Some(timeout_duration) = self.options.tool_node_policy.timeout {
                match timeout(timeout_duration, execution).await {
                    Ok(result) => result,
                    Err(_) => {
                        // A timeout on the final attempt is terminal.
                        if attempt == max_attempts {
                            return Err(WorkflowRuntimeError::ToolTimeout {
                                node_id: node.id.clone(),
                                timeout_ms: timeout_duration.as_millis(),
                                attempts: attempt,
                            });
                        }
                        retry_events.push(WorkflowRetryEvent {
                            step,
                            node_id: node.id.clone(),
                            operation: "tool".to_string(),
                            failed_attempt: attempt,
                            reason: format!(
                                "attempt {} timed out after {} ms",
                                attempt,
                                timeout_duration.as_millis()
                            ),
                        });
                        check_cancelled(cancellation)?;
                        continue;
                    }
                }
            } else {
                execution.await
            };

            match outcome {
                Ok(output) => return Ok((output, retry_events)),
                Err(last_error) => {
                    // Executor failure on the final attempt is terminal.
                    if attempt == max_attempts {
                        return Err(WorkflowRuntimeError::ToolRetryExhausted {
                            node_id: node.id.clone(),
                            attempts: attempt,
                            last_error,
                        });
                    }
                    retry_events.push(WorkflowRetryEvent {
                        step,
                        node_id: node.id.clone(),
                        operation: "tool".to_string(),
                        failed_attempt: attempt,
                        reason: last_error.to_string(),
                    });
                    check_cancelled(cancellation)?;
                }
            }
        }

        // Every iteration either returns or continues, and the final attempt
        // always returns, so control can never fall out of the loop.
        unreachable!("tool attempts loop always returns")
    }
2629}
2630
/// Mutable per-run state threaded through a single workflow execution.
#[derive(Debug)]
struct RuntimeScope {
    /// The input value the workflow was started with (never mutated here).
    workflow_input: Value,
    /// Recorded output per node id; included in the scoped-input snapshot.
    node_outputs: BTreeMap<String, Value>,
    /// Current iteration count keyed by loop node id.
    loop_iterations: HashMap<String, u32>,
    /// Last step at which each `node_id:key` was seen by a debounce node.
    debounce_last_seen: HashMap<String, usize>,
    /// Last step at which each `node_id:key` passed a throttle node.
    throttle_last_pass: HashMap<String, usize>,
    /// Cached values keyed by cache key.
    cache_entries: BTreeMap<String, Value>,
    /// Content of the most recently recorded LLM output, if any.
    last_llm_output: Option<String>,
    /// Most recently recorded tool output, if any.
    last_tool_output: Option<Value>,
}
2642
/// Bundled arguments for `execute_tool_with_policy_for_scope`, grouping one
/// tool invocation's node, tool metadata, and execution context.
struct ToolPolicyRequest<'a> {
    step: usize,
    node: &'a Node,
    tool: &'a str,
    input: &'a Value,
    executor: &'a dyn ToolExecutor,
    scoped_input: Value,
    cancellation: Option<&'a dyn CancellationSignal>,
}

/// Borrowed view of an LLM node's fields.
struct LlmNodeSpec<'a> {
    model: &'a str,
    prompt: &'a str,
    next: &'a Option<String>,
}

/// Borrowed view of a tool node's fields.
struct ToolNodeSpec<'a> {
    tool: &'a str,
    input: &'a Value,
    next: &'a Option<String>,
}

/// Borrowed view of a condition node's expression and its two successors.
struct ConditionNodeSpec<'a> {
    expression: &'a str,
    on_true: &'a str,
    on_false: &'a str,
}

/// Borrowed view of a parallel node: its branches, successor, fan-out bound,
/// and the id-to-node index used to resolve branch nodes.
struct ParallelNodeSpec<'a> {
    node_index: &'a HashMap<&'a str, &'a Node>,
    branches: &'a [String],
    next: &'a str,
    max_in_flight: Option<usize>,
}

/// Borrowed view of a merge node: source node ids, merge policy, and the
/// optional quorum threshold.
struct MergeNodeSpec<'a> {
    sources: &'a [String],
    policy: &'a MergePolicy,
    quorum: Option<usize>,
    next: &'a str,
}

/// Borrowed view of a map node: the tool to run per item, the path to the
/// items array, and the optional fan-out bound.
struct MapNodeSpec<'a> {
    tool: &'a str,
    items_path: &'a str,
    next: &'a str,
    max_in_flight: Option<usize>,
}
2691
2692impl RuntimeScope {
2693    fn new(workflow_input: Value) -> Self {
2694        Self {
2695            workflow_input,
2696            node_outputs: BTreeMap::new(),
2697            loop_iterations: HashMap::new(),
2698            debounce_last_seen: HashMap::new(),
2699            throttle_last_pass: HashMap::new(),
2700            cache_entries: BTreeMap::new(),
2701            last_llm_output: None,
2702            last_tool_output: None,
2703        }
2704    }
2705
2706    fn scoped_input(&self, capability: ScopeCapability) -> Result<Value, ScopeAccessError> {
2707        if !matches!(
2708            capability,
2709            ScopeCapability::LlmRead
2710                | ScopeCapability::ToolRead
2711                | ScopeCapability::ConditionRead
2712                | ScopeCapability::MapRead
2713        ) {
2714            return Err(ScopeAccessError::ReadDenied {
2715                capability: capability.as_str(),
2716            });
2717        }
2718
2719        let mut object = Map::new();
2720        object.insert("input".to_string(), self.workflow_input.clone());
2721        object.insert(
2722            "last_llm_output".to_string(),
2723            self.last_llm_output
2724                .as_ref()
2725                .map_or(Value::Null, |value| Value::String(value.clone())),
2726        );
2727        object.insert(
2728            "last_tool_output".to_string(),
2729            self.last_tool_output.clone().unwrap_or(Value::Null),
2730        );
2731        object.insert(
2732            "node_outputs".to_string(),
2733            Value::Object(
2734                self.node_outputs
2735                    .iter()
2736                    .map(|(key, value)| (key.clone(), value.clone()))
2737                    .collect(),
2738            ),
2739        );
2740        Ok(Value::Object(object))
2741    }
2742
2743    fn record_llm_output(
2744        &mut self,
2745        node_id: &str,
2746        output: String,
2747        capability: ScopeCapability,
2748    ) -> Result<(), ScopeAccessError> {
2749        if capability != ScopeCapability::LlmWrite {
2750            return Err(ScopeAccessError::WriteDenied {
2751                capability: capability.as_str(),
2752            });
2753        }
2754
2755        self.last_llm_output = Some(output.clone());
2756        self.node_outputs
2757            .insert(node_id.to_string(), Value::String(output));
2758        Ok(())
2759    }
2760
2761    fn record_tool_output(
2762        &mut self,
2763        node_id: &str,
2764        output: Value,
2765        capability: ScopeCapability,
2766    ) -> Result<(), ScopeAccessError> {
2767        if capability != ScopeCapability::ToolWrite {
2768            return Err(ScopeAccessError::WriteDenied {
2769                capability: capability.as_str(),
2770            });
2771        }
2772
2773        self.last_tool_output = Some(output.clone());
2774        self.node_outputs.insert(node_id.to_string(), output);
2775        Ok(())
2776    }
2777
2778    fn record_condition_output(
2779        &mut self,
2780        node_id: &str,
2781        evaluated: bool,
2782        capability: ScopeCapability,
2783    ) -> Result<(), ScopeAccessError> {
2784        if capability != ScopeCapability::ConditionWrite {
2785            return Err(ScopeAccessError::WriteDenied {
2786                capability: capability.as_str(),
2787            });
2788        }
2789
2790        self.node_outputs
2791            .insert(node_id.to_string(), Value::Bool(evaluated));
2792        Ok(())
2793    }
2794
2795    fn record_node_output(
2796        &mut self,
2797        node_id: &str,
2798        output: Value,
2799        capability: ScopeCapability,
2800    ) -> Result<(), ScopeAccessError> {
2801        if !matches!(
2802            capability,
2803            ScopeCapability::MapWrite | ScopeCapability::ReduceWrite
2804        ) {
2805            return Err(ScopeAccessError::WriteDenied {
2806                capability: capability.as_str(),
2807            });
2808        }
2809
2810        self.node_outputs.insert(node_id.to_string(), output);
2811        Ok(())
2812    }
2813
2814    fn node_output(&self, node_id: &str) -> Option<&Value> {
2815        self.node_outputs.get(node_id)
2816    }
2817
2818    fn loop_iteration(&self, node_id: &str) -> u32 {
2819        self.loop_iterations.get(node_id).copied().unwrap_or(0)
2820    }
2821
2822    fn set_loop_iteration(&mut self, node_id: &str, iteration: u32) {
2823        self.loop_iterations.insert(node_id.to_string(), iteration);
2824    }
2825
2826    fn clear_loop_iteration(&mut self, node_id: &str) {
2827        self.loop_iterations.remove(node_id);
2828    }
2829
2830    fn debounce(&mut self, node_id: &str, key: &str, step: usize, window_steps: u32) -> bool {
2831        let namespaced = format!("{node_id}:{key}");
2832        let window = window_steps as usize;
2833        let suppressed = self
2834            .debounce_last_seen
2835            .get(&namespaced)
2836            .is_some_and(|last| step.saturating_sub(*last) < window);
2837        self.debounce_last_seen.insert(namespaced, step);
2838        suppressed
2839    }
2840
2841    fn throttle(&mut self, node_id: &str, key: &str, step: usize, window_steps: u32) -> bool {
2842        let namespaced = format!("{node_id}:{key}");
2843        let window = window_steps as usize;
2844        let throttled = self
2845            .throttle_last_pass
2846            .get(&namespaced)
2847            .is_some_and(|last| step.saturating_sub(*last) < window);
2848        if !throttled {
2849            self.throttle_last_pass.insert(namespaced, step);
2850        }
2851        throttled
2852    }
2853
2854    fn put_cache(&mut self, key: &str, value: Value) {
2855        self.cache_entries.insert(key.to_string(), value);
2856    }
2857
2858    fn cache_value(&self, key: &str) -> Option<&Value> {
2859        self.cache_entries.get(key)
2860    }
2861}
2862
2863fn check_cancelled(
2864    cancellation: Option<&dyn CancellationSignal>,
2865) -> Result<(), WorkflowRuntimeError> {
2866    if cancellation.is_some_and(CancellationSignal::is_cancelled) {
2867        Err(WorkflowRuntimeError::Cancelled)
2868    } else {
2869        Ok(())
2870    }
2871}
2872
/// Issues the next logical trace timestamp and advances the clock.
///
/// Saturates at `u64::MAX` instead of wrapping, so a (practically
/// unreachable) overflow repeats the final timestamp rather than going
/// backwards.
fn next_trace_timestamp(clock: &mut u64) -> u64 {
    let issued = *clock;
    *clock = issued.saturating_add(1);
    issued
}
2878
2879fn build_prompt_with_scope(prompt: &str, scoped_input: &Value) -> String {
2880    format!("{}\n\nScoped context:\n{}", prompt, scoped_input)
2881}
2882
2883fn build_node_index(workflow: &WorkflowDefinition) -> HashMap<&str, &Node> {
2884    let mut index = HashMap::with_capacity(workflow.nodes.len());
2885    for node in &workflow.nodes {
2886        index.insert(node.id.as_str(), node);
2887    }
2888    index
2889}
2890
2891fn find_start_node_id(workflow: &WorkflowDefinition) -> Result<String, WorkflowRuntimeError> {
2892    workflow
2893        .nodes
2894        .iter()
2895        .find_map(|node| match node.kind {
2896            NodeKind::Start { .. } => Some(node.id.clone()),
2897            _ => None,
2898        })
2899        .ok_or(WorkflowRuntimeError::MissingStartNode)
2900}
2901
/// Returns the successor node id carried by `data`, or `None` when execution
/// reached an `End` node.
fn next_node_id(data: &NodeExecutionData) -> Option<String> {
    match data {
        // Every non-terminal variant stores a `next` pointer; the or-pattern
        // binds them all at once.
        NodeExecutionData::Start { next }
        | NodeExecutionData::Llm { next, .. }
        | NodeExecutionData::Tool { next, .. }
        | NodeExecutionData::Condition { next, .. }
        | NodeExecutionData::Debounce { next, .. }
        | NodeExecutionData::Throttle { next, .. }
        | NodeExecutionData::RetryCompensate { next, .. }
        | NodeExecutionData::HumanInTheLoop { next, .. }
        | NodeExecutionData::CacheRead { next, .. }
        | NodeExecutionData::CacheWrite { next, .. }
        | NodeExecutionData::EventTrigger { next, .. }
        | NodeExecutionData::Router { next, .. }
        | NodeExecutionData::Transform { next, .. }
        | NodeExecutionData::Loop { next, .. }
        | NodeExecutionData::Subgraph { next, .. }
        | NodeExecutionData::Batch { next, .. }
        | NodeExecutionData::Filter { next, .. }
        | NodeExecutionData::Parallel { next, .. }
        | NodeExecutionData::Merge { next, .. }
        | NodeExecutionData::Map { next, .. }
        | NodeExecutionData::Reduce { next, .. } => Some(next.clone()),
        NodeExecutionData::End => None,
    }
}
2928
2929fn resolve_path<'a>(scope: &'a Value, path: &str) -> Option<&'a Value> {
2930    let mut current = scope;
2931    for segment in path.split('.') {
2932        if segment.is_empty() {
2933            continue;
2934        }
2935        current = current.get(segment)?;
2936    }
2937    Some(current)
2938}
2939
2940fn resolve_string_path(scope: &Value, path: &str) -> Option<String> {
2941    resolve_path(scope, path)
2942        .and_then(Value::as_str)
2943        .map(str::to_string)
2944}
2945
2946fn evaluate_human_decision(value: &Value) -> Option<bool> {
2947    match value {
2948        Value::Bool(flag) => Some(*flag),
2949        Value::String(text) => {
2950            let normalized = text.trim().to_ascii_lowercase();
2951            match normalized.as_str() {
2952                "approve" | "approved" | "yes" | "true" => Some(true),
2953                "reject" | "rejected" | "no" | "false" => Some(false),
2954                _ => None,
2955            }
2956        }
2957        _ => None,
2958    }
2959}
2960
2961fn evaluate_transform_expression(expression: &str, scope: &Value) -> Result<Value, String> {
2962    let trimmed = expression.trim();
2963    if trimmed.is_empty() {
2964        return Err("expression is empty".to_string());
2965    }
2966
2967    if let Ok(value) = serde_json::from_str::<Value>(trimmed) {
2968        return Ok(value);
2969    }
2970
2971    let path = trimmed.strip_prefix("$.").unwrap_or(trimmed);
2972    resolve_path(scope, path)
2973        .cloned()
2974        .ok_or_else(|| format!("path '{path}' not found in scoped input"))
2975}
2976
2977fn map_item_scoped_input(base_scope: &Value, item: &Value, index: usize) -> Value {
2978    let mut object = match base_scope {
2979        Value::Object(map) => map.clone(),
2980        _ => Map::new(),
2981    };
2982    object.insert("map_item".to_string(), item.clone());
2983    object.insert("map_index".to_string(), Value::from(index as u64));
2984    Value::Object(object)
2985}
2986
2987fn enforce_expression_scope_budget(
2988    node_id: &str,
2989    scoped_input: &Value,
2990    max_expression_scope_bytes: usize,
2991) -> Result<(), WorkflowRuntimeError> {
2992    let size = serde_json::to_vec(scoped_input)
2993        .map(|bytes| bytes.len())
2994        .unwrap_or(max_expression_scope_bytes.saturating_add(1));
2995    if size > max_expression_scope_bytes {
2996        return Err(WorkflowRuntimeError::ExpressionScopeLimitExceeded {
2997            node_id: node_id.to_string(),
2998            actual_bytes: size,
2999            limit_bytes: max_expression_scope_bytes,
3000        });
3001    }
3002    Ok(())
3003}
3004
3005fn reduce_value(
3006    node_id: &str,
3007    source: &str,
3008    operation: &ReduceOperation,
3009    source_value: Value,
3010) -> Result<Value, WorkflowRuntimeError> {
3011    let items =
3012        source_value
3013            .as_array()
3014            .ok_or_else(|| WorkflowRuntimeError::InvalidReduceInput {
3015                node_id: node_id.to_string(),
3016                source_node: source.to_string(),
3017                reason: "expected source output to be an array".to_string(),
3018            })?;
3019
3020    match operation {
3021        ReduceOperation::Count => Ok(Value::from(items.len() as u64)),
3022        ReduceOperation::Sum => {
3023            let mut sum = 0.0f64;
3024            for value in items {
3025                let number =
3026                    value
3027                        .as_f64()
3028                        .ok_or_else(|| WorkflowRuntimeError::InvalidReduceInput {
3029                            node_id: node_id.to_string(),
3030                            source_node: source.to_string(),
3031                            reason: "sum operation requires numeric array values".to_string(),
3032                        })?;
3033                sum += number;
3034            }
3035            let number = serde_json::Number::from_f64(sum).ok_or_else(|| {
3036                WorkflowRuntimeError::InvalidReduceInput {
3037                    node_id: node_id.to_string(),
3038                    source_node: source.to_string(),
3039                    reason: "sum produced non-finite value".to_string(),
3040                }
3041            })?;
3042            Ok(Value::Number(number))
3043        }
3044    }
3045}
3046
3047#[cfg(test)]
3048mod tests {
3049    use std::sync::atomic::{AtomicUsize, Ordering};
3050    use std::sync::{Arc, Mutex};
3051    use std::time::Duration;
3052
3053    use async_trait::async_trait;
3054    use serde_json::json;
3055    use tokio::time::sleep;
3056
3057    use super::*;
3058    use crate::ir::{MergePolicy, Node, NodeKind, ReduceOperation, WorkflowDefinition};
3059
    /// LLM executor stub that always succeeds with a fixed response.
    struct MockLlmExecutor {
        output: String,
    }

    #[async_trait]
    impl LlmExecutor for MockLlmExecutor {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            Ok(LlmExecutionOutput {
                content: self.output.clone(),
            })
        }
    }
3075
    /// Tool executor stub: returns `output`, or a `Failed` error naming the
    /// requested tool when `fail` is set.
    struct MockToolExecutor {
        output: Value,
        fail: bool,
    }

    #[async_trait]
    impl ToolExecutor for MockToolExecutor {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            if self.fail {
                return Err(ToolExecutionError::Failed(format!(
                    "tool '{}' failed intentionally",
                    input.tool
                )));
            }
            Ok(self.output.clone())
        }
    }
3096
    /// LLM executor stub that plays back a scripted sequence of results, one
    /// per call, while counting invocations. Panics if called more times than
    /// there are scripted responses (`remove(0)` on an empty vec).
    struct SequencedLlmExecutor {
        responses: Mutex<Vec<Result<LlmExecutionOutput, LlmExecutionError>>>,
        calls: AtomicUsize,
    }

    #[async_trait]
    impl LlmExecutor for SequencedLlmExecutor {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            self.calls.fetch_add(1, Ordering::Relaxed);
            // Pop the next scripted response from the front of the queue.
            self.responses
                .lock()
                .expect("sequenced llm lock poisoned")
                .remove(0)
        }
    }
3115
    /// Tool executor stub that sleeps for `delay` before succeeding with
    /// `{"status": "slow-ok"}` — useful for exercising timeout policies.
    struct SlowToolExecutor {
        delay: Duration,
    }

    #[async_trait]
    impl ToolExecutor for SlowToolExecutor {
        async fn execute_tool(
            &self,
            _input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            sleep(self.delay).await;
            Ok(json!({"status": "slow-ok"}))
        }
    }
3130
    /// Tool executor stub that returns 1, 2, 3, … across calls via an atomic
    /// counter (fetch_add returns the previous value, hence the `+ 1`).
    struct IncrementingToolExecutor {
        value: AtomicUsize,
    }

    #[async_trait]
    impl ToolExecutor for IncrementingToolExecutor {
        async fn execute_tool(
            &self,
            _input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            let next = self.value.fetch_add(1, Ordering::Relaxed) + 1;
            Ok(json!(next))
        }
    }
3145
    /// Tool executor stub that echoes the node's configured `input` value
    /// back unchanged.
    struct EchoInputToolExecutor;

    #[async_trait]
    impl ToolExecutor for EchoInputToolExecutor {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            Ok(input.input)
        }
    }
3157
    /// Tool executor stub for retry/compensation scenarios, dispatching on
    /// tool name: `unstable_primary` fails on its first call only,
    /// `always_fail` always fails, `compensate` always succeeds, and any
    /// other name yields `NotFound`.
    struct RetryCompensateToolExecutor {
        attempts: AtomicUsize,
    }

    #[async_trait]
    impl ToolExecutor for RetryCompensateToolExecutor {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            match input.tool.as_str() {
                "unstable_primary" => {
                    // First call fails; later calls report the attempt count.
                    let current = self.attempts.fetch_add(1, Ordering::Relaxed) + 1;
                    if current <= 1 {
                        Err(ToolExecutionError::Failed("primary failed".to_string()))
                    } else {
                        Ok(json!({"primary_attempt": current}))
                    }
                }
                "always_fail" => Err(ToolExecutionError::Failed("always fail".to_string())),
                "compensate" => Ok(json!({"compensated": true})),
                _ => Err(ToolExecutionError::NotFound {
                    tool: input.tool.clone(),
                }),
            }
        }
    }
3185
    /// LLM executor stub that trips the shared cancellation flag and then
    /// fails with a transient client error, so the runtime's retry path
    /// observes cancellation on the next check.
    struct CancellingLlmExecutor {
        cancel_flag: Arc<AtomicBool>,
        calls: AtomicUsize,
    }

    #[async_trait]
    impl LlmExecutor for CancellingLlmExecutor {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            self.calls.fetch_add(1, Ordering::Relaxed);
            self.cancel_flag.store(true, Ordering::Relaxed);
            Err(LlmExecutionError::Client("transient failure".to_string()))
        }
    }
3202
    /// Builds a straight-line workflow: start -> llm -> tool -> end.
    fn linear_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "linear".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "llm".to_string(),
                    },
                },
                Node {
                    id: "llm".to_string(),
                    kind: NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "Summarize".to_string(),
                        next: Some("tool".to_string()),
                    },
                },
                Node {
                    id: "tool".to_string(),
                    kind: NodeKind::Tool {
                        tool: "extract".to_string(),
                        input: json!({"k": "v"}),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
3237
    /// Builds a minimal workflow with a single LLM step: start -> llm -> end.
    fn llm_only_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "llm-only".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "llm".to_string(),
                    },
                },
                Node {
                    id: "llm".to_string(),
                    kind: NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "Summarize".to_string(),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
3264
    /// Builds a looping workflow: start -> loop -> end, where the loop body
    /// runs the `counter` tool (which routes back to the loop) until
    /// `last_tool_output != 3` turns false or `max_iterations` is reached.
    fn loop_workflow(max_iterations: Option<u32>) -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "loop-workflow".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "loop".to_string(),
                    },
                },
                Node {
                    id: "loop".to_string(),
                    kind: NodeKind::Loop {
                        condition: "last_tool_output != 3".to_string(),
                        body: "counter".to_string(),
                        next: "end".to_string(),
                        max_iterations,
                    },
                },
                Node {
                    id: "counter".to_string(),
                    kind: NodeKind::Tool {
                        tool: "counter".to_string(),
                        input: json!({}),
                        next: Some("loop".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
3300
    /// Builds a fan-out/fan-in workflow: start -> parallel(tool_a, tool_b)
    /// -> merge -> end, with the merge's policy and optional quorum supplied
    /// by the caller.
    fn parallel_merge_workflow(policy: MergePolicy, quorum: Option<usize>) -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "parallel-merge".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "parallel".to_string(),
                    },
                },
                Node {
                    id: "parallel".to_string(),
                    kind: NodeKind::Parallel {
                        branches: vec!["tool_a".to_string(), "tool_b".to_string()],
                        next: "merge".to_string(),
                        max_in_flight: Some(2),
                    },
                },
                Node {
                    id: "tool_a".to_string(),
                    kind: NodeKind::Tool {
                        tool: "extract".to_string(),
                        input: json!({"value": 1}),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "tool_b".to_string(),
                    kind: NodeKind::Tool {
                        tool: "extract".to_string(),
                        input: json!({"value": 2}),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "merge".to_string(),
                    kind: NodeKind::Merge {
                        sources: vec!["tool_a".to_string(), "tool_b".to_string()],
                        policy,
                        quorum,
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
3352
    /// Builds a map/reduce workflow: start -> map (runs the `counter` tool
    /// over `input.values`, up to 3 in flight) -> reduce (caller-supplied
    /// operation) -> end.
    fn map_reduce_workflow(operation: ReduceOperation) -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "map-reduce".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "map".to_string(),
                    },
                },
                Node {
                    id: "map".to_string(),
                    kind: NodeKind::Map {
                        tool: "counter".to_string(),
                        items_path: "input.values".to_string(),
                        next: "reduce".to_string(),
                        max_in_flight: Some(3),
                    },
                },
                Node {
                    id: "reduce".to_string(),
                    kind: NodeKind::Reduce {
                        source: "map".to_string(),
                        operation,
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
3388
3389    fn debounce_and_throttle_workflow() -> WorkflowDefinition {
3390        WorkflowDefinition {
3391            version: "v0".to_string(),
3392            name: "debounce-throttle".to_string(),
3393            nodes: vec![
3394                Node {
3395                    id: "start".to_string(),
3396                    kind: NodeKind::Start {
3397                        next: "debounce_a".to_string(),
3398                    },
3399                },
3400                Node {
3401                    id: "debounce_a".to_string(),
3402                    kind: NodeKind::Debounce {
3403                        key_path: "input.key".to_string(),
3404                        window_steps: 3,
3405                        next: "debounce_a".to_string(),
3406                        on_suppressed: Some("throttle_a".to_string()),
3407                    },
3408                },
3409                Node {
3410                    id: "throttle_a".to_string(),
3411                    kind: NodeKind::Throttle {
3412                        key_path: "input.key".to_string(),
3413                        window_steps: 3,
3414                        next: "throttle_a".to_string(),
3415                        on_throttled: Some("end_throttled".to_string()),
3416                    },
3417                },
3418                Node {
3419                    id: "end_throttled".to_string(),
3420                    kind: NodeKind::End,
3421                },
3422            ],
3423        }
3424    }
3425
3426    fn extended_nodes_workflow() -> WorkflowDefinition {
3427        WorkflowDefinition {
3428            version: "v0".to_string(),
3429            name: "extended-nodes".to_string(),
3430            nodes: vec![
3431                Node {
3432                    id: "start".to_string(),
3433                    kind: NodeKind::Start {
3434                        next: "event".to_string(),
3435                    },
3436                },
3437                Node {
3438                    id: "event".to_string(),
3439                    kind: NodeKind::EventTrigger {
3440                        event: "webhook".to_string(),
3441                        event_path: "input.event_type".to_string(),
3442                        next: "cache_write".to_string(),
3443                        on_mismatch: Some("end_mismatch".to_string()),
3444                    },
3445                },
3446                Node {
3447                    id: "cache_write".to_string(),
3448                    kind: NodeKind::CacheWrite {
3449                        key_path: "input.cache_key".to_string(),
3450                        value_path: "input.payload".to_string(),
3451                        next: "cache_read".to_string(),
3452                    },
3453                },
3454                Node {
3455                    id: "cache_read".to_string(),
3456                    kind: NodeKind::CacheRead {
3457                        key_path: "input.cache_key".to_string(),
3458                        next: "router".to_string(),
3459                        on_miss: Some("end_miss".to_string()),
3460                    },
3461                },
3462                Node {
3463                    id: "router".to_string(),
3464                    kind: NodeKind::Router {
3465                        routes: vec![crate::ir::RouterRoute {
3466                            when: "input.mode == 'manual'".to_string(),
3467                            next: "human".to_string(),
3468                        }],
3469                        default: "transform".to_string(),
3470                    },
3471                },
3472                Node {
3473                    id: "human".to_string(),
3474                    kind: NodeKind::HumanInTheLoop {
3475                        decision_path: "input.approval".to_string(),
3476                        response_path: Some("input.review_notes".to_string()),
3477                        on_approve: "transform".to_string(),
3478                        on_reject: "end_rejected".to_string(),
3479                    },
3480                },
3481                Node {
3482                    id: "transform".to_string(),
3483                    kind: NodeKind::Transform {
3484                        expression: "node_outputs.cache_read.value".to_string(),
3485                        next: "end".to_string(),
3486                    },
3487                },
3488                Node {
3489                    id: "end".to_string(),
3490                    kind: NodeKind::End,
3491                },
3492                Node {
3493                    id: "end_mismatch".to_string(),
3494                    kind: NodeKind::End,
3495                },
3496                Node {
3497                    id: "end_miss".to_string(),
3498                    kind: NodeKind::End,
3499                },
3500                Node {
3501                    id: "end_rejected".to_string(),
3502                    kind: NodeKind::End,
3503                },
3504            ],
3505        }
3506    }
3507
3508    fn retry_compensate_workflow(primary_tool: &str) -> WorkflowDefinition {
3509        WorkflowDefinition {
3510            version: "v0".to_string(),
3511            name: "retry-compensate".to_string(),
3512            nodes: vec![
3513                Node {
3514                    id: "start".to_string(),
3515                    kind: NodeKind::Start {
3516                        next: "retry_comp".to_string(),
3517                    },
3518                },
3519                Node {
3520                    id: "retry_comp".to_string(),
3521                    kind: NodeKind::RetryCompensate {
3522                        tool: primary_tool.to_string(),
3523                        input: json!({"job": "run"}),
3524                        max_retries: 1,
3525                        compensate_tool: "compensate".to_string(),
3526                        compensate_input: json!({"job": "rollback"}),
3527                        next: "end".to_string(),
3528                        on_compensated: Some("end_compensated".to_string()),
3529                    },
3530                },
3531                Node {
3532                    id: "end".to_string(),
3533                    kind: NodeKind::End,
3534                },
3535                Node {
3536                    id: "end_compensated".to_string(),
3537                    kind: NodeKind::End,
3538                },
3539            ],
3540        }
3541    }
3542
3543    #[tokio::test]
3544    async fn executes_happy_path_linear_flow() {
3545        let llm = MockLlmExecutor {
3546            output: "ok".to_string(),
3547        };
3548        let tools = MockToolExecutor {
3549            output: json!({"status": "done"}),
3550            fail: false,
3551        };
3552        let runtime = WorkflowRuntime::new(
3553            linear_workflow(),
3554            &llm,
3555            Some(&tools),
3556            WorkflowRuntimeOptions::default(),
3557        );
3558
3559        let result = runtime
3560            .execute(json!({"request_id": "r1"}), None)
3561            .await
3562            .expect("linear workflow should succeed");
3563
3564        assert_eq!(result.workflow_name, "linear");
3565        assert_eq!(result.terminal_node_id, "end");
3566        assert_eq!(result.node_executions.len(), 4);
3567        assert_eq!(
3568            result.node_outputs.get("llm"),
3569            Some(&Value::String("ok".to_string()))
3570        );
3571        assert_eq!(
3572            result.node_outputs.get("tool"),
3573            Some(&json!({"status": "done"}))
3574        );
3575        assert_eq!(result.events.len(), 8);
3576        assert!(result.retry_events.is_empty());
3577        assert!(result.trace.is_some());
3578        assert_eq!(result.replay_report, None);
3579    }
3580
3581    #[tokio::test]
3582    async fn linear_flow_records_expected_node_execution_payloads() {
3583        let llm = MockLlmExecutor {
3584            output: "ok".to_string(),
3585        };
3586        let tools = MockToolExecutor {
3587            output: json!({"status": "done"}),
3588            fail: false,
3589        };
3590        let runtime = WorkflowRuntime::new(
3591            linear_workflow(),
3592            &llm,
3593            Some(&tools),
3594            WorkflowRuntimeOptions::default(),
3595        );
3596
3597        let result = runtime
3598            .execute(json!({"request_id": "r1"}), None)
3599            .await
3600            .expect("linear workflow should succeed");
3601
3602        assert!(matches!(
3603            result.node_executions[0].data,
3604            NodeExecutionData::Start { ref next } if next == "llm"
3605        ));
3606        assert!(matches!(
3607            result.node_executions[1].data,
3608            NodeExecutionData::Llm {
3609                ref model,
3610                ref output,
3611                ref next
3612            } if model == "gpt-4" && output == "ok" && next == "tool"
3613        ));
3614        assert!(matches!(
3615            result.node_executions[2].data,
3616            NodeExecutionData::Tool {
3617                ref tool,
3618                ref next,
3619                ..
3620            } if tool == "extract" && next == "end"
3621        ));
3622    }
3623
3624    #[tokio::test]
3625    async fn executes_conditional_branching() {
3626        let workflow = WorkflowDefinition {
3627            version: "v0".to_string(),
3628            name: "conditional".to_string(),
3629            nodes: vec![
3630                Node {
3631                    id: "start".to_string(),
3632                    kind: NodeKind::Start {
3633                        next: "condition".to_string(),
3634                    },
3635                },
3636                Node {
3637                    id: "condition".to_string(),
3638                    kind: NodeKind::Condition {
3639                        expression: "input.approved".to_string(),
3640                        on_true: "end_true".to_string(),
3641                        on_false: "end_false".to_string(),
3642                    },
3643                },
3644                Node {
3645                    id: "end_true".to_string(),
3646                    kind: NodeKind::End,
3647                },
3648                Node {
3649                    id: "end_false".to_string(),
3650                    kind: NodeKind::End,
3651                },
3652            ],
3653        };
3654
3655        let llm = MockLlmExecutor {
3656            output: "unused".to_string(),
3657        };
3658        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());
3659
3660        let result = runtime
3661            .execute(json!({"approved": true}), None)
3662            .await
3663            .expect("conditional workflow should succeed");
3664
3665        assert_eq!(result.terminal_node_id, "end_true");
3666    }
3667
3668    #[tokio::test]
3669    async fn executes_conditional_false_branch() {
3670        let workflow = WorkflowDefinition {
3671            version: "v0".to_string(),
3672            name: "conditional-false".to_string(),
3673            nodes: vec![
3674                Node {
3675                    id: "start".to_string(),
3676                    kind: NodeKind::Start {
3677                        next: "condition".to_string(),
3678                    },
3679                },
3680                Node {
3681                    id: "condition".to_string(),
3682                    kind: NodeKind::Condition {
3683                        expression: "input.approved".to_string(),
3684                        on_true: "end_true".to_string(),
3685                        on_false: "end_false".to_string(),
3686                    },
3687                },
3688                Node {
3689                    id: "end_true".to_string(),
3690                    kind: NodeKind::End,
3691                },
3692                Node {
3693                    id: "end_false".to_string(),
3694                    kind: NodeKind::End,
3695                },
3696            ],
3697        };
3698        let llm = MockLlmExecutor {
3699            output: "unused".to_string(),
3700        };
3701        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());
3702
3703        let result = runtime
3704            .execute(json!({"approved": false}), None)
3705            .await
3706            .expect("conditional workflow should take false branch");
3707
3708        assert_eq!(result.terminal_node_id, "end_false");
3709    }
3710
3711    #[tokio::test]
3712    async fn fails_when_tool_executor_is_missing() {
3713        let llm = MockLlmExecutor {
3714            output: "ok".to_string(),
3715        };
3716        let runtime = WorkflowRuntime::new(
3717            linear_workflow(),
3718            &llm,
3719            None,
3720            WorkflowRuntimeOptions::default(),
3721        );
3722
3723        let error = runtime
3724            .execute(json!({}), None)
3725            .await
3726            .expect_err("workflow should fail without tool executor");
3727
3728        assert!(matches!(
3729            error,
3730            WorkflowRuntimeError::MissingToolExecutor { node_id } if node_id == "tool"
3731        ));
3732    }
3733
3734    #[tokio::test]
3735    async fn fails_on_tool_execution_error() {
3736        let llm = MockLlmExecutor {
3737            output: "ok".to_string(),
3738        };
3739        let tools = MockToolExecutor {
3740            output: json!({"status": "unused"}),
3741            fail: true,
3742        };
3743        let runtime = WorkflowRuntime::new(
3744            linear_workflow(),
3745            &llm,
3746            Some(&tools),
3747            WorkflowRuntimeOptions::default(),
3748        );
3749
3750        let error = runtime
3751            .execute(json!({}), None)
3752            .await
3753            .expect_err("workflow should fail on tool error");
3754
3755        assert!(matches!(
3756            error,
3757            WorkflowRuntimeError::ToolRetryExhausted { node_id, attempts: 1, .. }
3758                if node_id == "tool"
3759        ));
3760    }
3761
3762    #[tokio::test]
3763    async fn retries_llm_after_transient_failure() {
3764        let llm = SequencedLlmExecutor {
3765            responses: Mutex::new(vec![
3766                Err(LlmExecutionError::Client("temporary".to_string())),
3767                Ok(LlmExecutionOutput {
3768                    content: "recovered".to_string(),
3769                }),
3770            ]),
3771            calls: AtomicUsize::new(0),
3772        };
3773        let runtime = WorkflowRuntime::new(
3774            llm_only_workflow(),
3775            &llm,
3776            None,
3777            WorkflowRuntimeOptions {
3778                llm_node_policy: NodeExecutionPolicy {
3779                    timeout: None,
3780                    max_retries: 1,
3781                },
3782                ..WorkflowRuntimeOptions::default()
3783            },
3784        );
3785
3786        let result = runtime
3787            .execute(json!({"request_id": "r2"}), None)
3788            .await
3789            .expect("llm retry should recover");
3790
3791        assert_eq!(result.terminal_node_id, "end");
3792        assert_eq!(result.node_outputs.get("llm"), Some(&json!("recovered")));
3793        assert_eq!(result.retry_events.len(), 1);
3794        assert_eq!(result.retry_events[0].operation, "llm");
3795        assert_eq!(llm.calls.load(Ordering::Relaxed), 2);
3796    }
3797
3798    #[tokio::test]
3799    async fn times_out_tool_execution_per_policy() {
3800        let llm = MockLlmExecutor {
3801            output: "ok".to_string(),
3802        };
3803        let tool = SlowToolExecutor {
3804            delay: Duration::from_millis(50),
3805        };
3806        let runtime = WorkflowRuntime::new(
3807            linear_workflow(),
3808            &llm,
3809            Some(&tool),
3810            WorkflowRuntimeOptions {
3811                tool_node_policy: NodeExecutionPolicy {
3812                    timeout: Some(Duration::from_millis(5)),
3813                    max_retries: 0,
3814                },
3815                ..WorkflowRuntimeOptions::default()
3816            },
3817        );
3818
3819        let error = runtime
3820            .execute(json!({}), None)
3821            .await
3822            .expect_err("tool execution should time out");
3823
3824        assert!(matches!(
3825            error,
3826            WorkflowRuntimeError::ToolTimeout {
3827                node_id,
3828                timeout_ms: 5,
3829                attempts: 1,
3830            } if node_id == "tool"
3831        ));
3832    }
3833
3834    #[tokio::test]
3835    async fn cancels_between_retry_attempts() {
3836        let cancel_flag = Arc::new(AtomicBool::new(false));
3837        let llm = CancellingLlmExecutor {
3838            cancel_flag: Arc::clone(&cancel_flag),
3839            calls: AtomicUsize::new(0),
3840        };
3841        let runtime = WorkflowRuntime::new(
3842            llm_only_workflow(),
3843            &llm,
3844            None,
3845            WorkflowRuntimeOptions {
3846                llm_node_policy: NodeExecutionPolicy {
3847                    timeout: None,
3848                    max_retries: 3,
3849                },
3850                ..WorkflowRuntimeOptions::default()
3851            },
3852        );
3853
3854        let error = runtime
3855            .execute(json!({}), Some(cancel_flag.as_ref()))
3856            .await
3857            .expect_err("workflow should stop when cancellation is observed");
3858
3859        assert!(matches!(error, WorkflowRuntimeError::Cancelled));
3860        assert_eq!(llm.calls.load(Ordering::Relaxed), 1);
3861    }
3862
3863    #[tokio::test]
3864    async fn enforces_step_limit_guard() {
3865        let workflow = WorkflowDefinition {
3866            version: "v0".to_string(),
3867            name: "loop".to_string(),
3868            nodes: vec![
3869                Node {
3870                    id: "start".to_string(),
3871                    kind: NodeKind::Start {
3872                        next: "condition".to_string(),
3873                    },
3874                },
3875                Node {
3876                    id: "condition".to_string(),
3877                    kind: NodeKind::Condition {
3878                        expression: "true".to_string(),
3879                        on_true: "condition".to_string(),
3880                        on_false: "end".to_string(),
3881                    },
3882                },
3883                Node {
3884                    id: "end".to_string(),
3885                    kind: NodeKind::End,
3886                },
3887            ],
3888        };
3889
3890        let llm = MockLlmExecutor {
3891            output: "unused".to_string(),
3892        };
3893        let runtime = WorkflowRuntime::new(
3894            workflow,
3895            &llm,
3896            None,
3897            WorkflowRuntimeOptions {
3898                max_steps: 3,
3899                ..WorkflowRuntimeOptions::default()
3900            },
3901        );
3902
3903        let error = runtime
3904            .execute(json!({}), None)
3905            .await
3906            .expect_err("workflow should fail on step limit");
3907
3908        assert!(matches!(
3909            error,
3910            WorkflowRuntimeError::StepLimitExceeded { max_steps: 3 }
3911        ));
3912    }
3913
3914    #[tokio::test]
3915    async fn validates_recorded_trace_in_replay_mode() {
3916        let llm = MockLlmExecutor {
3917            output: "ok".to_string(),
3918        };
3919        let tools = MockToolExecutor {
3920            output: json!({"status": "done"}),
3921            fail: false,
3922        };
3923        let runtime = WorkflowRuntime::new(
3924            linear_workflow(),
3925            &llm,
3926            Some(&tools),
3927            WorkflowRuntimeOptions {
3928                replay_mode: WorkflowReplayMode::ValidateRecordedTrace,
3929                ..WorkflowRuntimeOptions::default()
3930            },
3931        );
3932
3933        let result = runtime
3934            .execute(json!({"request_id": "r1"}), None)
3935            .await
3936            .expect("replay validation should pass");
3937
3938        assert!(result.trace.is_some());
3939        assert_eq!(
3940            result.replay_report.as_ref().map(|r| r.total_events),
3941            Some(9)
3942        );
3943    }
3944
3945    #[tokio::test]
3946    async fn executes_loop_until_condition_fails() {
3947        let llm = MockLlmExecutor {
3948            output: "unused".to_string(),
3949        };
3950        let counter = IncrementingToolExecutor {
3951            value: AtomicUsize::new(0),
3952        };
3953        let runtime = WorkflowRuntime::new(
3954            loop_workflow(Some(8)),
3955            &llm,
3956            Some(&counter),
3957            WorkflowRuntimeOptions::default(),
3958        );
3959
3960        let result = runtime
3961            .execute(json!({}), None)
3962            .await
3963            .expect("loop workflow should terminate at end");
3964
3965        assert_eq!(result.terminal_node_id, "end");
3966        assert!(result.node_executions.iter().any(|step| matches!(
3967            step.data,
3968            NodeExecutionData::Loop {
3969                evaluated: false,
3970                ..
3971            }
3972        )));
3973    }
3974
3975    #[tokio::test]
3976    async fn fails_when_loop_exceeds_max_iterations() {
3977        let llm = MockLlmExecutor {
3978            output: "unused".to_string(),
3979        };
3980        let counter = MockToolExecutor {
3981            output: json!(0),
3982            fail: false,
3983        };
3984        let runtime = WorkflowRuntime::new(
3985            loop_workflow(Some(2)),
3986            &llm,
3987            Some(&counter),
3988            WorkflowRuntimeOptions {
3989                max_steps: 20,
3990                ..WorkflowRuntimeOptions::default()
3991            },
3992        );
3993
3994        let error = runtime
3995            .execute(json!({}), None)
3996            .await
3997            .expect_err("loop should fail once max iterations are exceeded");
3998
3999        assert!(matches!(
4000            error,
4001            WorkflowRuntimeError::LoopIterationLimitExceeded {
4002                node_id,
4003                max_iterations: 2
4004            } if node_id == "loop"
4005        ));
4006    }
4007
4008    #[tokio::test]
4009    async fn executes_parallel_then_merge_all() {
4010        let llm = MockLlmExecutor {
4011            output: "unused".to_string(),
4012        };
4013        let tool = EchoInputToolExecutor;
4014        let runtime = WorkflowRuntime::new(
4015            parallel_merge_workflow(MergePolicy::All, None),
4016            &llm,
4017            Some(&tool),
4018            WorkflowRuntimeOptions::default(),
4019        );
4020
4021        let result = runtime
4022            .execute(json!({}), None)
4023            .await
4024            .expect("parallel merge workflow should succeed");
4025
4026        assert_eq!(result.terminal_node_id, "end");
4027        assert_eq!(
4028            result.node_outputs.get("tool_a"),
4029            Some(&json!({"value": 1}))
4030        );
4031        assert_eq!(
4032            result.node_outputs.get("tool_b"),
4033            Some(&json!({"value": 2}))
4034        );
4035        assert_eq!(
4036            result.node_outputs.get("merge"),
4037            Some(&json!([{"value": 1}, {"value": 2}]))
4038        );
4039    }
4040
4041    #[tokio::test]
4042    async fn executes_map_reduce_sum() {
4043        let llm = MockLlmExecutor {
4044            output: "unused".to_string(),
4045        };
4046        let tool = EchoInputToolExecutor;
4047        let runtime = WorkflowRuntime::new(
4048            map_reduce_workflow(ReduceOperation::Sum),
4049            &llm,
4050            Some(&tool),
4051            WorkflowRuntimeOptions::default(),
4052        );
4053
4054        let result = runtime
4055            .execute(json!({"values": [1, 2, 3]}), None)
4056            .await
4057            .expect("map reduce workflow should succeed");
4058
4059        assert_eq!(result.node_outputs.get("map"), Some(&json!([1, 2, 3])));
4060        assert_eq!(result.node_outputs.get("reduce"), Some(&json!(6.0)));
4061    }
4062
4063    #[tokio::test]
4064    async fn fails_map_when_items_path_is_not_array() {
4065        let llm = MockLlmExecutor {
4066            output: "unused".to_string(),
4067        };
4068        let tool = EchoInputToolExecutor;
4069        let runtime = WorkflowRuntime::new(
4070            map_reduce_workflow(ReduceOperation::Count),
4071            &llm,
4072            Some(&tool),
4073            WorkflowRuntimeOptions::default(),
4074        );
4075
4076        let error = runtime
4077            .execute(json!({"values": {"not": "array"}}), None)
4078            .await
4079            .expect_err("map node should fail on non-array path");
4080
4081        assert!(matches!(
4082            error,
4083            WorkflowRuntimeError::MapItemsNotArray {
4084                node_id,
4085                items_path
4086            } if node_id == "map" && items_path == "input.values"
4087        ));
4088    }
4089
4090    #[tokio::test]
4091    async fn executes_subgraph_via_registry() {
4092        let llm = MockLlmExecutor {
4093            output: "nested-ok".to_string(),
4094        };
4095        let subgraph = WorkflowDefinition {
4096            version: "v0".to_string(),
4097            name: "child".to_string(),
4098            nodes: vec![
4099                Node {
4100                    id: "start".to_string(),
4101                    kind: NodeKind::Start {
4102                        next: "llm".to_string(),
4103                    },
4104                },
4105                Node {
4106                    id: "llm".to_string(),
4107                    kind: NodeKind::Llm {
4108                        model: "gpt-4".to_string(),
4109                        prompt: "child".to_string(),
4110                        next: Some("end".to_string()),
4111                    },
4112                },
4113                Node {
4114                    id: "end".to_string(),
4115                    kind: NodeKind::End,
4116                },
4117            ],
4118        };
4119        let parent = WorkflowDefinition {
4120            version: "v0".to_string(),
4121            name: "parent".to_string(),
4122            nodes: vec![
4123                Node {
4124                    id: "start".to_string(),
4125                    kind: NodeKind::Start {
4126                        next: "sub".to_string(),
4127                    },
4128                },
4129                Node {
4130                    id: "sub".to_string(),
4131                    kind: NodeKind::Subgraph {
4132                        graph: "child_graph".to_string(),
4133                        next: Some("end".to_string()),
4134                    },
4135                },
4136                Node {
4137                    id: "end".to_string(),
4138                    kind: NodeKind::End,
4139                },
4140            ],
4141        };
4142
4143        let mut registry = BTreeMap::new();
4144        registry.insert("child_graph".to_string(), subgraph);
4145        let runtime = WorkflowRuntime::new(
4146            parent,
4147            &llm,
4148            None,
4149            WorkflowRuntimeOptions {
4150                subgraph_registry: registry,
4151                ..WorkflowRuntimeOptions::default()
4152            },
4153        );
4154
4155        let result = runtime
4156            .execute(json!({"approved": true}), None)
4157            .await
4158            .expect("subgraph workflow should execute");
4159
4160        assert_eq!(result.terminal_node_id, "end");
4161        assert!(matches!(
4162            result.node_executions[1].data,
4163            NodeExecutionData::Subgraph { .. }
4164        ));
4165    }
4166
4167    #[tokio::test]
4168    async fn executes_batch_and_filter_nodes() {
4169        let llm = MockLlmExecutor {
4170            output: "unused".to_string(),
4171        };
4172        let workflow = WorkflowDefinition {
4173            version: "v0".to_string(),
4174            name: "batch-filter".to_string(),
4175            nodes: vec![
4176                Node {
4177                    id: "start".to_string(),
4178                    kind: NodeKind::Start {
4179                        next: "batch".to_string(),
4180                    },
4181                },
4182                Node {
4183                    id: "batch".to_string(),
4184                    kind: NodeKind::Batch {
4185                        items_path: "input.items".to_string(),
4186                        next: "filter".to_string(),
4187                    },
4188                },
4189                Node {
4190                    id: "filter".to_string(),
4191                    kind: NodeKind::Filter {
4192                        items_path: "node_outputs.batch".to_string(),
4193                        expression: "item.keep == true".to_string(),
4194                        next: "end".to_string(),
4195                    },
4196                },
4197                Node {
4198                    id: "end".to_string(),
4199                    kind: NodeKind::End,
4200                },
4201            ],
4202        };
4203
4204        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());
4205        let result = runtime
4206            .execute(
4207                json!({
4208                    "items": [
4209                        {"id": 1, "keep": true},
4210                        {"id": 2, "keep": false},
4211                        {"id": 3, "keep": true}
4212                    ]
4213                }),
4214                None,
4215            )
4216            .await
4217            .expect("batch/filter workflow should execute");
4218
4219        assert_eq!(
4220            result
4221                .node_outputs
4222                .get("batch")
4223                .and_then(Value::as_array)
4224                .map(Vec::len),
4225            Some(3)
4226        );
4227        assert_eq!(
4228            result
4229                .node_outputs
4230                .get("filter")
4231                .and_then(Value::as_array)
4232                .map(Vec::len),
4233            Some(2)
4234        );
4235    }
4236
4237    #[tokio::test]
4238    async fn executes_debounce_and_throttle_nodes() {
4239        let llm = MockLlmExecutor {
4240            output: "unused".to_string(),
4241        };
4242        let runtime = WorkflowRuntime::new(
4243            debounce_and_throttle_workflow(),
4244            &llm,
4245            None,
4246            WorkflowRuntimeOptions::default(),
4247        );
4248
4249        let result = runtime
4250            .execute(json!({"key": "k1"}), None)
4251            .await
4252            .expect("debounce/throttle workflow should execute");
4253
4254        assert_eq!(result.terminal_node_id, "end_throttled");
4255        assert_eq!(
4256            result.node_outputs.get("debounce_a"),
4257            Some(&json!({"key": "k1", "suppressed": true}))
4258        );
4259        assert_eq!(
4260            result.node_outputs.get("throttle_a"),
4261            Some(&json!({"key": "k1", "throttled": true}))
4262        );
4263    }
4264
4265    #[tokio::test]
4266    async fn executes_retry_compensate_successfully_without_compensation() {
4267        let llm = MockLlmExecutor {
4268            output: "unused".to_string(),
4269        };
4270        let tools = RetryCompensateToolExecutor {
4271            attempts: AtomicUsize::new(0),
4272        };
4273        let runtime = WorkflowRuntime::new(
4274            retry_compensate_workflow("unstable_primary"),
4275            &llm,
4276            Some(&tools),
4277            WorkflowRuntimeOptions::default(),
4278        );
4279
4280        let result = runtime
4281            .execute(json!({}), None)
4282            .await
4283            .expect("retry_compensate should recover by retry");
4284
4285        assert_eq!(result.terminal_node_id, "end");
4286        assert_eq!(tools.attempts.load(Ordering::Relaxed), 2);
4287        assert_eq!(result.retry_events.len(), 1);
4288    }
4289
4290    #[tokio::test]
4291    async fn executes_retry_compensate_with_compensation_route() {
4292        let llm = MockLlmExecutor {
4293            output: "unused".to_string(),
4294        };
4295        let tools = RetryCompensateToolExecutor {
4296            attempts: AtomicUsize::new(0),
4297        };
4298        let runtime = WorkflowRuntime::new(
4299            retry_compensate_workflow("always_fail"),
4300            &llm,
4301            Some(&tools),
4302            WorkflowRuntimeOptions::default(),
4303        );
4304
4305        let result = runtime
4306            .execute(json!({}), None)
4307            .await
4308            .expect("retry_compensate should use compensation");
4309
4310        assert_eq!(result.terminal_node_id, "end_compensated");
4311        assert_eq!(result.retry_events.len(), 1);
4312        assert_eq!(
4313            result
4314                .node_outputs
4315                .get("retry_comp")
4316                .and_then(|value| value.get("status")),
4317            Some(&json!("compensated"))
4318        );
4319    }
4320
4321    #[tokio::test]
4322    async fn executes_event_cache_router_human_transform_nodes() {
4323        let llm = MockLlmExecutor {
4324            output: "unused".to_string(),
4325        };
4326        let runtime = WorkflowRuntime::new(
4327            extended_nodes_workflow(),
4328            &llm,
4329            None,
4330            WorkflowRuntimeOptions::default(),
4331        );
4332
4333        let result = runtime
4334            .execute(
4335                json!({
4336                    "event_type": "webhook",
4337                    "cache_key": "customer-1",
4338                    "payload": {"value": 99},
4339                    "mode": "manual",
4340                    "approval": "approve",
4341                    "review_notes": {"editor": "ops"}
4342                }),
4343                None,
4344            )
4345            .await
4346            .expect("extended nodes workflow should execute");
4347
4348        assert_eq!(result.terminal_node_id, "end");
4349        assert_eq!(
4350            result.node_outputs.get("cache_read"),
4351            Some(&json!({"key": "customer-1", "hit": true, "value": {"value": 99}}))
4352        );
4353        assert_eq!(
4354            result.node_outputs.get("router"),
4355            Some(&json!({"selected": "human"}))
4356        );
4357        assert_eq!(
4358            result.node_outputs.get("transform"),
4359            Some(&json!({"value": 99}))
4360        );
4361    }
4362
4363    #[tokio::test]
4364    async fn routes_event_trigger_mismatch_to_fallback() {
4365        let llm = MockLlmExecutor {
4366            output: "unused".to_string(),
4367        };
4368        let runtime = WorkflowRuntime::new(
4369            extended_nodes_workflow(),
4370            &llm,
4371            None,
4372            WorkflowRuntimeOptions::default(),
4373        );
4374
4375        let result = runtime
4376            .execute(
4377                json!({
4378                    "event_type": "cron",
4379                    "cache_key": "customer-1",
4380                    "payload": {"value": 99},
4381                    "mode": "manual",
4382                    "approval": "approve"
4383                }),
4384                None,
4385            )
4386            .await
4387            .expect("event mismatch should route to fallback");
4388
4389        assert_eq!(result.terminal_node_id, "end_mismatch");
4390    }
4391
4392    #[tokio::test]
4393    async fn cache_read_routes_to_on_miss_when_value_absent() {
4394        let llm = MockLlmExecutor {
4395            output: "unused".to_string(),
4396        };
4397        let workflow = WorkflowDefinition {
4398            version: "v0".to_string(),
4399            name: "cache-read-miss".to_string(),
4400            nodes: vec![
4401                Node {
4402                    id: "start".to_string(),
4403                    kind: NodeKind::Start {
4404                        next: "cache_read".to_string(),
4405                    },
4406                },
4407                Node {
4408                    id: "cache_read".to_string(),
4409                    kind: NodeKind::CacheRead {
4410                        key_path: "input.cache_key".to_string(),
4411                        next: "end_hit".to_string(),
4412                        on_miss: Some("end_miss".to_string()),
4413                    },
4414                },
4415                Node {
4416                    id: "end_hit".to_string(),
4417                    kind: NodeKind::End,
4418                },
4419                Node {
4420                    id: "end_miss".to_string(),
4421                    kind: NodeKind::End,
4422                },
4423            ],
4424        };
4425        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());
4426
4427        let result = runtime
4428            .execute(json!({"cache_key": "new-customer"}), None)
4429            .await
4430            .expect("cache read miss should still execute via on_miss route");
4431
4432        assert_eq!(
4433            result.node_outputs.get("cache_read"),
4434            Some(&json!({"key": "new-customer", "hit": false, "value": null}))
4435        );
4436        assert_eq!(result.terminal_node_id, "end_miss");
4437    }
4438
4439    #[tokio::test]
4440    async fn rejects_condition_when_expression_scope_exceeds_limit() {
4441        let llm = MockLlmExecutor {
4442            output: "unused".to_string(),
4443        };
4444        let workflow = WorkflowDefinition {
4445            version: "v0".to_string(),
4446            name: "scope-limit".to_string(),
4447            nodes: vec![
4448                Node {
4449                    id: "start".to_string(),
4450                    kind: NodeKind::Start {
4451                        next: "condition".to_string(),
4452                    },
4453                },
4454                Node {
4455                    id: "condition".to_string(),
4456                    kind: NodeKind::Condition {
4457                        expression: "input.flag == true".to_string(),
4458                        on_true: "end".to_string(),
4459                        on_false: "end".to_string(),
4460                    },
4461                },
4462                Node {
4463                    id: "end".to_string(),
4464                    kind: NodeKind::End,
4465                },
4466            ],
4467        };
4468
4469        let runtime = WorkflowRuntime::new(
4470            workflow,
4471            &llm,
4472            None,
4473            WorkflowRuntimeOptions {
4474                security_limits: RuntimeSecurityLimits {
4475                    max_expression_scope_bytes: 32,
4476                    ..RuntimeSecurityLimits::default()
4477                },
4478                ..WorkflowRuntimeOptions::default()
4479            },
4480        );
4481
4482        let error = runtime
4483            .execute(
4484                json!({"flag": true, "payload": "this-is-too-large-for-limit"}),
4485                None,
4486            )
4487            .await
4488            .expect_err("condition should fail when scope budget is exceeded");
4489        assert!(matches!(
4490            error,
4491            WorkflowRuntimeError::ExpressionScopeLimitExceeded { node_id, .. }
4492                if node_id == "condition"
4493        ));
4494    }
4495
4496    #[tokio::test]
4497    async fn rejects_map_when_item_count_exceeds_limit() {
4498        let llm = MockLlmExecutor {
4499            output: "unused".to_string(),
4500        };
4501        let tool = EchoInputToolExecutor;
4502        let runtime = WorkflowRuntime::new(
4503            map_reduce_workflow(ReduceOperation::Count),
4504            &llm,
4505            Some(&tool),
4506            WorkflowRuntimeOptions {
4507                security_limits: RuntimeSecurityLimits {
4508                    max_map_items: 2,
4509                    ..RuntimeSecurityLimits::default()
4510                },
4511                ..WorkflowRuntimeOptions::default()
4512            },
4513        );
4514
4515        let error = runtime
4516            .execute(json!({"values": [1, 2, 3]}), None)
4517            .await
4518            .expect_err("map should fail when item guard is exceeded");
4519        assert!(matches!(
4520            error,
4521            WorkflowRuntimeError::MapItemLimitExceeded {
4522                node_id,
4523                actual_items: 3,
4524                max_items: 2,
4525            } if node_id == "map"
4526        ));
4527    }
4528
4529    #[tokio::test]
4530    async fn resumes_from_checkpoint() {
4531        let llm = MockLlmExecutor {
4532            output: "unused".to_string(),
4533        };
4534        let tool = MockToolExecutor {
4535            output: json!({"ok": true}),
4536            fail: false,
4537        };
4538        let runtime = WorkflowRuntime::new(
4539            linear_workflow(),
4540            &llm,
4541            Some(&tool),
4542            WorkflowRuntimeOptions::default(),
4543        );
4544
4545        let checkpoint = WorkflowCheckpoint {
4546            run_id: "run-1".to_string(),
4547            workflow_name: "linear".to_string(),
4548            step: 2,
4549            next_node_id: "tool".to_string(),
4550            scope_snapshot: json!({"input": {"request_id": "r-resume"}}),
4551        };
4552
4553        let resumed = runtime
4554            .execute_resume_from_failure(&checkpoint, None)
4555            .await
4556            .expect("resume should continue from checkpoint node");
4557        assert_eq!(resumed.terminal_node_id, "end");
4558        assert_eq!(resumed.node_executions[0].node_id, "tool");
4559    }
4560
4561    #[test]
4562    fn scope_capabilities_enforce_read_write_boundaries() {
4563        let mut scope = RuntimeScope::new(json!({"k": "v"}));
4564
4565        let read_error = scope
4566            .scoped_input(ScopeCapability::LlmWrite)
4567            .expect_err("write capability should not read scope");
4568        assert!(matches!(read_error, ScopeAccessError::ReadDenied { .. }));
4569
4570        let write_error = scope
4571            .record_tool_output("tool", json!({"ok": true}), ScopeCapability::LlmWrite)
4572            .expect_err("llm write capability should not write tool output");
4573        assert!(matches!(write_error, ScopeAccessError::WriteDenied { .. }));
4574    }
4575}