// simple_agents_workflow/runtime.rs
1use std::collections::{BTreeMap, HashMap};
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use async_trait::async_trait;
6use serde_json::{json, Map, Value};
7use simple_agent_type::message::Message;
8use simple_agent_type::request::CompletionRequest;
9use simple_agents_core::{CompletionOptions, CompletionOutcome, SimpleAgentsClient};
10use thiserror::Error;
11use tokio::time::timeout;
12
13use crate::checkpoint::WorkflowCheckpoint;
14use crate::expressions;
15use crate::ir::{MergePolicy, Node, NodeKind, ReduceOperation, WorkflowDefinition};
16use crate::recorder::{TraceRecordError, TraceRecorder};
17use crate::replay::{replay_trace, ReplayError, ReplayReport};
18use crate::scheduler::DagScheduler;
19pub use crate::state::ScopeAccessError;
20use crate::trace::{TraceTerminalStatus, WorkflowTrace, WorkflowTraceMetadata};
21use crate::validation::{validate_and_normalize, ValidationErrors};
22
23mod engine;
24mod scope;
25use scope::{RuntimeScope, ScopeCapability};
26
/// Runtime configuration for workflow execution.
///
/// Construct via [`Default`] and override individual fields; defaults are
/// conservative (validation and trace recording on, replay validation off).
#[derive(Debug, Clone)]
pub struct WorkflowRuntimeOptions {
    /// Maximum number of node steps before aborting execution with
    /// [`WorkflowRuntimeError::StepLimitExceeded`].
    pub max_steps: usize,
    /// Validate and normalize workflow before every run.
    pub validate_before_run: bool,
    /// Retry and timeout policy for LLM nodes.
    pub llm_node_policy: NodeExecutionPolicy,
    /// Retry and timeout policy for tool nodes.
    pub tool_node_policy: NodeExecutionPolicy,
    /// Enable deterministic trace recording for runtime events.
    pub enable_trace_recording: bool,
    /// Optional replay validation mode for recorded traces. Enabling replay
    /// without `enable_trace_recording` fails with
    /// [`WorkflowRuntimeError::ReplayRequiresTraceRecording`].
    pub replay_mode: WorkflowReplayMode,
    /// Global scheduler bound for node-level concurrent fan-out.
    pub scheduler_max_in_flight: usize,
    /// Named subgraph registry used by `subgraph` nodes.
    pub subgraph_registry: BTreeMap<String, WorkflowDefinition>,
    /// Runtime guardrails for expression and fan-out resource usage.
    pub security_limits: RuntimeSecurityLimits,
}
49
/// Runtime-enforced security and resource limits.
///
/// Exceeding any limit aborts the run with the matching
/// `*LimitExceeded` variant of [`WorkflowRuntimeError`].
#[derive(Debug, Clone)]
pub struct RuntimeSecurityLimits {
    /// Maximum serialized bytes allowed for one expression evaluation scope.
    pub max_expression_scope_bytes: usize,
    /// Maximum array size accepted by `map` nodes.
    pub max_map_items: usize,
    /// Maximum branch count accepted by `parallel` nodes.
    pub max_parallel_branches: usize,
    /// Maximum array size accepted by `filter` nodes.
    pub max_filter_items: usize,
}
62
/// Runtime replay behavior for deterministic runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkflowReplayMode {
    /// Trace replay validation is disabled.
    Disabled,
    /// Validate the recorded trace via `replay_trace` before success return.
    /// Requires trace recording to be enabled.
    ValidateRecordedTrace,
}
71
/// Retry and timeout policy for runtime-owned node execution.
///
/// The derived [`Default`] disables timeouts (`None`) and retries (`0`),
/// i.e. a single unbounded attempt per node.
#[derive(Debug, Clone, Default)]
pub struct NodeExecutionPolicy {
    /// Per-attempt timeout. `None` disables timeout enforcement.
    pub timeout: Option<Duration>,
    /// Number of retries after the first failed attempt.
    pub max_retries: usize,
}
80
81impl Default for WorkflowRuntimeOptions {
82    fn default() -> Self {
83        Self {
84            max_steps: 256,
85            validate_before_run: true,
86            llm_node_policy: NodeExecutionPolicy::default(),
87            tool_node_policy: NodeExecutionPolicy::default(),
88            enable_trace_recording: true,
89            replay_mode: WorkflowReplayMode::Disabled,
90            scheduler_max_in_flight: 8,
91            subgraph_registry: BTreeMap::new(),
92            security_limits: RuntimeSecurityLimits::default(),
93        }
94    }
95}
96
97impl Default for RuntimeSecurityLimits {
98    fn default() -> Self {
99        Self {
100            max_expression_scope_bytes: 128 * 1024,
101            max_map_items: 4096,
102            max_parallel_branches: 128,
103            max_filter_items: 8192,
104        }
105    }
106}
107
/// Cooperative cancellation interface for workflow runs.
///
/// The runtime polls this between units of work; implementations must be
/// cheap to call and safe to share across threads.
pub trait CancellationSignal: Send + Sync {
    /// Returns true if execution should stop.
    fn is_cancelled(&self) -> bool;
}
113
impl CancellationSignal for AtomicBool {
    /// Reads the cancel flag with `Relaxed` ordering: the flag is consulted
    /// on its own, so no ordering with other memory is required.
    fn is_cancelled(&self) -> bool {
        self.load(Ordering::Relaxed)
    }
}
119
impl CancellationSignal for bool {
    /// Constant signal: `true` always cancels, `false` never does.
    fn is_cancelled(&self) -> bool {
        *self
    }
}
125
/// Input payload passed to the LLM executor adapter.
#[derive(Debug, Clone, PartialEq)]
pub struct LlmExecutionInput {
    /// Current workflow node id.
    pub node_id: String,
    /// Requested model.
    pub model: String,
    /// Prompt configured on the workflow node.
    pub prompt: String,
    /// Deterministic scoped context for this node execution.
    pub scoped_input: Value,
}
138
/// LLM output returned to the runtime.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LlmExecutionOutput {
    /// Assistant text returned by the model.
    pub content: String,
}
145
/// Typed LLM adapter errors.
///
/// `Clone + PartialEq` so retry machinery and tests can hold and compare
/// the last attempt's error.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum LlmExecutionError {
    /// CompletionRequest build/validation failure.
    #[error("invalid completion request: {0}")]
    InvalidRequest(String),
    /// Core client call failed.
    #[error("llm client execution failed: {0}")]
    Client(String),
    /// Unexpected completion mode returned by client.
    #[error("unexpected completion outcome: {0}")]
    UnexpectedOutcome(&'static str),
    /// Response did not include assistant content.
    #[error("llm response had no content")]
    EmptyResponse,
}
162
/// Async runtime adapter for LLM calls.
///
/// Implemented by the host; the runtime invokes it once per LLM node
/// attempt with a fully resolved [`LlmExecutionInput`].
#[async_trait]
pub trait LlmExecutor: Send + Sync {
    /// Executes one workflow LLM node.
    async fn execute(
        &self,
        input: LlmExecutionInput,
    ) -> Result<LlmExecutionOutput, LlmExecutionError>;
}
172
173#[async_trait]
174impl LlmExecutor for SimpleAgentsClient {
175    async fn execute(
176        &self,
177        input: LlmExecutionInput,
178    ) -> Result<LlmExecutionOutput, LlmExecutionError> {
179        let user_prompt = build_prompt_with_scope(&input.prompt, &input.scoped_input);
180        let request = CompletionRequest::builder()
181            .model(input.model)
182            .message(Message::user(user_prompt))
183            .build()
184            .map_err(|error| LlmExecutionError::InvalidRequest(error.to_string()))?;
185
186        let outcome = self
187            .complete(&request, CompletionOptions::default())
188            .await
189            .map_err(|error| LlmExecutionError::Client(error.to_string()))?;
190
191        let response = match outcome {
192            CompletionOutcome::Response(response) => response,
193            CompletionOutcome::Stream(_) => {
194                return Err(LlmExecutionError::UnexpectedOutcome("stream"));
195            }
196            CompletionOutcome::HealedJson(_) => {
197                return Err(LlmExecutionError::UnexpectedOutcome("healed_json"));
198            }
199            CompletionOutcome::CoercedSchema(_) => {
200                return Err(LlmExecutionError::UnexpectedOutcome("coerced_schema"));
201            }
202        };
203
204        let content = response
205            .content()
206            .ok_or(LlmExecutionError::EmptyResponse)?
207            .to_string();
208
209        Ok(LlmExecutionOutput { content })
210    }
211}
212
/// Input payload for host-provided tool execution.
#[derive(Debug, Clone, PartialEq)]
pub struct ToolExecutionInput {
    /// Current workflow node id.
    pub node_id: String,
    /// Tool name declared by the workflow.
    pub tool: String,
    /// Static node input payload.
    pub input: Value,
    /// Deterministic scoped context for this node execution.
    pub scoped_input: Value,
}
225
/// Typed tool execution errors.
///
/// `Clone + PartialEq` so retry machinery and tests can hold and compare
/// the last attempt's error.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ToolExecutionError {
    /// Tool implementation is unavailable in the host runtime.
    #[error("tool handler not found: {tool}")]
    NotFound {
        /// Missing tool name.
        tool: String,
    },
    /// Tool returned an execution failure.
    #[error("tool execution failed: {0}")]
    Failed(String),
}
239
/// Async host tool executor surface.
///
/// Implemented by the host; the runtime invokes it once per tool node
/// attempt and stores the returned JSON value as the node output.
#[async_trait]
pub trait ToolExecutor: Send + Sync {
    /// Executes one workflow tool node.
    async fn execute_tool(&self, input: ToolExecutionInput) -> Result<Value, ToolExecutionError>;
}
246
/// Node-level execution result.
///
/// One record is appended per executed node, in step order.
#[derive(Debug, Clone, PartialEq)]
pub struct NodeExecution {
    /// Zero-based step index.
    pub step: usize,
    /// Executed node id.
    pub node_id: String,
    /// Structured execution payload.
    pub data: NodeExecutionData,
}
257
/// Structured result data emitted for each node.
///
/// One variant per node kind. Every non-terminal variant carries the `next`
/// node id chosen by that node; [`NodeExecutionData::End`] is the sole
/// terminal variant.
#[derive(Debug, Clone, PartialEq)]
pub enum NodeExecutionData {
    /// Start node transition.
    Start {
        /// Next node id.
        next: String,
    },
    /// LLM node output.
    Llm {
        /// Model used.
        model: String,
        /// Assistant output text.
        output: String,
        /// Next node id.
        next: String,
    },
    /// Tool node output.
    Tool {
        /// Tool name.
        tool: String,
        /// Tool output JSON payload.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// Condition node decision.
    Condition {
        /// Original expression.
        expression: String,
        /// Evaluated bool.
        evaluated: bool,
        /// Chosen next node id.
        next: String,
    },
    /// Debounce gate decision.
    Debounce {
        /// Resolved debounce key.
        key: String,
        /// Whether the event was suppressed.
        suppressed: bool,
        /// Chosen next node id.
        next: String,
    },
    /// Throttle gate decision.
    Throttle {
        /// Resolved throttle key.
        key: String,
        /// Whether the event was throttled.
        throttled: bool,
        /// Chosen next node id.
        next: String,
    },
    /// Explicit retry/compensation execution result.
    RetryCompensate {
        /// Primary tool name.
        tool: String,
        /// Total primary attempts executed.
        attempts: usize,
        /// Whether compensation path was used.
        compensated: bool,
        /// Node output payload.
        output: Value,
        /// Chosen next node id.
        next: String,
    },
    /// Human decision result.
    HumanInTheLoop {
        /// True when approved.
        approved: bool,
        /// Optional human payload routed through the node.
        response: Value,
        /// Chosen next node id.
        next: String,
    },
    /// Cache read result.
    CacheRead {
        /// Cache key.
        key: String,
        /// True when key was found.
        hit: bool,
        /// Returned value (null on miss).
        value: Value,
        /// Chosen next node id.
        next: String,
    },
    /// Cache write result.
    CacheWrite {
        /// Cache key.
        key: String,
        /// Stored value.
        value: Value,
        /// Next node id.
        next: String,
    },
    /// Event trigger evaluation result.
    EventTrigger {
        /// Expected event name.
        event: String,
        /// True when input event matched.
        matched: bool,
        /// Chosen next node id.
        next: String,
    },
    /// Router/selector decision.
    Router {
        /// Selected route node id.
        selected: String,
        /// Chosen next node id.
        next: String,
    },
    /// Transform node output.
    Transform {
        /// Original transform expression.
        expression: String,
        /// Evaluated output payload.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// Loop node decision.
    Loop {
        /// Original loop condition.
        condition: String,
        /// Evaluated bool.
        evaluated: bool,
        /// Current iteration when entering body. Zero when exiting loop.
        iteration: u32,
        /// Chosen next node id.
        next: String,
    },
    /// Subgraph execution result.
    Subgraph {
        /// Subgraph registry key.
        graph: String,
        /// Subgraph terminal node id.
        terminal_node_id: String,
        /// Aggregated subgraph node outputs.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// Batch extraction result.
    Batch {
        /// Source path used for extraction.
        items_path: String,
        /// Number of extracted items.
        item_count: usize,
        /// Next node id.
        next: String,
    },
    /// Filter evaluation result.
    Filter {
        /// Source path used for extraction.
        items_path: String,
        /// Filter predicate expression.
        expression: String,
        /// Number of retained items.
        kept: usize,
        /// Next node id.
        next: String,
    },
    /// Parallel node aggregate result.
    Parallel {
        /// Branch node ids executed by this node.
        branches: Vec<String>,
        /// Branch outputs keyed by branch node id.
        outputs: BTreeMap<String, Value>,
        /// Next node id.
        next: String,
    },
    /// Merge node aggregate output.
    Merge {
        /// Merge policy used.
        policy: MergePolicy,
        /// Source node ids consumed.
        sources: Vec<String>,
        /// Merged value.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// Map node output.
    Map {
        /// Number of input items mapped.
        item_count: usize,
        /// Collected mapped values in source order.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// Reduce node output.
    Reduce {
        /// Reduce operation executed.
        operation: ReduceOperation,
        /// Reduced result.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// End node reached.
    End,
}
461
/// Runtime event stream payload for trace consumers.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowEvent {
    /// Zero-based step index.
    pub step: usize,
    /// Node id associated with this event.
    pub node_id: String,
    /// Event payload.
    pub kind: WorkflowEventKind,
}
472
/// Event kinds emitted by the runtime.
///
/// Each node produces a `NodeStarted` followed by either `NodeCompleted`
/// or `NodeFailed`.
#[derive(Debug, Clone, PartialEq)]
pub enum WorkflowEventKind {
    /// Node execution started.
    NodeStarted,
    /// Node execution completed successfully.
    NodeCompleted {
        /// Node execution payload.
        data: NodeExecutionData,
    },
    /// Node execution failed.
    NodeFailed {
        /// Error message.
        message: String,
    },
}
489
/// Final workflow execution report.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowRunResult {
    /// Workflow name.
    pub workflow_name: String,
    /// Final terminal node id.
    pub terminal_node_id: String,
    /// Ordered node execution records.
    pub node_executions: Vec<NodeExecution>,
    /// Ordered runtime events.
    pub events: Vec<WorkflowEvent>,
    /// Retry diagnostics captured during node execution.
    pub retry_events: Vec<WorkflowRetryEvent>,
    /// Node output map keyed by node id.
    pub node_outputs: BTreeMap<String, Value>,
    /// Optional deterministic trace captured during this run.
    /// Present only when trace recording is enabled in the options.
    pub trace: Option<WorkflowTrace>,
    /// Replay validation report when replay mode is enabled.
    pub replay_report: Option<ReplayReport>,
}
510
/// Captures one retry decision and the reason it occurred.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkflowRetryEvent {
    /// Zero-based step index for the active node.
    pub step: usize,
    /// Node id that will be retried.
    pub node_id: String,
    /// Operation category (`llm` or `tool`).
    pub operation: String,
    /// Attempt that failed and triggered the retry.
    pub failed_attempt: usize,
    /// Human-readable retry reason.
    pub reason: String,
}
525
/// Runtime failures.
///
/// Every node-scoped variant carries the failing node id so callers can map
/// failures back to the workflow graph.
#[derive(Debug, Error)]
pub enum WorkflowRuntimeError {
    /// Workflow failed structural validation.
    #[error(transparent)]
    Validation(#[from] ValidationErrors),
    /// Workflow has no start node.
    #[error("workflow has no start node")]
    MissingStartNode,
    /// Current node id not found in node index.
    #[error("node not found: {node_id}")]
    NodeNotFound {
        /// Missing node id.
        node_id: String,
    },
    /// Loop protection guard triggered.
    #[error("workflow exceeded max step limit ({max_steps})")]
    StepLimitExceeded {
        /// Configured max steps.
        max_steps: usize,
    },
    /// Execution cancelled by caller.
    #[error("workflow execution cancelled")]
    Cancelled,
    /// LLM node failed.
    #[error("llm node '{node_id}' failed: {source}")]
    Llm {
        /// Failing node id.
        node_id: String,
        /// Source error.
        source: LlmExecutionError,
    },
    /// Tool node failed.
    #[error("tool node '{node_id}' failed: {source}")]
    Tool {
        /// Failing node id.
        node_id: String,
        /// Source error.
        source: ToolExecutionError,
    },
    /// LLM node exhausted all retry attempts.
    #[error("llm node '{node_id}' exhausted {attempts} attempt(s): {last_error}")]
    LlmRetryExhausted {
        /// Failing node id.
        node_id: String,
        /// Number of attempts made.
        attempts: usize,
        /// Last attempt error.
        last_error: LlmExecutionError,
    },
    /// Tool node exhausted all retry attempts.
    #[error("tool node '{node_id}' exhausted {attempts} attempt(s): {last_error}")]
    ToolRetryExhausted {
        /// Failing node id.
        node_id: String,
        /// Number of attempts made.
        attempts: usize,
        /// Last attempt error.
        last_error: ToolExecutionError,
    },
    /// LLM node timed out across all attempts.
    #[error(
        "llm node '{node_id}' timed out after {attempts} attempt(s) (timeout: {timeout_ms} ms)"
    )]
    LlmTimeout {
        /// Failing node id.
        node_id: String,
        /// Per-attempt timeout in milliseconds.
        timeout_ms: u128,
        /// Number of attempts made.
        attempts: usize,
    },
    /// Tool node timed out across all attempts.
    #[error(
        "tool node '{node_id}' timed out after {attempts} attempt(s) (timeout: {timeout_ms} ms)"
    )]
    ToolTimeout {
        /// Failing node id.
        node_id: String,
        /// Per-attempt timeout in milliseconds.
        timeout_ms: u128,
        /// Number of attempts made.
        attempts: usize,
    },
    /// Tool node reached without an executor.
    #[error("tool node '{node_id}' requires a tool executor")]
    MissingToolExecutor {
        /// Failing node id.
        node_id: String,
    },
    /// LLM/tool node missing its `next` edge.
    #[error("node '{node_id}' is missing required next edge")]
    MissingNextEdge {
        /// Failing node id.
        node_id: String,
    },
    /// Condition expression could not be evaluated.
    #[error("condition node '{node_id}' has invalid expression '{expression}': {reason}")]
    InvalidCondition {
        /// Failing node id.
        node_id: String,
        /// Condition expression.
        expression: String,
        /// Detailed reason.
        reason: String,
    },
    /// Loop condition expression could not be evaluated.
    #[error("loop node '{node_id}' has invalid condition '{expression}': {reason}")]
    InvalidLoopCondition {
        /// Failing node id.
        node_id: String,
        /// Loop condition expression.
        expression: String,
        /// Detailed reason.
        reason: String,
    },
    /// Loop exceeded configured max iterations.
    #[error("loop node '{node_id}' exceeded max iterations ({max_iterations})")]
    LoopIterationLimitExceeded {
        /// Failing node id.
        node_id: String,
        /// Configured max iterations.
        max_iterations: u32,
    },
    /// Non-terminal node did not return a next transition.
    #[error("node '{node_id}' did not provide a next transition")]
    MissingNextTransition {
        /// Failing node id.
        node_id: String,
    },
    /// Runtime dispatch invariant failed.
    #[error("runtime dispatch invariant failed for node '{node_id}': {reason}")]
    DispatchInvariant {
        /// Node id involved in the invariant failure.
        node_id: String,
        /// Human-readable invariant failure reason.
        reason: String,
    },
    /// Scoped state boundary check failed.
    #[error("scope access failed on node '{node_id}': {source}")]
    ScopeAccess {
        /// Failing node id.
        node_id: String,
        /// Source access error.
        source: ScopeAccessError,
    },
    /// Trace recorder operation failed.
    #[error(transparent)]
    TraceRecording(#[from] TraceRecordError),
    /// Replay trace validation failed.
    #[error(transparent)]
    Replay(#[from] ReplayError),
    /// Replay mode requested without trace recording.
    #[error("replay validation requires trace recording to be enabled")]
    ReplayRequiresTraceRecording,
    /// Parallel branch references unsupported node kind.
    #[error("parallel node '{node_id}' cannot execute branch '{branch_id}': {reason}")]
    ParallelBranchUnsupported {
        /// Parallel node id.
        node_id: String,
        /// Branch node id.
        branch_id: String,
        /// Reason for rejection.
        reason: String,
    },
    /// Merge node cannot resolve one or more sources.
    #[error("merge node '{node_id}' missing source output from '{source_id}'")]
    MissingMergeSource {
        /// Merge node id.
        node_id: String,
        /// Missing source id.
        source_id: String,
    },
    /// Merge quorum policy could not be satisfied.
    #[error("merge node '{node_id}' quorum not met: required {required}, resolved {resolved}")]
    MergeQuorumNotMet {
        /// Merge node id.
        node_id: String,
        /// Required number of sources.
        required: usize,
        /// Number of resolved sources.
        resolved: usize,
    },
    /// Map node items path did not resolve to an array.
    #[error("map node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    MapItemsNotArray {
        /// Map node id.
        node_id: String,
        /// Configured path.
        items_path: String,
    },
    /// Reduce node source value is not reducible.
    #[error("reduce node '{node_id}' source '{source_node}' is not reducible: {reason}")]
    InvalidReduceInput {
        /// Reduce node id.
        node_id: String,
        /// Source node id.
        source_node: String,
        /// Detailed reason.
        reason: String,
    },
    /// Subgraph registry key not found.
    #[error("subgraph node '{node_id}' references unknown graph '{graph}'")]
    SubgraphNotFound {
        /// Subgraph node id.
        node_id: String,
        /// Unknown registry key.
        graph: String,
    },
    /// Batch source path did not resolve to an array.
    #[error("batch node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    BatchItemsNotArray {
        /// Batch node id.
        node_id: String,
        /// Configured path.
        items_path: String,
    },
    /// Filter source path did not resolve to an array.
    #[error("filter node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    FilterItemsNotArray {
        /// Filter node id.
        node_id: String,
        /// Configured path.
        items_path: String,
    },
    /// Filter expression failed while evaluating one item.
    #[error("filter node '{node_id}' expression '{expression}' failed: {reason}")]
    InvalidFilterExpression {
        /// Filter node id.
        node_id: String,
        /// Predicate expression.
        expression: String,
        /// Detailed reason.
        reason: String,
    },
    /// Serialized expression scope exceeded configured runtime limit.
    #[error(
        "expression scope too large on node '{node_id}': {actual_bytes} bytes exceeds limit {limit_bytes}"
    )]
    ExpressionScopeLimitExceeded {
        /// Failing node id.
        node_id: String,
        /// Serialized scope size in bytes.
        actual_bytes: usize,
        /// Configured byte limit.
        limit_bytes: usize,
    },
    /// Parallel branch fan-out exceeded configured runtime limit.
    #[error(
        "parallel node '{node_id}' has {actual_branches} branches, exceeding limit {max_branches}"
    )]
    ParallelBranchLimitExceeded {
        /// Parallel node id.
        node_id: String,
        /// Declared branch count.
        actual_branches: usize,
        /// Configured branch limit.
        max_branches: usize,
    },
    /// Map item count exceeded configured runtime limit.
    #[error("map node '{node_id}' has {actual_items} items, exceeding limit {max_items}")]
    MapItemLimitExceeded {
        /// Map node id.
        node_id: String,
        /// Resolved item count.
        actual_items: usize,
        /// Configured item limit.
        max_items: usize,
    },
    /// Filter item count exceeded configured runtime limit.
    #[error("filter node '{node_id}' has {actual_items} items, exceeding limit {max_items}")]
    FilterItemLimitExceeded {
        /// Filter node id.
        node_id: String,
        /// Resolved item count.
        actual_items: usize,
        /// Configured item limit.
        max_items: usize,
    },
    /// A node-required path could not be resolved.
    #[error("node '{node_id}' could not resolve required path '{path}'")]
    MissingPath {
        /// Failing node id.
        node_id: String,
        /// Unresolvable path.
        path: String,
    },
    /// A cache key path did not resolve to a string.
    #[error("node '{node_id}' path '{path}' did not resolve to a string cache key")]
    CacheKeyNotString {
        /// Failing node id.
        node_id: String,
        /// Configured key path.
        path: String,
    },
    /// Human decision value is unsupported.
    #[error("human node '{node_id}' has unsupported decision value at '{path}': {value}")]
    InvalidHumanDecision {
        /// Failing node id.
        node_id: String,
        /// Decision value path.
        path: String,
        /// Rendered unsupported value.
        value: String,
    },
    /// Event value did not resolve to a string.
    #[error("event trigger node '{node_id}' path '{path}' did not resolve to an event string")]
    InvalidEventValue {
        /// Failing node id.
        node_id: String,
        /// Configured event path.
        path: String,
    },
    /// Router expression evaluation failed.
    #[error("router node '{node_id}' route expression '{expression}' failed: {reason}")]
    InvalidRouterExpression {
        /// Router node id.
        node_id: String,
        /// Route expression.
        expression: String,
        /// Detailed reason.
        reason: String,
    },
    /// Transform expression evaluation failed.
    #[error("transform node '{node_id}' expression '{expression}' failed: {reason}")]
    InvalidTransformExpression {
        /// Transform node id.
        node_id: String,
        /// Transform expression.
        expression: String,
        /// Detailed reason.
        reason: String,
    },
    /// Explicit retry/compensate node exhausted primary and compensation failed.
    #[error(
        "retry_compensate node '{node_id}' failed after {attempts} primary attempt(s) and compensation error: {compensation_error}"
    )]
    RetryCompensateFailed {
        /// Failing node id.
        node_id: String,
        /// Primary attempts executed.
        attempts: usize,
        /// Error returned by the compensation path.
        compensation_error: ToolExecutionError,
    },
}
815
/// Deterministic minimal runtime for workflow execution.
///
/// Owns the workflow definition and options; borrows the host-provided
/// LLM and tool adapters for lifetime `'a`.
pub struct WorkflowRuntime<'a> {
    // Workflow IR executed by this runtime.
    definition: WorkflowDefinition,
    // Host adapter used for LLM nodes.
    llm_executor: &'a dyn LlmExecutor,
    // Optional host adapter for tool nodes; when absent, reaching a tool
    // node fails with `MissingToolExecutor`.
    tool_executor: Option<&'a dyn ToolExecutor>,
    // Runtime configuration.
    options: WorkflowRuntimeOptions,
}
823
824impl<'a> WorkflowRuntime<'a> {
    /// Creates a runtime with host-provided LLM and tool adapters.
    ///
    /// `tool_executor` may be `None` for workflows that contain no tool
    /// nodes. No validation happens here; it is deferred to execution
    /// when `options.validate_before_run` is set.
    pub fn new(
        definition: WorkflowDefinition,
        llm_executor: &'a dyn LlmExecutor,
        tool_executor: Option<&'a dyn ToolExecutor>,
        options: WorkflowRuntimeOptions,
    ) -> Self {
        Self {
            definition,
            llm_executor,
            tool_executor,
            options,
        }
    }
839
840    /// Executes a workflow deterministically from `start` to `end`.
841    pub async fn execute(
842        &self,
843        input: Value,
844        cancellation: Option<&dyn CancellationSignal>,
845    ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
846        let workflow = if self.options.validate_before_run {
847            validate_and_normalize(&self.definition)?
848        } else {
849            self.definition.normalized()
850        };
851
852        let start_id = find_start_node_id(&workflow)?;
853        self.execute_from_node(
854            workflow,
855            RuntimeScope::new(input),
856            start_id,
857            0,
858            cancellation,
859        )
860        .await
861    }
862
863    /// Resumes execution from a checkpoint created after a node failure.
864    pub async fn execute_resume_from_failure(
865        &self,
866        checkpoint: &WorkflowCheckpoint,
867        cancellation: Option<&dyn CancellationSignal>,
868    ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
869        let workflow = if self.options.validate_before_run {
870            validate_and_normalize(&self.definition)?
871        } else {
872            self.definition.normalized()
873        };
874
875        let scope_input = checkpoint
876            .scope_snapshot
877            .get("input")
878            .cloned()
879            .unwrap_or_else(|| checkpoint.scope_snapshot.clone());
880
881        self.execute_from_node(
882            workflow,
883            RuntimeScope::new(scope_input),
884            checkpoint.next_node_id.clone(),
885            checkpoint.step,
886            cancellation,
887        )
888        .await
889    }
890
    /// Shared entry point for fresh runs and checkpoint resumption:
    /// delegates to the `engine` module, which owns the step loop.
    async fn execute_from_node(
        &self,
        workflow: WorkflowDefinition,
        scope: RuntimeScope,
        start_node_id: String,
        starting_step: usize,
        cancellation: Option<&dyn CancellationSignal>,
    ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
        engine::execute_from_node(
            self,
            workflow,
            scope,
            start_node_id,
            starting_step,
            cancellation,
        )
        .await
    }
909
    /// Executes a single node by delegating to the `engine` module's
    /// per-node dispatch; retry diagnostics are appended to `retry_events`.
    async fn execute_node(
        &self,
        node: &Node,
        node_index: &HashMap<&str, &Node>,
        step: usize,
        scope: &mut RuntimeScope,
        cancellation: Option<&dyn CancellationSignal>,
        retry_events: &mut Vec<WorkflowRetryEvent>,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        engine::execute_node(
            self,
            node,
            node_index,
            step,
            scope,
            cancellation,
            retry_events,
        )
        .await
    }
930
931    fn execute_start_node(&self, step: usize, node: &Node, next: &str) -> NodeExecution {
932        NodeExecution {
933            step,
934            node_id: node.id.clone(),
935            data: NodeExecutionData::Start {
936                next: next.to_string(),
937            },
938        }
939    }
940
941    async fn execute_llm_node(
942        &self,
943        step: usize,
944        node: &Node,
945        spec: LlmNodeSpec<'_>,
946        scope: &mut RuntimeScope,
947        cancellation: Option<&dyn CancellationSignal>,
948        retry_events: &mut Vec<WorkflowRetryEvent>,
949    ) -> Result<NodeExecution, WorkflowRuntimeError> {
950        let next_node = spec
951            .next
952            .clone()
953            .ok_or_else(|| WorkflowRuntimeError::MissingNextEdge {
954                node_id: node.id.clone(),
955            })?;
956
957        let (output, llm_retries) = self
958            .execute_llm_with_policy(step, node, spec.model, spec.prompt, scope, cancellation)
959            .await?;
960        retry_events.extend(llm_retries);
961
962        scope
963            .record_llm_output(&node.id, output.content.clone(), ScopeCapability::LlmWrite)
964            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
965                node_id: node.id.clone(),
966                source,
967            })?;
968
969        Ok(NodeExecution {
970            step,
971            node_id: node.id.clone(),
972            data: NodeExecutionData::Llm {
973                model: spec.model.to_string(),
974                output: output.content,
975                next: next_node,
976            },
977        })
978    }
979
980    async fn execute_tool_node(
981        &self,
982        step: usize,
983        node: &Node,
984        spec: ToolNodeSpec<'_>,
985        scope: &mut RuntimeScope,
986        cancellation: Option<&dyn CancellationSignal>,
987        retry_events: &mut Vec<WorkflowRetryEvent>,
988    ) -> Result<NodeExecution, WorkflowRuntimeError> {
989        let next_node = spec
990            .next
991            .clone()
992            .ok_or_else(|| WorkflowRuntimeError::MissingNextEdge {
993                node_id: node.id.clone(),
994            })?;
995
996        let executor =
997            self.tool_executor
998                .ok_or_else(|| WorkflowRuntimeError::MissingToolExecutor {
999                    node_id: node.id.clone(),
1000                })?;
1001
1002        let scoped_input = scope
1003            .scoped_input(ScopeCapability::ToolRead)
1004            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1005                node_id: node.id.clone(),
1006                source,
1007            })?;
1008
1009        let (tool_output, tool_retries) = self
1010            .execute_tool_with_policy_for_scope(ToolPolicyRequest {
1011                step,
1012                node,
1013                tool: spec.tool,
1014                input: spec.input,
1015                executor,
1016                scoped_input,
1017                cancellation,
1018            })
1019            .await?;
1020        retry_events.extend(tool_retries);
1021
1022        scope
1023            .record_tool_output(&node.id, tool_output.clone(), ScopeCapability::ToolWrite)
1024            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1025                node_id: node.id.clone(),
1026                source,
1027            })?;
1028
1029        Ok(NodeExecution {
1030            step,
1031            node_id: node.id.clone(),
1032            data: NodeExecutionData::Tool {
1033                tool: spec.tool.to_string(),
1034                output: tool_output,
1035                next: next_node,
1036            },
1037        })
1038    }
1039
1040    fn execute_condition_node(
1041        &self,
1042        step: usize,
1043        node: &Node,
1044        spec: ConditionNodeSpec<'_>,
1045        scope: &mut RuntimeScope,
1046        cancellation: Option<&dyn CancellationSignal>,
1047    ) -> Result<NodeExecution, WorkflowRuntimeError> {
1048        check_cancelled(cancellation)?;
1049        let scoped_input =
1050            scope
1051                .scoped_input(ScopeCapability::ConditionRead)
1052                .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1053                    node_id: node.id.clone(),
1054                    source,
1055                })?;
1056        enforce_expression_scope_budget(
1057            &node.id,
1058            &scoped_input,
1059            self.options.security_limits.max_expression_scope_bytes,
1060        )?;
1061        let evaluated =
1062            expressions::evaluate_bool(spec.expression, &scoped_input).map_err(|reason| {
1063                WorkflowRuntimeError::InvalidCondition {
1064                    node_id: node.id.clone(),
1065                    expression: spec.expression.to_string(),
1066                    reason: reason.to_string(),
1067                }
1068            })?;
1069        let next = if evaluated {
1070            spec.on_true.to_string()
1071        } else {
1072            spec.on_false.to_string()
1073        };
1074
1075        scope
1076            .record_condition_output(&node.id, evaluated, ScopeCapability::ConditionWrite)
1077            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1078                node_id: node.id.clone(),
1079                source,
1080            })?;
1081
1082        Ok(NodeExecution {
1083            step,
1084            node_id: node.id.clone(),
1085            data: NodeExecutionData::Condition {
1086                expression: spec.expression.to_string(),
1087                evaluated,
1088                next,
1089            },
1090        })
1091    }
1092
    /// Runs every branch of a parallel node concurrently (bounded by the
    /// scheduler's in-flight limit), then records each branch output plus an
    /// aggregate object keyed by branch id under the parallel node's id.
    async fn execute_parallel_node(
        &self,
        step: usize,
        node: &Node,
        spec: ParallelNodeSpec<'_>,
        scope: &mut RuntimeScope,
        cancellation: Option<&dyn CancellationSignal>,
        retry_events: &mut Vec<WorkflowRetryEvent>,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        check_cancelled(cancellation)?;
        // Reject oversized fan-outs before snapshotting the scope.
        if spec.branches.len() > self.options.security_limits.max_parallel_branches {
            return Err(WorkflowRuntimeError::ParallelBranchLimitExceeded {
                node_id: node.id.clone(),
                actual_branches: spec.branches.len(),
                max_branches: self.options.security_limits.max_parallel_branches,
            });
        }
        // Every branch evaluates against one shared snapshot of the scope.
        let base_scope = scope
            .scoped_input(ScopeCapability::MapRead)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;
        // Node-level `max_in_flight` overrides the runtime-wide bound.
        let scheduler = DagScheduler::new(
            spec.max_in_flight
                .unwrap_or(self.options.scheduler_max_in_flight),
        );
        let parallel_node_id = node.id.clone();

        let branch_outputs: Vec<(String, Value, Vec<WorkflowRetryEvent>)> = scheduler
            .run_bounded(spec.branches.iter().cloned(), |branch_id| {
                // Each branch future owns its own clones so the closure can
                // be `async move` without borrowing from the loop.
                let parallel_node_id = parallel_node_id.clone();
                let base_scope = base_scope.clone();
                async move {
                    let branch_node = spec.node_index.get(branch_id.as_str()).ok_or_else(|| {
                        WorkflowRuntimeError::NodeNotFound {
                            node_id: branch_id.clone(),
                        }
                    })?;
                    self.execute_parallel_branch(
                        step,
                        &parallel_node_id,
                        branch_node,
                        base_scope,
                        cancellation,
                    )
                    .await
                }
            })
            .await?;

        // Record each branch output individually, then the aggregate object.
        let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
        for (branch_id, output, branch_retry_events) in branch_outputs {
            retry_events.extend(branch_retry_events);
            scope
                .record_node_output(&branch_id, output.clone(), ScopeCapability::MapWrite)
                .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                    node_id: node.id.clone(),
                    source,
                })?;
            outputs.insert(branch_id, output);
        }

        scope
            .record_node_output(
                &node.id,
                Value::Object(
                    outputs
                        .iter()
                        .map(|(key, value)| (key.clone(), value.clone()))
                        .collect(),
                ),
                ScopeCapability::MapWrite,
            )
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        Ok(NodeExecution {
            step,
            node_id: node.id.clone(),
            data: NodeExecutionData::Parallel {
                branches: spec.branches.to_vec(),
                outputs,
                next: spec.next.to_string(),
            },
        })
    }
1182
1183    fn execute_merge_node(
1184        &self,
1185        step: usize,
1186        node: &Node,
1187        spec: MergeNodeSpec<'_>,
1188        scope: &mut RuntimeScope,
1189    ) -> Result<NodeExecution, WorkflowRuntimeError> {
1190        let mut resolved = Vec::with_capacity(spec.sources.len());
1191        for source in spec.sources {
1192            let Some(value) = scope.node_output(source).cloned() else {
1193                return Err(WorkflowRuntimeError::MissingMergeSource {
1194                    node_id: node.id.clone(),
1195                    source_id: source.clone(),
1196                });
1197            };
1198            resolved.push((source.clone(), value));
1199        }
1200
1201        let output = match spec.policy {
1202            MergePolicy::First => resolved
1203                .first()
1204                .map(|(_, value)| value.clone())
1205                .unwrap_or(Value::Null),
1206            MergePolicy::All => Value::Array(
1207                resolved
1208                    .iter()
1209                    .map(|(_, value)| value.clone())
1210                    .collect::<Vec<_>>(),
1211            ),
1212            MergePolicy::Quorum => {
1213                let required = spec.quorum.unwrap_or_default();
1214                let resolved_count = resolved.len();
1215                if resolved_count < required {
1216                    return Err(WorkflowRuntimeError::MergeQuorumNotMet {
1217                        node_id: node.id.clone(),
1218                        required,
1219                        resolved: resolved_count,
1220                    });
1221                }
1222                Value::Array(
1223                    resolved
1224                        .iter()
1225                        .take(required)
1226                        .map(|(_, value)| value.clone())
1227                        .collect::<Vec<_>>(),
1228                )
1229            }
1230        };
1231
1232        scope
1233            .record_node_output(&node.id, output.clone(), ScopeCapability::ReduceWrite)
1234            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1235                node_id: node.id.clone(),
1236                source,
1237            })?;
1238
1239        Ok(NodeExecution {
1240            step,
1241            node_id: node.id.clone(),
1242            data: NodeExecutionData::Merge {
1243                policy: spec.policy.clone(),
1244                sources: spec.sources.to_vec(),
1245                output,
1246                next: spec.next.to_string(),
1247            },
1248        })
1249    }
1250
1251    async fn execute_map_node(
1252        &self,
1253        step: usize,
1254        node: &Node,
1255        spec: MapNodeSpec<'_>,
1256        scope: &mut RuntimeScope,
1257        cancellation: Option<&dyn CancellationSignal>,
1258        retry_events: &mut Vec<WorkflowRetryEvent>,
1259    ) -> Result<NodeExecution, WorkflowRuntimeError> {
1260        let executor =
1261            self.tool_executor
1262                .ok_or_else(|| WorkflowRuntimeError::MissingToolExecutor {
1263                    node_id: node.id.clone(),
1264                })?;
1265
1266        let scoped_input = scope
1267            .scoped_input(ScopeCapability::MapRead)
1268            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1269                node_id: node.id.clone(),
1270                source,
1271            })?;
1272        let items = resolve_path(&scoped_input, spec.items_path)
1273            .and_then(Value::as_array)
1274            .ok_or_else(|| WorkflowRuntimeError::MapItemsNotArray {
1275                node_id: node.id.clone(),
1276                items_path: spec.items_path.to_string(),
1277            })?
1278            .clone();
1279        if items.len() > self.options.security_limits.max_map_items {
1280            return Err(WorkflowRuntimeError::MapItemLimitExceeded {
1281                node_id: node.id.clone(),
1282                actual_items: items.len(),
1283                max_items: self.options.security_limits.max_map_items,
1284            });
1285        }
1286
1287        let scheduler = DagScheduler::new(
1288            spec.max_in_flight
1289                .unwrap_or(self.options.scheduler_max_in_flight),
1290        );
1291        let map_node = node.clone();
1292        let mapped: Vec<(Value, Vec<WorkflowRetryEvent>)> = scheduler
1293            .run_bounded(items.into_iter().enumerate(), |(index, item)| {
1294                let scoped_input = scoped_input.clone();
1295                let map_node = map_node.clone();
1296                async move {
1297                    let item_scope = map_item_scoped_input(&scoped_input, &item, index);
1298                    let (output, retries) = self
1299                        .execute_tool_with_policy_for_scope(ToolPolicyRequest {
1300                            step,
1301                            node: &map_node,
1302                            tool: spec.tool,
1303                            input: &item,
1304                            executor,
1305                            scoped_input: item_scope,
1306                            cancellation,
1307                        })
1308                        .await?;
1309                    Ok::<(Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError>((output, retries))
1310                }
1311            })
1312            .await?;
1313
1314        let mut outputs = Vec::with_capacity(mapped.len());
1315        for (output, local_retries) in mapped {
1316            outputs.push(output);
1317            retry_events.extend(local_retries);
1318        }
1319
1320        let output = Value::Array(outputs);
1321        scope
1322            .record_node_output(&node.id, output.clone(), ScopeCapability::MapWrite)
1323            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1324                node_id: node.id.clone(),
1325                source,
1326            })?;
1327
1328        Ok(NodeExecution {
1329            step,
1330            node_id: node.id.clone(),
1331            data: NodeExecutionData::Map {
1332                item_count: output.as_array().map_or(0, Vec::len),
1333                output,
1334                next: spec.next.to_string(),
1335            },
1336        })
1337    }
1338
1339    fn execute_reduce_node(
1340        &self,
1341        step: usize,
1342        node: &Node,
1343        source: &str,
1344        operation: &ReduceOperation,
1345        next: &str,
1346        scope: &mut RuntimeScope,
1347    ) -> Result<NodeExecution, WorkflowRuntimeError> {
1348        let source_value = scope.node_output(source).cloned().ok_or_else(|| {
1349            WorkflowRuntimeError::MissingMergeSource {
1350                node_id: node.id.clone(),
1351                source_id: source.to_string(),
1352            }
1353        })?;
1354
1355        let reduced = reduce_value(&node.id, source, operation, source_value)?;
1356        scope
1357            .record_node_output(&node.id, reduced.clone(), ScopeCapability::ReduceWrite)
1358            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1359                node_id: node.id.clone(),
1360                source,
1361            })?;
1362
1363        Ok(NodeExecution {
1364            step,
1365            node_id: node.id.clone(),
1366            data: NodeExecutionData::Reduce {
1367                operation: operation.clone(),
1368                output: reduced,
1369                next: next.to_string(),
1370            },
1371        })
1372    }
1373
1374    fn execute_batch_node(
1375        &self,
1376        step: usize,
1377        node: &Node,
1378        items_path: &str,
1379        next: &str,
1380        scope: &mut RuntimeScope,
1381    ) -> Result<NodeExecution, WorkflowRuntimeError> {
1382        let scoped = scope
1383            .scoped_input(ScopeCapability::MapRead)
1384            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1385                node_id: node.id.clone(),
1386                source,
1387            })?;
1388        let items = resolve_path(&scoped, items_path).ok_or_else(|| {
1389            WorkflowRuntimeError::BatchItemsNotArray {
1390                node_id: node.id.clone(),
1391                items_path: items_path.to_string(),
1392            }
1393        })?;
1394        let array = items
1395            .as_array()
1396            .ok_or_else(|| WorkflowRuntimeError::BatchItemsNotArray {
1397                node_id: node.id.clone(),
1398                items_path: items_path.to_string(),
1399            })?;
1400
1401        let output = Value::Array(array.clone());
1402        scope
1403            .record_node_output(&node.id, output.clone(), ScopeCapability::MapWrite)
1404            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1405                node_id: node.id.clone(),
1406                source,
1407            })?;
1408
1409        Ok(NodeExecution {
1410            step,
1411            node_id: node.id.clone(),
1412            data: NodeExecutionData::Batch {
1413                items_path: items_path.to_string(),
1414                item_count: array.len(),
1415                next: next.to_string(),
1416            },
1417        })
1418    }
1419
1420    fn execute_filter_node(
1421        &self,
1422        step: usize,
1423        node: &Node,
1424        items_path: &str,
1425        expression: &str,
1426        next: &str,
1427        scope: &mut RuntimeScope,
1428    ) -> Result<NodeExecution, WorkflowRuntimeError> {
1429        let scoped = scope
1430            .scoped_input(ScopeCapability::MapRead)
1431            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1432                node_id: node.id.clone(),
1433                source,
1434            })?;
1435        let items_value = resolve_path(&scoped, items_path).ok_or_else(|| {
1436            WorkflowRuntimeError::FilterItemsNotArray {
1437                node_id: node.id.clone(),
1438                items_path: items_path.to_string(),
1439            }
1440        })?;
1441        let array =
1442            items_value
1443                .as_array()
1444                .ok_or_else(|| WorkflowRuntimeError::FilterItemsNotArray {
1445                    node_id: node.id.clone(),
1446                    items_path: items_path.to_string(),
1447                })?;
1448        if array.len() > self.options.security_limits.max_filter_items {
1449            return Err(WorkflowRuntimeError::FilterItemLimitExceeded {
1450                node_id: node.id.clone(),
1451                actual_items: array.len(),
1452                max_items: self.options.security_limits.max_filter_items,
1453            });
1454        }
1455
1456        let mut kept = Vec::new();
1457        for (index, item) in array.iter().enumerate() {
1458            let mut eval_scope = scoped.clone();
1459            if let Some(object) = eval_scope.as_object_mut() {
1460                object.insert("item".to_string(), item.clone());
1461                object.insert("item_index".to_string(), Value::from(index as u64));
1462            }
1463            enforce_expression_scope_budget(
1464                &node.id,
1465                &eval_scope,
1466                self.options.security_limits.max_expression_scope_bytes,
1467            )?;
1468            let include =
1469                expressions::evaluate_bool(expression, &eval_scope).map_err(|reason| {
1470                    WorkflowRuntimeError::InvalidFilterExpression {
1471                        node_id: node.id.clone(),
1472                        expression: expression.to_string(),
1473                        reason: reason.to_string(),
1474                    }
1475                })?;
1476            if include {
1477                kept.push(item.clone());
1478            }
1479        }
1480        let output = Value::Array(kept.clone());
1481        scope
1482            .record_node_output(&node.id, output, ScopeCapability::MapWrite)
1483            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1484                node_id: node.id.clone(),
1485                source,
1486            })?;
1487
1488        Ok(NodeExecution {
1489            step,
1490            node_id: node.id.clone(),
1491            data: NodeExecutionData::Filter {
1492                items_path: items_path.to_string(),
1493                expression: expression.to_string(),
1494                kept: kept.len(),
1495                next: next.to_string(),
1496            },
1497        })
1498    }
1499
1500    async fn execute_llm_with_policy(
1501        &self,
1502        step: usize,
1503        node: &Node,
1504        model: &str,
1505        prompt: &str,
1506        scope: &RuntimeScope,
1507        cancellation: Option<&dyn CancellationSignal>,
1508    ) -> Result<(LlmExecutionOutput, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
1509        let scoped_input = scope
1510            .scoped_input(ScopeCapability::LlmRead)
1511            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1512                node_id: node.id.clone(),
1513                source,
1514            })?;
1515
1516        self.execute_llm_with_policy_for_scope(
1517            step,
1518            node,
1519            model,
1520            prompt,
1521            scoped_input,
1522            cancellation,
1523        )
1524        .await
1525    }
1526
1527    async fn execute_parallel_branch(
1528        &self,
1529        step: usize,
1530        parallel_node_id: &str,
1531        branch_node: &Node,
1532        scoped_input: Value,
1533        cancellation: Option<&dyn CancellationSignal>,
1534    ) -> Result<(String, Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
1535        match &branch_node.kind {
1536            NodeKind::Llm {
1537                model,
1538                prompt,
1539                next: _,
1540            } => {
1541                let (output, retries) = self
1542                    .execute_llm_with_policy_for_scope(
1543                        step,
1544                        branch_node,
1545                        model,
1546                        prompt,
1547                        scoped_input,
1548                        cancellation,
1549                    )
1550                    .await?;
1551                Ok((
1552                    branch_node.id.clone(),
1553                    Value::String(output.content),
1554                    retries,
1555                ))
1556            }
1557            NodeKind::Tool { tool, input, .. } => {
1558                let executor = self.tool_executor.ok_or_else(|| {
1559                    WorkflowRuntimeError::MissingToolExecutor {
1560                        node_id: branch_node.id.clone(),
1561                    }
1562                })?;
1563                let (output, retries) = self
1564                    .execute_tool_with_policy_for_scope(ToolPolicyRequest {
1565                        step,
1566                        node: branch_node,
1567                        tool,
1568                        input,
1569                        executor,
1570                        scoped_input,
1571                        cancellation,
1572                    })
1573                    .await?;
1574                Ok((branch_node.id.clone(), output, retries))
1575            }
1576            _ => Err(WorkflowRuntimeError::ParallelBranchUnsupported {
1577                node_id: parallel_node_id.to_string(),
1578                branch_id: branch_node.id.clone(),
1579                reason: "only llm/tool branches are supported".to_string(),
1580            }),
1581        }
1582    }
1583
    /// Invokes the LLM executor for `node` under the configured retry/timeout
    /// policy, collecting a `WorkflowRetryEvent` for every failed or
    /// timed-out attempt. Returns the first successful output together with
    /// those events; failure on the final attempt is promoted to an error.
    async fn execute_llm_with_policy_for_scope(
        &self,
        step: usize,
        node: &Node,
        model: &str,
        prompt: &str,
        scoped_input: Value,
        cancellation: Option<&dyn CancellationSignal>,
    ) -> Result<(LlmExecutionOutput, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
        // `max_retries` counts *re*-tries, so total attempts is one more.
        let max_attempts = self.options.llm_node_policy.max_retries.saturating_add(1);
        let mut retry_events = Vec::new();

        for attempt in 1..=max_attempts {
            check_cancelled(cancellation)?;

            let execution = self.llm_executor.execute(LlmExecutionInput {
                node_id: node.id.clone(),
                model: model.to_string(),
                prompt: prompt.to_string(),
                scoped_input: scoped_input.clone(),
            });

            // With a timeout configured, a slow attempt is treated like a
            // failed attempt: retried until attempts run out.
            let outcome = if let Some(timeout_duration) = self.options.llm_node_policy.timeout {
                match timeout(timeout_duration, execution).await {
                    Ok(result) => result,
                    Err(_) => {
                        if attempt == max_attempts {
                            return Err(WorkflowRuntimeError::LlmTimeout {
                                node_id: node.id.clone(),
                                timeout_ms: timeout_duration.as_millis(),
                                attempts: attempt,
                            });
                        }
                        retry_events.push(WorkflowRetryEvent {
                            step,
                            node_id: node.id.clone(),
                            operation: "llm".to_string(),
                            failed_attempt: attempt,
                            reason: format!(
                                "attempt {} timed out after {} ms",
                                attempt,
                                timeout_duration.as_millis()
                            ),
                        });
                        // Re-check cancellation before starting another attempt.
                        check_cancelled(cancellation)?;
                        continue;
                    }
                }
            } else {
                execution.await
            };

            match outcome {
                Ok(output) => return Ok((output, retry_events)),
                Err(last_error) => {
                    if attempt == max_attempts {
                        return Err(WorkflowRuntimeError::LlmRetryExhausted {
                            node_id: node.id.clone(),
                            attempts: attempt,
                            last_error,
                        });
                    }
                    retry_events.push(WorkflowRetryEvent {
                        step,
                        node_id: node.id.clone(),
                        operation: "llm".to_string(),
                        failed_attempt: attempt,
                        reason: last_error.to_string(),
                    });
                    check_cancelled(cancellation)?;
                }
            }
        }

        // The loop body either returns success or, on the final attempt,
        // returns an error — control can never reach this point.
        unreachable!("llm attempts loop always returns")
    }
1660
    /// Tool-side counterpart of `execute_llm_with_policy_for_scope`: runs the
    /// tool executor under the configured retry/timeout policy, recording a
    /// retry event per failed or timed-out attempt and erroring once the
    /// final attempt fails.
    async fn execute_tool_with_policy_for_scope(
        &self,
        request: ToolPolicyRequest<'_>,
    ) -> Result<(Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
        let ToolPolicyRequest {
            step,
            node,
            tool,
            input,
            executor,
            scoped_input,
            cancellation,
        } = request;
        // `max_retries` counts *re*-tries, so total attempts is one more.
        let max_attempts = self.options.tool_node_policy.max_retries.saturating_add(1);
        let mut retry_events = Vec::new();

        for attempt in 1..=max_attempts {
            check_cancelled(cancellation)?;

            let execution = executor.execute_tool(ToolExecutionInput {
                node_id: node.id.clone(),
                tool: tool.to_string(),
                input: input.clone(),
                scoped_input: scoped_input.clone(),
            });

            // With a timeout configured, a slow attempt counts as a failure
            // and is retried until attempts run out.
            let outcome = if let Some(timeout_duration) = self.options.tool_node_policy.timeout {
                match timeout(timeout_duration, execution).await {
                    Ok(result) => result,
                    Err(_) => {
                        if attempt == max_attempts {
                            return Err(WorkflowRuntimeError::ToolTimeout {
                                node_id: node.id.clone(),
                                timeout_ms: timeout_duration.as_millis(),
                                attempts: attempt,
                            });
                        }
                        retry_events.push(WorkflowRetryEvent {
                            step,
                            node_id: node.id.clone(),
                            operation: "tool".to_string(),
                            failed_attempt: attempt,
                            reason: format!(
                                "attempt {} timed out after {} ms",
                                attempt,
                                timeout_duration.as_millis()
                            ),
                        });
                        // Re-check cancellation before starting another attempt.
                        check_cancelled(cancellation)?;
                        continue;
                    }
                }
            } else {
                execution.await
            };

            match outcome {
                Ok(output) => return Ok((output, retry_events)),
                Err(last_error) => {
                    if attempt == max_attempts {
                        return Err(WorkflowRuntimeError::ToolRetryExhausted {
                            node_id: node.id.clone(),
                            attempts: attempt,
                            last_error,
                        });
                    }
                    retry_events.push(WorkflowRetryEvent {
                        step,
                        node_id: node.id.clone(),
                        operation: "tool".to_string(),
                        failed_attempt: attempt,
                        reason: last_error.to_string(),
                    });
                    check_cancelled(cancellation)?;
                }
            }
        }

        // The loop either returns success or errors out on the final attempt.
        unreachable!("tool attempts loop always returns")
    }
1741}
1742
/// Bundled arguments for executing a tool call under the node's retry/timeout policy.
struct ToolPolicyRequest<'a> {
    // Step counter stamped onto retry events for this execution.
    step: usize,
    // Node being executed; its id labels timeout/exhaustion errors and retry events.
    node: &'a Node,
    // Tool name forwarded to the executor.
    tool: &'a str,
    // Raw tool input as declared on the node.
    input: &'a Value,
    // Executor that performs the actual tool invocation.
    executor: &'a dyn ToolExecutor,
    // Input value already narrowed to the node's visible scope.
    scoped_input: Value,
    // Optional cooperative cancellation signal checked between attempts.
    cancellation: Option<&'a dyn CancellationSignal>,
}
1752
/// Borrowed view of an LLM node's configuration fields.
struct LlmNodeSpec<'a> {
    model: &'a str,
    prompt: &'a str,
    // Optional successor node id.
    next: &'a Option<String>,
}
1758
/// Borrowed view of a tool node's configuration fields.
struct ToolNodeSpec<'a> {
    tool: &'a str,
    input: &'a Value,
    // Optional successor node id.
    next: &'a Option<String>,
}
1764
/// Borrowed view of a condition node: one boolean expression and two branch targets.
struct ConditionNodeSpec<'a> {
    expression: &'a str,
    on_true: &'a str,
    on_false: &'a str,
}
1770
/// Borrowed view of a parallel fan-out node.
struct ParallelNodeSpec<'a> {
    // Id → node lookup table for resolving branch entry points.
    node_index: &'a HashMap<&'a str, &'a Node>,
    // Branch entry node ids executed concurrently.
    branches: &'a [String],
    next: &'a str,
    // Optional per-node concurrency cap overriding the scheduler default.
    max_in_flight: Option<usize>,
}
1777
/// Borrowed view of a merge node combining outputs of source nodes.
struct MergeNodeSpec<'a> {
    sources: &'a [String],
    policy: &'a MergePolicy,
    // Quorum size; presumably only meaningful for quorum-style policies — confirm in engine.
    quorum: Option<usize>,
    next: &'a str,
}
1784
/// Borrowed view of a map node that applies one tool to each item of a collection.
struct MapNodeSpec<'a> {
    tool: &'a str,
    // Dot path into the scope locating the array of items to map over.
    items_path: &'a str,
    next: &'a str,
    // Optional per-node concurrency cap for item fan-out.
    max_in_flight: Option<usize>,
}
1791
1792fn check_cancelled(
1793    cancellation: Option<&dyn CancellationSignal>,
1794) -> Result<(), WorkflowRuntimeError> {
1795    if cancellation.is_some_and(CancellationSignal::is_cancelled) {
1796        Err(WorkflowRuntimeError::Cancelled)
1797    } else {
1798        Ok(())
1799    }
1800}
1801
/// Hands out the current logical tick and advances the clock by one.
/// The increment saturates, so the value stops at `u64::MAX` instead of wrapping.
fn next_trace_timestamp(clock: &mut u64) -> u64 {
    let advanced = clock.saturating_add(1);
    std::mem::replace(clock, advanced)
}
1807
1808fn build_prompt_with_scope(prompt: &str, scoped_input: &Value) -> String {
1809    format!("{}\n\nScoped context:\n{}", prompt, scoped_input)
1810}
1811
1812fn build_node_index(workflow: &WorkflowDefinition) -> HashMap<&str, &Node> {
1813    let mut index = HashMap::with_capacity(workflow.nodes.len());
1814    for node in &workflow.nodes {
1815        index.insert(node.id.as_str(), node);
1816    }
1817    index
1818}
1819
1820fn find_start_node_id(workflow: &WorkflowDefinition) -> Result<String, WorkflowRuntimeError> {
1821    workflow
1822        .nodes
1823        .iter()
1824        .find_map(|node| match node.kind {
1825            NodeKind::Start { .. } => Some(node.id.clone()),
1826            _ => None,
1827        })
1828        .ok_or(WorkflowRuntimeError::MissingStartNode)
1829}
1830
/// Extracts the successor node id from an execution record, or `None` for the
/// terminal `End` record.
fn next_node_id(data: &NodeExecutionData) -> Option<String> {
    match data {
        // Every non-terminal variant carries a `next` field; binding them all
        // through a single or-pattern keeps the match exhaustive, so adding a
        // new variant forces a compile error here until it is handled.
        NodeExecutionData::Start { next }
        | NodeExecutionData::Llm { next, .. }
        | NodeExecutionData::Tool { next, .. }
        | NodeExecutionData::Condition { next, .. }
        | NodeExecutionData::Debounce { next, .. }
        | NodeExecutionData::Throttle { next, .. }
        | NodeExecutionData::RetryCompensate { next, .. }
        | NodeExecutionData::HumanInTheLoop { next, .. }
        | NodeExecutionData::CacheRead { next, .. }
        | NodeExecutionData::CacheWrite { next, .. }
        | NodeExecutionData::EventTrigger { next, .. }
        | NodeExecutionData::Router { next, .. }
        | NodeExecutionData::Transform { next, .. }
        | NodeExecutionData::Loop { next, .. }
        | NodeExecutionData::Subgraph { next, .. }
        | NodeExecutionData::Batch { next, .. }
        | NodeExecutionData::Filter { next, .. }
        | NodeExecutionData::Parallel { next, .. }
        | NodeExecutionData::Merge { next, .. }
        | NodeExecutionData::Map { next, .. }
        | NodeExecutionData::Reduce { next, .. } => Some(next.clone()),
        NodeExecutionData::End => None,
    }
}
1857
1858fn resolve_path<'a>(scope: &'a Value, path: &str) -> Option<&'a Value> {
1859    let mut current = scope;
1860    for segment in path.split('.') {
1861        if segment.is_empty() {
1862            continue;
1863        }
1864        current = current.get(segment)?;
1865    }
1866    Some(current)
1867}
1868
1869fn resolve_string_path(scope: &Value, path: &str) -> Option<String> {
1870    resolve_path(scope, path)
1871        .and_then(Value::as_str)
1872        .map(str::to_string)
1873}
1874
1875fn evaluate_human_decision(value: &Value) -> Option<bool> {
1876    match value {
1877        Value::Bool(flag) => Some(*flag),
1878        Value::String(text) => {
1879            let normalized = text.trim().to_ascii_lowercase();
1880            match normalized.as_str() {
1881                "approve" | "approved" | "yes" | "true" => Some(true),
1882                "reject" | "rejected" | "no" | "false" => Some(false),
1883                _ => None,
1884            }
1885        }
1886        _ => None,
1887    }
1888}
1889
1890fn evaluate_transform_expression(expression: &str, scope: &Value) -> Result<Value, String> {
1891    let trimmed = expression.trim();
1892    if trimmed.is_empty() {
1893        return Err("expression is empty".to_string());
1894    }
1895
1896    if let Ok(value) = serde_json::from_str::<Value>(trimmed) {
1897        return Ok(value);
1898    }
1899
1900    let path = trimmed.strip_prefix("$.").unwrap_or(trimmed);
1901    resolve_path(scope, path)
1902        .cloned()
1903        .ok_or_else(|| format!("path '{path}' not found in scoped input"))
1904}
1905
1906fn map_item_scoped_input(base_scope: &Value, item: &Value, index: usize) -> Value {
1907    let mut object = match base_scope {
1908        Value::Object(map) => map.clone(),
1909        _ => Map::new(),
1910    };
1911    object.insert("map_item".to_string(), item.clone());
1912    object.insert("map_index".to_string(), Value::from(index as u64));
1913    Value::Object(object)
1914}
1915
1916fn enforce_expression_scope_budget(
1917    node_id: &str,
1918    scoped_input: &Value,
1919    max_expression_scope_bytes: usize,
1920) -> Result<(), WorkflowRuntimeError> {
1921    let size = serde_json::to_vec(scoped_input)
1922        .map(|bytes| bytes.len())
1923        .unwrap_or(max_expression_scope_bytes.saturating_add(1));
1924    if size > max_expression_scope_bytes {
1925        return Err(WorkflowRuntimeError::ExpressionScopeLimitExceeded {
1926            node_id: node_id.to_string(),
1927            actual_bytes: size,
1928            limit_bytes: max_expression_scope_bytes,
1929        });
1930    }
1931    Ok(())
1932}
1933
1934fn reduce_value(
1935    node_id: &str,
1936    source: &str,
1937    operation: &ReduceOperation,
1938    source_value: Value,
1939) -> Result<Value, WorkflowRuntimeError> {
1940    let items =
1941        source_value
1942            .as_array()
1943            .ok_or_else(|| WorkflowRuntimeError::InvalidReduceInput {
1944                node_id: node_id.to_string(),
1945                source_node: source.to_string(),
1946                reason: "expected source output to be an array".to_string(),
1947            })?;
1948
1949    match operation {
1950        ReduceOperation::Count => Ok(Value::from(items.len() as u64)),
1951        ReduceOperation::Sum => {
1952            let mut sum = 0.0f64;
1953            for value in items {
1954                let number =
1955                    value
1956                        .as_f64()
1957                        .ok_or_else(|| WorkflowRuntimeError::InvalidReduceInput {
1958                            node_id: node_id.to_string(),
1959                            source_node: source.to_string(),
1960                            reason: "sum operation requires numeric array values".to_string(),
1961                        })?;
1962                sum += number;
1963            }
1964            let number = serde_json::Number::from_f64(sum).ok_or_else(|| {
1965                WorkflowRuntimeError::InvalidReduceInput {
1966                    node_id: node_id.to_string(),
1967                    source_node: source.to_string(),
1968                    reason: "sum produced non-finite value".to_string(),
1969                }
1970            })?;
1971            Ok(Value::Number(number))
1972        }
1973    }
1974}
1975
1976#[cfg(test)]
1977mod tests {
1978    use std::sync::atomic::{AtomicUsize, Ordering};
1979    use std::sync::{Arc, Mutex};
1980    use std::time::Duration;
1981
1982    use async_trait::async_trait;
1983    use serde_json::json;
1984    use tokio::time::sleep;
1985
1986    use super::*;
1987    use crate::ir::{MergePolicy, Node, NodeKind, ReduceOperation, WorkflowDefinition};
1988
    /// LLM stub that ignores its input and always succeeds with a fixed string.
    struct MockLlmExecutor {
        output: String,
    }

    #[async_trait]
    impl LlmExecutor for MockLlmExecutor {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            // Clone the canned output so the executor can be invoked repeatedly.
            Ok(LlmExecutionOutput {
                content: self.output.clone(),
            })
        }
    }
2004
    /// Tool stub that either returns a fixed value or always fails, depending
    /// on the `fail` flag.
    struct MockToolExecutor {
        output: Value,
        fail: bool,
    }

    #[async_trait]
    impl ToolExecutor for MockToolExecutor {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            if self.fail {
                // The message embeds the tool name so tests can assert on it.
                return Err(ToolExecutionError::Failed(format!(
                    "tool '{}' failed intentionally",
                    input.tool
                )));
            }
            Ok(self.output.clone())
        }
    }
2025
    /// LLM stub that replays a queued sequence of responses in order and
    /// counts how many times it was invoked.
    struct SequencedLlmExecutor {
        responses: Mutex<Vec<Result<LlmExecutionOutput, LlmExecutionError>>>,
        calls: AtomicUsize,
    }

    #[async_trait]
    impl LlmExecutor for SequencedLlmExecutor {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            self.calls.fetch_add(1, Ordering::Relaxed);
            // Pops from the front; panics if invoked more often than responses
            // were queued, which signals a broken test expectation.
            self.responses
                .lock()
                .expect("sequenced llm lock poisoned")
                .remove(0)
        }
    }
2044
    /// Tool stub that sleeps for a configurable delay before succeeding;
    /// used to exercise timeout handling.
    struct SlowToolExecutor {
        delay: Duration,
    }

    #[async_trait]
    impl ToolExecutor for SlowToolExecutor {
        async fn execute_tool(
            &self,
            _input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            sleep(self.delay).await;
            Ok(json!({"status": "slow-ok"}))
        }
    }
2059
    /// Tool stub returning 1, 2, 3, ... across successive calls.
    struct IncrementingToolExecutor {
        value: AtomicUsize,
    }

    #[async_trait]
    impl ToolExecutor for IncrementingToolExecutor {
        async fn execute_tool(
            &self,
            _input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            // fetch_add returns the previous value, so add one for the new count.
            let next = self.value.fetch_add(1, Ordering::Relaxed) + 1;
            Ok(json!(next))
        }
    }
2074
    /// Tool stub that echoes the request's input value back as the output.
    struct EchoInputToolExecutor;

    #[async_trait]
    impl ToolExecutor for EchoInputToolExecutor {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            Ok(input.input)
        }
    }
2086
    /// Tool stub for retry/compensation tests: `unstable_primary` fails once
    /// then succeeds, `always_fail` never succeeds, and `compensate` reports a
    /// successful rollback.
    struct RetryCompensateToolExecutor {
        attempts: AtomicUsize,
    }

    #[async_trait]
    impl ToolExecutor for RetryCompensateToolExecutor {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            match input.tool.as_str() {
                "unstable_primary" => {
                    // First attempt fails; subsequent attempts succeed and
                    // report the attempt number.
                    let current = self.attempts.fetch_add(1, Ordering::Relaxed) + 1;
                    if current <= 1 {
                        Err(ToolExecutionError::Failed("primary failed".to_string()))
                    } else {
                        Ok(json!({"primary_attempt": current}))
                    }
                }
                "always_fail" => Err(ToolExecutionError::Failed("always fail".to_string())),
                "compensate" => Ok(json!({"compensated": true})),
                _ => Err(ToolExecutionError::NotFound {
                    tool: input.tool.clone(),
                }),
            }
        }
    }
2114
    /// LLM stub that raises a shared cancellation flag and then fails,
    /// simulating cancellation arriving while a node is executing.
    struct CancellingLlmExecutor {
        cancel_flag: Arc<AtomicBool>,
        calls: AtomicUsize,
    }

    #[async_trait]
    impl LlmExecutor for CancellingLlmExecutor {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            self.calls.fetch_add(1, Ordering::Relaxed);
            // Set the flag before failing; the runtime's retry path is
            // expected to observe it when handling the transient error.
            self.cancel_flag.store(true, Ordering::Relaxed);
            Err(LlmExecutionError::Client("transient failure".to_string()))
        }
    }
2131
    /// start -> llm -> tool -> end: the smallest workflow exercising both
    /// executor kinds.
    fn linear_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "linear".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "llm".to_string(),
                    },
                },
                Node {
                    id: "llm".to_string(),
                    kind: NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "Summarize".to_string(),
                        next: Some("tool".to_string()),
                    },
                },
                Node {
                    id: "tool".to_string(),
                    kind: NodeKind::Tool {
                        tool: "extract".to_string(),
                        input: json!({"k": "v"}),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2166
    /// start -> llm -> end: no tool nodes, so no tool executor is required.
    fn llm_only_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "llm-only".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "llm".to_string(),
                    },
                },
                Node {
                    id: "llm".to_string(),
                    kind: NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "Summarize".to_string(),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2193
    /// Loop that keeps running the `counter` tool while the configured
    /// condition `last_tool_output != 3` holds, optionally bounded by
    /// `max_iterations`.
    fn loop_workflow(max_iterations: Option<u32>) -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "loop-workflow".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "loop".to_string(),
                    },
                },
                Node {
                    id: "loop".to_string(),
                    kind: NodeKind::Loop {
                        condition: "last_tool_output != 3".to_string(),
                        body: "counter".to_string(),
                        next: "end".to_string(),
                        max_iterations,
                    },
                },
                Node {
                    // Loop body re-enters the loop node after each tool call.
                    id: "counter".to_string(),
                    kind: NodeKind::Tool {
                        tool: "counter".to_string(),
                        input: json!({}),
                        next: Some("loop".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2229
    /// Fans out to two tool branches and merges them with the given policy
    /// and optional quorum.
    fn parallel_merge_workflow(policy: MergePolicy, quorum: Option<usize>) -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "parallel-merge".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "parallel".to_string(),
                    },
                },
                Node {
                    id: "parallel".to_string(),
                    kind: NodeKind::Parallel {
                        branches: vec!["tool_a".to_string(), "tool_b".to_string()],
                        next: "merge".to_string(),
                        max_in_flight: Some(2),
                    },
                },
                Node {
                    id: "tool_a".to_string(),
                    kind: NodeKind::Tool {
                        tool: "extract".to_string(),
                        input: json!({"value": 1}),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "tool_b".to_string(),
                    kind: NodeKind::Tool {
                        tool: "extract".to_string(),
                        input: json!({"value": 2}),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "merge".to_string(),
                    kind: NodeKind::Merge {
                        sources: vec!["tool_a".to_string(), "tool_b".to_string()],
                        policy,
                        quorum,
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2281
    /// Maps the `counter` tool over `input.values`, then reduces the mapped
    /// outputs with the given operation.
    fn map_reduce_workflow(operation: ReduceOperation) -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "map-reduce".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "map".to_string(),
                    },
                },
                Node {
                    id: "map".to_string(),
                    kind: NodeKind::Map {
                        tool: "counter".to_string(),
                        items_path: "input.values".to_string(),
                        next: "reduce".to_string(),
                        max_in_flight: Some(3),
                    },
                },
                Node {
                    id: "reduce".to_string(),
                    kind: NodeKind::Reduce {
                        source: "map".to_string(),
                        operation,
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2317
    /// Self-looping debounce and throttle nodes keyed on `input.key`; the
    /// suppression/throttle fallback edges are the only way to reach the end
    /// node.
    fn debounce_and_throttle_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "debounce-throttle".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "debounce_a".to_string(),
                    },
                },
                Node {
                    // `next` points back at itself; exits via `on_suppressed`.
                    id: "debounce_a".to_string(),
                    kind: NodeKind::Debounce {
                        key_path: "input.key".to_string(),
                        window_steps: 3,
                        next: "debounce_a".to_string(),
                        on_suppressed: Some("throttle_a".to_string()),
                    },
                },
                Node {
                    // Likewise self-looping; exits via `on_throttled`.
                    id: "throttle_a".to_string(),
                    kind: NodeKind::Throttle {
                        key_path: "input.key".to_string(),
                        window_steps: 3,
                        next: "throttle_a".to_string(),
                        on_throttled: Some("end_throttled".to_string()),
                    },
                },
                Node {
                    id: "end_throttled".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2354
    /// Chains event trigger, cache write/read, router, human-in-the-loop, and
    /// transform nodes; each fallback path (mismatch, cache miss, rejection)
    /// terminates at its own end node.
    fn extended_nodes_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "extended-nodes".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "event".to_string(),
                    },
                },
                Node {
                    id: "event".to_string(),
                    kind: NodeKind::EventTrigger {
                        event: "webhook".to_string(),
                        event_path: "input.event_type".to_string(),
                        next: "cache_write".to_string(),
                        on_mismatch: Some("end_mismatch".to_string()),
                    },
                },
                Node {
                    id: "cache_write".to_string(),
                    kind: NodeKind::CacheWrite {
                        key_path: "input.cache_key".to_string(),
                        value_path: "input.payload".to_string(),
                        next: "cache_read".to_string(),
                    },
                },
                Node {
                    id: "cache_read".to_string(),
                    kind: NodeKind::CacheRead {
                        key_path: "input.cache_key".to_string(),
                        next: "router".to_string(),
                        on_miss: Some("end_miss".to_string()),
                    },
                },
                Node {
                    // Routes to the human step only in manual mode; otherwise
                    // falls through to the transform default.
                    id: "router".to_string(),
                    kind: NodeKind::Router {
                        routes: vec![crate::ir::RouterRoute {
                            when: "input.mode == 'manual'".to_string(),
                            next: "human".to_string(),
                        }],
                        default: "transform".to_string(),
                    },
                },
                Node {
                    id: "human".to_string(),
                    kind: NodeKind::HumanInTheLoop {
                        decision_path: "input.approval".to_string(),
                        response_path: Some("input.review_notes".to_string()),
                        on_approve: "transform".to_string(),
                        on_reject: "end_rejected".to_string(),
                    },
                },
                Node {
                    id: "transform".to_string(),
                    kind: NodeKind::Transform {
                        expression: "node_outputs.cache_read.value".to_string(),
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_mismatch".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_miss".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_rejected".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2436
    /// RetryCompensate node wired to the given primary tool with one retry
    /// and a `compensate` rollback tool; compensation exits via its own end
    /// node.
    fn retry_compensate_workflow(primary_tool: &str) -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "retry-compensate".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "retry_comp".to_string(),
                    },
                },
                Node {
                    id: "retry_comp".to_string(),
                    kind: NodeKind::RetryCompensate {
                        tool: primary_tool.to_string(),
                        input: json!({"job": "run"}),
                        max_retries: 1,
                        compensate_tool: "compensate".to_string(),
                        compensate_input: json!({"job": "rollback"}),
                        next: "end".to_string(),
                        on_compensated: Some("end_compensated".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_compensated".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2471
    /// End-to-end happy path through the linear workflow; checks terminal
    /// node, per-node outputs, and event/trace bookkeeping.
    #[tokio::test]
    async fn executes_happy_path_linear_flow() {
        let llm = MockLlmExecutor {
            output: "ok".to_string(),
        };
        let tools = MockToolExecutor {
            output: json!({"status": "done"}),
            fail: false,
        };
        let runtime = WorkflowRuntime::new(
            linear_workflow(),
            &llm,
            Some(&tools),
            WorkflowRuntimeOptions::default(),
        );

        let result = runtime
            .execute(json!({"request_id": "r1"}), None)
            .await
            .expect("linear workflow should succeed");

        assert_eq!(result.workflow_name, "linear");
        assert_eq!(result.terminal_node_id, "end");
        assert_eq!(result.node_executions.len(), 4);
        assert_eq!(
            result.node_outputs.get("llm"),
            Some(&Value::String("ok".to_string()))
        );
        assert_eq!(
            result.node_outputs.get("tool"),
            Some(&json!({"status": "done"}))
        );
        // 8 events over 4 executed nodes — presumably two per node; confirm
        // against the engine's event emission points if this changes.
        assert_eq!(result.events.len(), 8);
        assert!(result.retry_events.is_empty());
        assert!(result.trace.is_some());
        assert_eq!(result.replay_report, None);
    }
2509
    /// Verifies the structured `NodeExecutionData` payloads recorded for the
    /// start, llm, and tool steps of the linear workflow.
    #[tokio::test]
    async fn linear_flow_records_expected_node_execution_payloads() {
        let llm = MockLlmExecutor {
            output: "ok".to_string(),
        };
        let tools = MockToolExecutor {
            output: json!({"status": "done"}),
            fail: false,
        };
        let runtime = WorkflowRuntime::new(
            linear_workflow(),
            &llm,
            Some(&tools),
            WorkflowRuntimeOptions::default(),
        );

        let result = runtime
            .execute(json!({"request_id": "r1"}), None)
            .await
            .expect("linear workflow should succeed");

        assert!(matches!(
            result.node_executions[0].data,
            NodeExecutionData::Start { ref next } if next == "llm"
        ));
        assert!(matches!(
            result.node_executions[1].data,
            NodeExecutionData::Llm {
                ref model,
                ref output,
                ref next
            } if model == "gpt-4" && output == "ok" && next == "tool"
        ));
        assert!(matches!(
            result.node_executions[2].data,
            NodeExecutionData::Tool {
                ref tool,
                ref next,
                ..
            } if tool == "extract" && next == "end"
        ));
    }
2552
    /// A truthy `input.approved` routes the condition node to its `on_true`
    /// target.
    #[tokio::test]
    async fn executes_conditional_branching() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "conditional".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "condition".to_string(),
                    },
                },
                Node {
                    id: "condition".to_string(),
                    kind: NodeKind::Condition {
                        expression: "input.approved".to_string(),
                        on_true: "end_true".to_string(),
                        on_false: "end_false".to_string(),
                    },
                },
                Node {
                    id: "end_true".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_false".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());

        let result = runtime
            .execute(json!({"approved": true}), None)
            .await
            .expect("conditional workflow should succeed");

        assert_eq!(result.terminal_node_id, "end_true");
    }
2596
    /// A falsy `input.approved` routes the condition node to its `on_false`
    /// target.
    #[tokio::test]
    async fn executes_conditional_false_branch() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "conditional-false".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "condition".to_string(),
                    },
                },
                Node {
                    id: "condition".to_string(),
                    kind: NodeKind::Condition {
                        expression: "input.approved".to_string(),
                        on_true: "end_true".to_string(),
                        on_false: "end_false".to_string(),
                    },
                },
                Node {
                    id: "end_true".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_false".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());

        let result = runtime
            .execute(json!({"approved": false}), None)
            .await
            .expect("conditional workflow should take false branch");

        assert_eq!(result.terminal_node_id, "end_false");
    }
2639
2640    #[tokio::test]
2641    async fn fails_when_tool_executor_is_missing() {
2642        let llm = MockLlmExecutor {
2643            output: "ok".to_string(),
2644        };
2645        let runtime = WorkflowRuntime::new(
2646            linear_workflow(),
2647            &llm,
2648            None,
2649            WorkflowRuntimeOptions::default(),
2650        );
2651
2652        let error = runtime
2653            .execute(json!({}), None)
2654            .await
2655            .expect_err("workflow should fail without tool executor");
2656
2657        assert!(matches!(
2658            error,
2659            WorkflowRuntimeError::MissingToolExecutor { node_id } if node_id == "tool"
2660        ));
2661    }
2662
2663    #[tokio::test]
2664    async fn fails_on_tool_execution_error() {
2665        let llm = MockLlmExecutor {
2666            output: "ok".to_string(),
2667        };
2668        let tools = MockToolExecutor {
2669            output: json!({"status": "unused"}),
2670            fail: true,
2671        };
2672        let runtime = WorkflowRuntime::new(
2673            linear_workflow(),
2674            &llm,
2675            Some(&tools),
2676            WorkflowRuntimeOptions::default(),
2677        );
2678
2679        let error = runtime
2680            .execute(json!({}), None)
2681            .await
2682            .expect_err("workflow should fail on tool error");
2683
2684        assert!(matches!(
2685            error,
2686            WorkflowRuntimeError::ToolRetryExhausted { node_id, attempts: 1, .. }
2687                if node_id == "tool"
2688        ));
2689    }
2690
    /// An LLM node with `max_retries: 1` should retry once after a transient
    /// client error, record a single retry event, and finish with the
    /// recovered output.
    #[tokio::test]
    async fn retries_llm_after_transient_failure() {
        // Scripted executor: first call errors, second call succeeds.
        let llm = SequencedLlmExecutor {
            responses: Mutex::new(vec![
                Err(LlmExecutionError::Client("temporary".to_string())),
                Ok(LlmExecutionOutput {
                    content: "recovered".to_string(),
                }),
            ]),
            calls: AtomicUsize::new(0),
        };
        let runtime = WorkflowRuntime::new(
            llm_only_workflow(),
            &llm,
            None,
            WorkflowRuntimeOptions {
                llm_node_policy: NodeExecutionPolicy {
                    timeout: None,
                    max_retries: 1,
                },
                ..WorkflowRuntimeOptions::default()
            },
        );

        let result = runtime
            .execute(json!({"request_id": "r2"}), None)
            .await
            .expect("llm retry should recover");

        assert_eq!(result.terminal_node_id, "end");
        assert_eq!(result.node_outputs.get("llm"), Some(&json!("recovered")));
        // Exactly one retry event was recorded and the executor saw two calls.
        assert_eq!(result.retry_events.len(), 1);
        assert_eq!(result.retry_events[0].operation, "llm");
        assert_eq!(llm.calls.load(Ordering::Relaxed), 2);
    }
2726
    /// A tool that sleeps longer than the policy timeout must produce a
    /// `ToolTimeout` error carrying the configured timeout and attempt count.
    #[tokio::test]
    async fn times_out_tool_execution_per_policy() {
        let llm = MockLlmExecutor {
            output: "ok".to_string(),
        };
        // Tool takes 50ms but the policy below allows only 5ms per attempt.
        let tool = SlowToolExecutor {
            delay: Duration::from_millis(50),
        };
        let runtime = WorkflowRuntime::new(
            linear_workflow(),
            &llm,
            Some(&tool),
            WorkflowRuntimeOptions {
                tool_node_policy: NodeExecutionPolicy {
                    timeout: Some(Duration::from_millis(5)),
                    max_retries: 0,
                },
                ..WorkflowRuntimeOptions::default()
            },
        );

        let error = runtime
            .execute(json!({}), None)
            .await
            .expect_err("tool execution should time out");

        assert!(matches!(
            error,
            WorkflowRuntimeError::ToolTimeout {
                node_id,
                timeout_ms: 5,
                attempts: 1,
            } if node_id == "tool"
        ));
    }
2762
    /// Cancellation observed between retry attempts must stop the run with
    /// `Cancelled` after a single executor call, even though the retry policy
    /// would allow three more attempts.
    #[tokio::test]
    async fn cancels_between_retry_attempts() {
        // Shared flag: the executor raises it so the runtime sees cancellation
        // before scheduling the next retry attempt.
        let cancel_flag = Arc::new(AtomicBool::new(false));
        let llm = CancellingLlmExecutor {
            cancel_flag: Arc::clone(&cancel_flag),
            calls: AtomicUsize::new(0),
        };
        let runtime = WorkflowRuntime::new(
            llm_only_workflow(),
            &llm,
            None,
            WorkflowRuntimeOptions {
                llm_node_policy: NodeExecutionPolicy {
                    timeout: None,
                    max_retries: 3,
                },
                ..WorkflowRuntimeOptions::default()
            },
        );

        let error = runtime
            .execute(json!({}), Some(cancel_flag.as_ref()))
            .await
            .expect_err("workflow should stop when cancellation is observed");

        assert!(matches!(error, WorkflowRuntimeError::Cancelled));
        // Only the first attempt ran; no retries happened after cancellation.
        assert_eq!(llm.calls.load(Ordering::Relaxed), 1);
    }
2791
    /// A workflow whose condition node loops back onto itself forever must be
    /// aborted by the `max_steps` guard with `StepLimitExceeded`.
    #[tokio::test]
    async fn enforces_step_limit_guard() {
        // `condition` always evaluates "true" and routes back to itself,
        // forming an infinite cycle; "end" is unreachable.
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "loop".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "condition".to_string(),
                    },
                },
                Node {
                    id: "condition".to_string(),
                    kind: NodeKind::Condition {
                        expression: "true".to_string(),
                        on_true: "condition".to_string(),
                        on_false: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        let runtime = WorkflowRuntime::new(
            workflow,
            &llm,
            None,
            WorkflowRuntimeOptions {
                // Small budget so the guard trips quickly.
                max_steps: 3,
                ..WorkflowRuntimeOptions::default()
            },
        );

        let error = runtime
            .execute(json!({}), None)
            .await
            .expect_err("workflow should fail on step limit");

        assert!(matches!(
            error,
            WorkflowRuntimeError::StepLimitExceeded { max_steps: 3 }
        ));
    }
2842
    /// With `ValidateRecordedTrace` enabled, a successful run should expose a
    /// recorded trace and a replay report covering all recorded events.
    #[tokio::test]
    async fn validates_recorded_trace_in_replay_mode() {
        let llm = MockLlmExecutor {
            output: "ok".to_string(),
        };
        let tools = MockToolExecutor {
            output: json!({"status": "done"}),
            fail: false,
        };
        let runtime = WorkflowRuntime::new(
            linear_workflow(),
            &llm,
            Some(&tools),
            WorkflowRuntimeOptions {
                replay_mode: WorkflowReplayMode::ValidateRecordedTrace,
                ..WorkflowRuntimeOptions::default()
            },
        );

        let result = runtime
            .execute(json!({"request_id": "r1"}), None)
            .await
            .expect("replay validation should pass");

        assert!(result.trace.is_some());
        // The linear fixture produces a fixed number of trace events (9);
        // replay validation must account for every one of them.
        assert_eq!(
            result.replay_report.as_ref().map(|r| r.total_events),
            Some(9)
        );
    }
2873
    /// A loop workflow driven by an incrementing tool should iterate until its
    /// condition evaluates false, then exit through "end" while recording a
    /// loop step whose condition evaluated to `false`.
    #[tokio::test]
    async fn executes_loop_until_condition_fails() {
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        // Each tool invocation advances the counter, so the loop condition
        // eventually stops holding.
        let counter = IncrementingToolExecutor {
            value: AtomicUsize::new(0),
        };
        let runtime = WorkflowRuntime::new(
            loop_workflow(Some(8)),
            &llm,
            Some(&counter),
            WorkflowRuntimeOptions::default(),
        );

        let result = runtime
            .execute(json!({}), None)
            .await
            .expect("loop workflow should terminate at end");

        assert_eq!(result.terminal_node_id, "end");
        // At least one recorded loop step must show the exit evaluation.
        assert!(result.node_executions.iter().any(|step| matches!(
            step.data,
            NodeExecutionData::Loop {
                evaluated: false,
                ..
            }
        )));
    }
2903
    /// A loop whose condition never becomes false (the tool always returns 0)
    /// must abort with `LoopIterationLimitExceeded` once `max_iterations` is
    /// reached — before the global step limit would trigger.
    #[tokio::test]
    async fn fails_when_loop_exceeds_max_iterations() {
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        // Constant output keeps the loop condition from ever progressing.
        let counter = MockToolExecutor {
            output: json!(0),
            fail: false,
        };
        let runtime = WorkflowRuntime::new(
            loop_workflow(Some(2)),
            &llm,
            Some(&counter),
            WorkflowRuntimeOptions {
                // Generous step budget so the loop guard fires first.
                max_steps: 20,
                ..WorkflowRuntimeOptions::default()
            },
        );

        let error = runtime
            .execute(json!({}), None)
            .await
            .expect_err("loop should fail once max iterations are exceeded");

        assert!(matches!(
            error,
            WorkflowRuntimeError::LoopIterationLimitExceeded {
                node_id,
                max_iterations: 2
            } if node_id == "loop"
        ));
    }
2936
2937    #[tokio::test]
2938    async fn executes_parallel_then_merge_all() {
2939        let llm = MockLlmExecutor {
2940            output: "unused".to_string(),
2941        };
2942        let tool = EchoInputToolExecutor;
2943        let runtime = WorkflowRuntime::new(
2944            parallel_merge_workflow(MergePolicy::All, None),
2945            &llm,
2946            Some(&tool),
2947            WorkflowRuntimeOptions::default(),
2948        );
2949
2950        let result = runtime
2951            .execute(json!({}), None)
2952            .await
2953            .expect("parallel merge workflow should succeed");
2954
2955        assert_eq!(result.terminal_node_id, "end");
2956        assert_eq!(
2957            result.node_outputs.get("tool_a"),
2958            Some(&json!({"value": 1}))
2959        );
2960        assert_eq!(
2961            result.node_outputs.get("tool_b"),
2962            Some(&json!({"value": 2}))
2963        );
2964        assert_eq!(
2965            result.node_outputs.get("merge"),
2966            Some(&json!([{"value": 1}, {"value": 2}]))
2967        );
2968    }
2969
2970    #[tokio::test]
2971    async fn executes_map_reduce_sum() {
2972        let llm = MockLlmExecutor {
2973            output: "unused".to_string(),
2974        };
2975        let tool = EchoInputToolExecutor;
2976        let runtime = WorkflowRuntime::new(
2977            map_reduce_workflow(ReduceOperation::Sum),
2978            &llm,
2979            Some(&tool),
2980            WorkflowRuntimeOptions::default(),
2981        );
2982
2983        let result = runtime
2984            .execute(json!({"values": [1, 2, 3]}), None)
2985            .await
2986            .expect("map reduce workflow should succeed");
2987
2988        assert_eq!(result.node_outputs.get("map"), Some(&json!([1, 2, 3])));
2989        assert_eq!(result.node_outputs.get("reduce"), Some(&json!(6.0)));
2990    }
2991
2992    #[tokio::test]
2993    async fn fails_map_when_items_path_is_not_array() {
2994        let llm = MockLlmExecutor {
2995            output: "unused".to_string(),
2996        };
2997        let tool = EchoInputToolExecutor;
2998        let runtime = WorkflowRuntime::new(
2999            map_reduce_workflow(ReduceOperation::Count),
3000            &llm,
3001            Some(&tool),
3002            WorkflowRuntimeOptions::default(),
3003        );
3004
3005        let error = runtime
3006            .execute(json!({"values": {"not": "array"}}), None)
3007            .await
3008            .expect_err("map node should fail on non-array path");
3009
3010        assert!(matches!(
3011            error,
3012            WorkflowRuntimeError::MapItemsNotArray {
3013                node_id,
3014                items_path
3015            } if node_id == "map" && items_path == "input.values"
3016        ));
3017    }
3018
    /// A `subgraph` node should resolve its named graph from the runtime's
    /// registry and execute it inline, recording a `Subgraph` execution step.
    #[tokio::test]
    async fn executes_subgraph_via_registry() {
        let llm = MockLlmExecutor {
            output: "nested-ok".to_string(),
        };
        // Child graph: start -> llm -> end.
        let subgraph = WorkflowDefinition {
            version: "v0".to_string(),
            name: "child".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "llm".to_string(),
                    },
                },
                Node {
                    id: "llm".to_string(),
                    kind: NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "child".to_string(),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };
        // Parent graph: start -> sub (references "child_graph") -> end.
        let parent = WorkflowDefinition {
            version: "v0".to_string(),
            name: "parent".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "sub".to_string(),
                    },
                },
                Node {
                    id: "sub".to_string(),
                    kind: NodeKind::Subgraph {
                        graph: "child_graph".to_string(),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        // Register the child under the name the parent's subgraph node uses.
        let mut registry = BTreeMap::new();
        registry.insert("child_graph".to_string(), subgraph);
        let runtime = WorkflowRuntime::new(
            parent,
            &llm,
            None,
            WorkflowRuntimeOptions {
                subgraph_registry: registry,
                ..WorkflowRuntimeOptions::default()
            },
        );

        let result = runtime
            .execute(json!({"approved": true}), None)
            .await
            .expect("subgraph workflow should execute");

        assert_eq!(result.terminal_node_id, "end");
        // Execution step index 1 (after "start") is the subgraph node.
        assert!(matches!(
            result.node_executions[1].data,
            NodeExecutionData::Subgraph { .. }
        ));
    }
3095
    /// A batch node should pass all input items through, and a downstream
    /// filter node should keep only the items whose `keep` flag is true.
    #[tokio::test]
    async fn executes_batch_and_filter_nodes() {
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        // Topology: start -> batch (reads input.items)
        //           -> filter (reads batch output, keeps item.keep == true)
        //           -> end.
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "batch-filter".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "batch".to_string(),
                    },
                },
                Node {
                    id: "batch".to_string(),
                    kind: NodeKind::Batch {
                        items_path: "input.items".to_string(),
                        next: "filter".to_string(),
                    },
                },
                Node {
                    id: "filter".to_string(),
                    kind: NodeKind::Filter {
                        items_path: "node_outputs.batch".to_string(),
                        expression: "item.keep == true".to_string(),
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());
        let result = runtime
            .execute(
                json!({
                    "items": [
                        {"id": 1, "keep": true},
                        {"id": 2, "keep": false},
                        {"id": 3, "keep": true}
                    ]
                }),
                None,
            )
            .await
            .expect("batch/filter workflow should execute");

        // Batch preserves all three items; filter drops the keep=false one.
        assert_eq!(
            result
                .node_outputs
                .get("batch")
                .and_then(Value::as_array)
                .map(Vec::len),
            Some(3)
        );
        assert_eq!(
            result
                .node_outputs
                .get("filter")
                .and_then(Value::as_array)
                .map(Vec::len),
            Some(2)
        );
    }
3165
3166    #[tokio::test]
3167    async fn executes_debounce_and_throttle_nodes() {
3168        let llm = MockLlmExecutor {
3169            output: "unused".to_string(),
3170        };
3171        let runtime = WorkflowRuntime::new(
3172            debounce_and_throttle_workflow(),
3173            &llm,
3174            None,
3175            WorkflowRuntimeOptions::default(),
3176        );
3177
3178        let result = runtime
3179            .execute(json!({"key": "k1"}), None)
3180            .await
3181            .expect("debounce/throttle workflow should execute");
3182
3183        assert_eq!(result.terminal_node_id, "end_throttled");
3184        assert_eq!(
3185            result.node_outputs.get("debounce_a"),
3186            Some(&json!({"key": "k1", "suppressed": true}))
3187        );
3188        assert_eq!(
3189            result.node_outputs.get("throttle_a"),
3190            Some(&json!({"key": "k1", "throttled": true}))
3191        );
3192    }
3193
3194    #[tokio::test]
3195    async fn executes_retry_compensate_successfully_without_compensation() {
3196        let llm = MockLlmExecutor {
3197            output: "unused".to_string(),
3198        };
3199        let tools = RetryCompensateToolExecutor {
3200            attempts: AtomicUsize::new(0),
3201        };
3202        let runtime = WorkflowRuntime::new(
3203            retry_compensate_workflow("unstable_primary"),
3204            &llm,
3205            Some(&tools),
3206            WorkflowRuntimeOptions::default(),
3207        );
3208
3209        let result = runtime
3210            .execute(json!({}), None)
3211            .await
3212            .expect("retry_compensate should recover by retry");
3213
3214        assert_eq!(result.terminal_node_id, "end");
3215        assert_eq!(tools.attempts.load(Ordering::Relaxed), 2);
3216        assert_eq!(result.retry_events.len(), 1);
3217    }
3218
3219    #[tokio::test]
3220    async fn executes_retry_compensate_with_compensation_route() {
3221        let llm = MockLlmExecutor {
3222            output: "unused".to_string(),
3223        };
3224        let tools = RetryCompensateToolExecutor {
3225            attempts: AtomicUsize::new(0),
3226        };
3227        let runtime = WorkflowRuntime::new(
3228            retry_compensate_workflow("always_fail"),
3229            &llm,
3230            Some(&tools),
3231            WorkflowRuntimeOptions::default(),
3232        );
3233
3234        let result = runtime
3235            .execute(json!({}), None)
3236            .await
3237            .expect("retry_compensate should use compensation");
3238
3239        assert_eq!(result.terminal_node_id, "end_compensated");
3240        assert_eq!(result.retry_events.len(), 1);
3241        assert_eq!(
3242            result
3243                .node_outputs
3244                .get("retry_comp")
3245                .and_then(|value| value.get("status")),
3246            Some(&json!("compensated"))
3247        );
3248    }
3249
    /// End-to-end pass through the extended fixture: event trigger matches,
    /// the cache read hits, the router selects the "human" branch, and the
    /// transform node extracts the payload.
    #[tokio::test]
    async fn executes_event_cache_router_human_transform_nodes() {
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        let runtime = WorkflowRuntime::new(
            extended_nodes_workflow(),
            &llm,
            None,
            WorkflowRuntimeOptions::default(),
        );

        // Input exercises each node type: matching event, cache key with a
        // payload, manual routing mode, and a human-approval decision.
        let result = runtime
            .execute(
                json!({
                    "event_type": "webhook",
                    "cache_key": "customer-1",
                    "payload": {"value": 99},
                    "mode": "manual",
                    "approval": "approve",
                    "review_notes": {"editor": "ops"}
                }),
                None,
            )
            .await
            .expect("extended nodes workflow should execute");

        assert_eq!(result.terminal_node_id, "end");
        // Cache read reports a hit with the written payload.
        assert_eq!(
            result.node_outputs.get("cache_read"),
            Some(&json!({"key": "customer-1", "hit": true, "value": {"value": 99}}))
        );
        // Router chose the "human" branch for manual mode.
        assert_eq!(
            result.node_outputs.get("router"),
            Some(&json!({"selected": "human"}))
        );
        // Transform surfaced the original payload.
        assert_eq!(
            result.node_outputs.get("transform"),
            Some(&json!({"value": 99}))
        );
    }
3291
3292    #[tokio::test]
3293    async fn routes_event_trigger_mismatch_to_fallback() {
3294        let llm = MockLlmExecutor {
3295            output: "unused".to_string(),
3296        };
3297        let runtime = WorkflowRuntime::new(
3298            extended_nodes_workflow(),
3299            &llm,
3300            None,
3301            WorkflowRuntimeOptions::default(),
3302        );
3303
3304        let result = runtime
3305            .execute(
3306                json!({
3307                    "event_type": "cron",
3308                    "cache_key": "customer-1",
3309                    "payload": {"value": 99},
3310                    "mode": "manual",
3311                    "approval": "approve"
3312                }),
3313                None,
3314            )
3315            .await
3316            .expect("event mismatch should route to fallback");
3317
3318        assert_eq!(result.terminal_node_id, "end_mismatch");
3319    }
3320
    /// A cache read for a key that was never written must report a miss
    /// (`hit: false`, `value: null`) and follow the `on_miss` route.
    #[tokio::test]
    async fn cache_read_routes_to_on_miss_when_value_absent() {
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        // Topology: start -> cache_read -> (end_hit | end_miss via on_miss).
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "cache-read-miss".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "cache_read".to_string(),
                    },
                },
                Node {
                    id: "cache_read".to_string(),
                    kind: NodeKind::CacheRead {
                        key_path: "input.cache_key".to_string(),
                        next: "end_hit".to_string(),
                        on_miss: Some("end_miss".to_string()),
                    },
                },
                Node {
                    id: "end_hit".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_miss".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };
        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());

        // "new-customer" was never written, so the read must miss.
        let result = runtime
            .execute(json!({"cache_key": "new-customer"}), None)
            .await
            .expect("cache read miss should still execute via on_miss route");

        assert_eq!(
            result.node_outputs.get("cache_read"),
            Some(&json!({"key": "new-customer", "hit": false, "value": null}))
        );
        assert_eq!(result.terminal_node_id, "end_miss");
    }
3367
    /// Writing a literal `null` into the cache and reading it back must count
    /// as a hit (`hit: true, value: null`), not a miss — the presence of the
    /// key matters, not the truthiness of the stored value.
    #[tokio::test]
    async fn cache_read_treats_cached_null_as_hit() {
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        // Topology: start -> cache_write (stores input.payload under the key)
        //           -> cache_read -> (end_hit | end_miss via on_miss).
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "cache-read-null-hit".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "cache_write".to_string(),
                    },
                },
                Node {
                    id: "cache_write".to_string(),
                    kind: NodeKind::CacheWrite {
                        key_path: "input.cache_key".to_string(),
                        value_path: "input.payload".to_string(),
                        next: "cache_read".to_string(),
                    },
                },
                Node {
                    id: "cache_read".to_string(),
                    kind: NodeKind::CacheRead {
                        key_path: "input.cache_key".to_string(),
                        next: "end_hit".to_string(),
                        on_miss: Some("end_miss".to_string()),
                    },
                },
                Node {
                    id: "end_hit".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_miss".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };
        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());

        // The payload written is explicitly null.
        let result = runtime
            .execute(json!({"cache_key": "customer-null", "payload": null}), None)
            .await
            .expect("cache read hit with null value should execute");

        assert_eq!(
            result.node_outputs.get("cache_read"),
            Some(&json!({"key": "customer-null", "hit": true, "value": null}))
        );
        assert_eq!(result.terminal_node_id, "end_hit");
    }
3422
    /// When the serialized expression scope exceeds the configured byte
    /// budget, evaluating a condition node must fail with
    /// `ExpressionScopeLimitExceeded` naming that node.
    #[tokio::test]
    async fn rejects_condition_when_expression_scope_exceeds_limit() {
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        // Simple start -> condition -> end workflow; both branches reach end.
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "scope-limit".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "condition".to_string(),
                    },
                },
                Node {
                    id: "condition".to_string(),
                    kind: NodeKind::Condition {
                        expression: "input.flag == true".to_string(),
                        on_true: "end".to_string(),
                        on_false: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let runtime = WorkflowRuntime::new(
            workflow,
            &llm,
            None,
            WorkflowRuntimeOptions {
                security_limits: RuntimeSecurityLimits {
                    // Deliberately tiny budget so the input below exceeds it.
                    max_expression_scope_bytes: 32,
                    ..RuntimeSecurityLimits::default()
                },
                ..WorkflowRuntimeOptions::default()
            },
        );

        // The extra payload pushes the scope past the 32-byte limit.
        let error = runtime
            .execute(
                json!({"flag": true, "payload": "this-is-too-large-for-limit"}),
                None,
            )
            .await
            .expect_err("condition should fail when scope budget is exceeded");
        assert!(matches!(
            error,
            WorkflowRuntimeError::ExpressionScopeLimitExceeded { node_id, .. }
                if node_id == "condition"
        ));
    }
3479
3480    #[tokio::test]
3481    async fn rejects_map_when_item_count_exceeds_limit() {
3482        let llm = MockLlmExecutor {
3483            output: "unused".to_string(),
3484        };
3485        let tool = EchoInputToolExecutor;
3486        let runtime = WorkflowRuntime::new(
3487            map_reduce_workflow(ReduceOperation::Count),
3488            &llm,
3489            Some(&tool),
3490            WorkflowRuntimeOptions {
3491                security_limits: RuntimeSecurityLimits {
3492                    max_map_items: 2,
3493                    ..RuntimeSecurityLimits::default()
3494                },
3495                ..WorkflowRuntimeOptions::default()
3496            },
3497        );
3498
3499        let error = runtime
3500            .execute(json!({"values": [1, 2, 3]}), None)
3501            .await
3502            .expect_err("map should fail when item guard is exceeded");
3503        assert!(matches!(
3504            error,
3505            WorkflowRuntimeError::MapItemLimitExceeded {
3506                node_id,
3507                actual_items: 3,
3508                max_items: 2,
3509            } if node_id == "map"
3510        ));
3511    }
3512
3513    #[tokio::test]
3514    async fn resumes_from_checkpoint() {
3515        let llm = MockLlmExecutor {
3516            output: "unused".to_string(),
3517        };
3518        let tool = MockToolExecutor {
3519            output: json!({"ok": true}),
3520            fail: false,
3521        };
3522        let runtime = WorkflowRuntime::new(
3523            linear_workflow(),
3524            &llm,
3525            Some(&tool),
3526            WorkflowRuntimeOptions::default(),
3527        );
3528
3529        let checkpoint = WorkflowCheckpoint {
3530            run_id: "run-1".to_string(),
3531            workflow_name: "linear".to_string(),
3532            step: 2,
3533            next_node_id: "tool".to_string(),
3534            scope_snapshot: json!({"input": {"request_id": "r-resume"}}),
3535        };
3536
3537        let resumed = runtime
3538            .execute_resume_from_failure(&checkpoint, None)
3539            .await
3540            .expect("resume should continue from checkpoint node");
3541        assert_eq!(resumed.terminal_node_id, "end");
3542        assert_eq!(resumed.node_executions[0].node_id, "tool");
3543    }
3544
3545    #[test]
3546    fn scope_capabilities_enforce_read_write_boundaries() {
3547        let mut scope = RuntimeScope::new(json!({"k": "v"}));
3548
3549        let read_error = scope
3550            .scoped_input(ScopeCapability::LlmWrite)
3551            .expect_err("write capability should not read scope");
3552        assert!(matches!(read_error, ScopeAccessError::ReadDenied { .. }));
3553
3554        let write_error = scope
3555            .record_tool_output("tool", json!({"ok": true}), ScopeCapability::LlmWrite)
3556            .expect_err("llm write capability should not write tool output");
3557        assert!(matches!(write_error, ScopeAccessError::WriteDenied { .. }));
3558    }
3559}