//! `simple_agents_workflow/runtime.rs` — deterministic workflow runtime:
//! configuration, node adapters, execution records, and error types.

1use std::collections::{BTreeMap, HashMap};
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use async_trait::async_trait;
6use serde_json::{json, Map, Value};
7use simple_agent_type::message::Message;
8use simple_agent_type::request::CompletionRequest;
9use simple_agents_core::{CompletionOptions, CompletionOutcome, SimpleAgentsClient};
10use thiserror::Error;
11use tokio::time::timeout;
12
13use crate::checkpoint::WorkflowCheckpoint;
14use crate::expressions;
15use crate::ir::{MergePolicy, Node, NodeKind, ReduceOperation, WorkflowDefinition};
16use crate::recorder::{TraceRecordError, TraceRecorder};
17use crate::replay::{replay_trace, ReplayError, ReplayReport};
18use crate::scheduler::DagScheduler;
19pub use crate::state::ScopeAccessError;
20use crate::trace::{TraceTerminalStatus, WorkflowTrace, WorkflowTraceMetadata};
21use crate::validation::{validate_and_normalize, ValidationErrors};
22
23mod engine;
24mod scope;
25use scope::{RuntimeScope, ScopeCapability};
26
/// Runtime configuration for workflow execution.
///
/// Defaults are provided via `Default` (validation and tracing enabled,
/// replay disabled); construct with struct-update syntax to override fields.
#[derive(Debug, Clone)]
pub struct WorkflowRuntimeOptions {
    /// Maximum number of node steps before aborting execution.
    pub max_steps: usize,
    /// Validate and normalize workflow before every run.
    pub validate_before_run: bool,
    /// Retry and timeout policy for LLM nodes.
    pub llm_node_policy: NodeExecutionPolicy,
    /// Retry and timeout policy for tool nodes.
    pub tool_node_policy: NodeExecutionPolicy,
    /// Enable deterministic trace recording for runtime events.
    pub enable_trace_recording: bool,
    /// Optional replay validation mode for recorded traces.
    /// Requires `enable_trace_recording` when set to validate.
    pub replay_mode: WorkflowReplayMode,
    /// Global scheduler bound for node-level concurrent fan-out.
    pub scheduler_max_in_flight: usize,
    /// Named subgraph registry used by `subgraph` nodes.
    pub subgraph_registry: BTreeMap<String, WorkflowDefinition>,
    /// Runtime guardrails for expression and fan-out resource usage.
    pub security_limits: RuntimeSecurityLimits,
}
49
/// Runtime-enforced security and resource limits.
///
/// Exceeding any limit fails the run with the corresponding
/// `WorkflowRuntimeError::*LimitExceeded` variant.
#[derive(Debug, Clone)]
pub struct RuntimeSecurityLimits {
    /// Maximum serialized bytes allowed for one expression evaluation scope.
    pub max_expression_scope_bytes: usize,
    /// Maximum array size accepted by `map` nodes.
    pub max_map_items: usize,
    /// Maximum branch count accepted by `parallel` nodes.
    pub max_parallel_branches: usize,
    /// Maximum array size accepted by `filter` nodes.
    pub max_filter_items: usize,
}
62
/// Runtime replay behavior for deterministic runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkflowReplayMode {
    /// Trace replay validation is disabled.
    Disabled,
    /// Validate the recorded trace via `replay_trace` before success return.
    /// Fails with `ReplayRequiresTraceRecording` if tracing is off.
    ValidateRecordedTrace,
}
71
/// Retry and timeout policy for runtime-owned node execution.
///
/// The default policy (`Default`) has no timeout and no retries, i.e. one
/// attempt with unbounded duration.
#[derive(Debug, Clone, Default)]
pub struct NodeExecutionPolicy {
    /// Per-attempt timeout. `None` disables timeout enforcement.
    pub timeout: Option<Duration>,
    /// Number of retries after the first failed attempt.
    /// Total attempts = `max_retries + 1`.
    pub max_retries: usize,
}
80
81impl Default for WorkflowRuntimeOptions {
82    fn default() -> Self {
83        Self {
84            max_steps: 256,
85            validate_before_run: true,
86            llm_node_policy: NodeExecutionPolicy::default(),
87            tool_node_policy: NodeExecutionPolicy::default(),
88            enable_trace_recording: true,
89            replay_mode: WorkflowReplayMode::Disabled,
90            scheduler_max_in_flight: 8,
91            subgraph_registry: BTreeMap::new(),
92            security_limits: RuntimeSecurityLimits::default(),
93        }
94    }
95}
96
97impl Default for RuntimeSecurityLimits {
98    fn default() -> Self {
99        Self {
100            max_expression_scope_bytes: 128 * 1024,
101            max_map_items: 4096,
102            max_parallel_branches: 128,
103            max_filter_items: 8192,
104        }
105    }
106}
107
/// Cooperative cancellation interface for workflow runs.
///
/// The runtime polls this between steps; implementors must be cheap and
/// non-blocking. Implemented for `AtomicBool` (live flag) and `bool`
/// (fixed decision) below.
pub trait CancellationSignal: Send + Sync {
    /// Returns true if execution should stop.
    fn is_cancelled(&self) -> bool;
}
113
114impl CancellationSignal for AtomicBool {
115    fn is_cancelled(&self) -> bool {
116        self.load(Ordering::Relaxed)
117    }
118}
119
// Fixed-decision signal: a plain `bool` never changes after the call site
// captures it, so `true` cancels immediately and `false` never cancels.
impl CancellationSignal for bool {
    fn is_cancelled(&self) -> bool {
        *self
    }
}
125
/// Input payload passed to the LLM executor adapter.
#[derive(Debug, Clone, PartialEq)]
pub struct LlmExecutionInput {
    /// Current workflow node id.
    pub node_id: String,
    /// Requested model.
    pub model: String,
    /// Prompt configured on the workflow node.
    pub prompt: String,
    /// Deterministic scoped context for this node execution.
    pub scoped_input: Value,
}
138
/// LLM output returned to the runtime.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LlmExecutionOutput {
    /// Assistant text returned by the model.
    pub content: String,
}
145
/// Typed LLM adapter errors.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum LlmExecutionError {
    /// CompletionRequest build/validation failure.
    #[error("invalid completion request: {0}")]
    InvalidRequest(String),
    /// Core client call failed.
    #[error("llm client execution failed: {0}")]
    Client(String),
    /// Unexpected completion mode returned by client
    /// (e.g. stream/healed_json/coerced_schema instead of a plain response).
    #[error("unexpected completion outcome: {0}")]
    UnexpectedOutcome(&'static str),
    /// Response did not include assistant content.
    #[error("llm response had no content")]
    EmptyResponse,
}
162
/// Async runtime adapter for LLM calls.
///
/// Hosts may provide their own implementation; a default one is supplied
/// for `SimpleAgentsClient` below.
#[async_trait]
pub trait LlmExecutor: Send + Sync {
    /// Executes one workflow LLM node.
    async fn execute(
        &self,
        input: LlmExecutionInput,
    ) -> Result<LlmExecutionOutput, LlmExecutionError>;
}
172
173#[async_trait]
174impl LlmExecutor for SimpleAgentsClient {
175    async fn execute(
176        &self,
177        input: LlmExecutionInput,
178    ) -> Result<LlmExecutionOutput, LlmExecutionError> {
179        let user_prompt = build_prompt_with_scope(&input.prompt, &input.scoped_input);
180        let request = CompletionRequest::builder()
181            .model(input.model)
182            .message(Message::user(user_prompt))
183            .build()
184            .map_err(|error| LlmExecutionError::InvalidRequest(error.to_string()))?;
185
186        let outcome = self
187            .complete(&request, CompletionOptions::default())
188            .await
189            .map_err(|error| LlmExecutionError::Client(error.to_string()))?;
190
191        let response = match outcome {
192            CompletionOutcome::Response(response) => response,
193            CompletionOutcome::Stream(_) => {
194                return Err(LlmExecutionError::UnexpectedOutcome("stream"));
195            }
196            CompletionOutcome::HealedJson(_) => {
197                return Err(LlmExecutionError::UnexpectedOutcome("healed_json"));
198            }
199            CompletionOutcome::CoercedSchema(_) => {
200                return Err(LlmExecutionError::UnexpectedOutcome("coerced_schema"));
201            }
202        };
203
204        let content = response
205            .content()
206            .ok_or(LlmExecutionError::EmptyResponse)?
207            .to_string();
208
209        Ok(LlmExecutionOutput { content })
210    }
211}
212
/// Input payload for host-provided tool execution.
#[derive(Debug, Clone, PartialEq)]
pub struct ToolExecutionInput {
    /// Current workflow node id.
    pub node_id: String,
    /// Tool name declared by the workflow.
    pub tool: String,
    /// Static node input payload.
    pub input: Value,
    /// Deterministic scoped context for this node execution.
    pub scoped_input: Value,
}
225
/// Typed tool execution errors.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ToolExecutionError {
    /// Tool implementation is unavailable in the host runtime.
    #[error("tool handler not found: {tool}")]
    NotFound {
        /// Missing tool name.
        tool: String,
    },
    /// Tool returned an execution failure.
    #[error("tool execution failed: {0}")]
    Failed(String),
}
239
/// Async host tool executor surface.
///
/// Optional at runtime construction; tool nodes without an executor fail
/// with `WorkflowRuntimeError::MissingToolExecutor`.
#[async_trait]
pub trait ToolExecutor: Send + Sync {
    /// Executes one workflow tool node.
    async fn execute_tool(&self, input: ToolExecutionInput) -> Result<Value, ToolExecutionError>;
}
246
/// Node-level execution result.
#[derive(Debug, Clone, PartialEq)]
pub struct NodeExecution {
    /// Zero-based step index.
    pub step: usize,
    /// Executed node id.
    pub node_id: String,
    /// Structured execution payload.
    pub data: NodeExecutionData,
}
257
/// Structured result data emitted for each node.
///
/// One variant per node kind; each non-terminal variant carries the `next`
/// node id chosen by the node's transition logic.
#[derive(Debug, Clone, PartialEq)]
pub enum NodeExecutionData {
    /// Start node transition.
    Start {
        /// Next node id.
        next: String,
    },
    /// LLM node output.
    Llm {
        /// Model used.
        model: String,
        /// Assistant output text.
        output: String,
        /// Next node id.
        next: String,
    },
    /// Tool node output.
    Tool {
        /// Tool name.
        tool: String,
        /// Tool output JSON payload.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// Condition node decision.
    Condition {
        /// Original expression.
        expression: String,
        /// Evaluated bool.
        evaluated: bool,
        /// Chosen next node id.
        next: String,
    },
    /// Debounce gate decision.
    Debounce {
        /// Resolved debounce key.
        key: String,
        /// Whether the event was suppressed.
        suppressed: bool,
        /// Chosen next node id.
        next: String,
    },
    /// Throttle gate decision.
    Throttle {
        /// Resolved throttle key.
        key: String,
        /// Whether the event was throttled.
        throttled: bool,
        /// Chosen next node id.
        next: String,
    },
    /// Explicit retry/compensation execution result.
    RetryCompensate {
        /// Primary tool name.
        tool: String,
        /// Total primary attempts executed.
        attempts: usize,
        /// Whether compensation path was used.
        compensated: bool,
        /// Node output payload.
        output: Value,
        /// Chosen next node id.
        next: String,
    },
    /// Human decision result.
    HumanInTheLoop {
        /// True when approved.
        approved: bool,
        /// Optional human payload routed through the node.
        response: Value,
        /// Chosen next node id.
        next: String,
    },
    /// Cache read result.
    CacheRead {
        /// Cache key.
        key: String,
        /// True when key was found.
        hit: bool,
        /// Returned value (null on miss).
        value: Value,
        /// Chosen next node id.
        next: String,
    },
    /// Cache write result.
    CacheWrite {
        /// Cache key.
        key: String,
        /// Stored value.
        value: Value,
        /// Next node id.
        next: String,
    },
    /// Event trigger evaluation result.
    EventTrigger {
        /// Expected event name.
        event: String,
        /// True when input event matched.
        matched: bool,
        /// Chosen next node id.
        next: String,
    },
    /// Router/selector decision.
    Router {
        /// Selected route node id.
        selected: String,
        /// Chosen next node id.
        next: String,
    },
    /// Transform node output.
    Transform {
        /// Original transform expression.
        expression: String,
        /// Evaluated output payload.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// Loop node decision.
    Loop {
        /// Original loop condition.
        condition: String,
        /// Evaluated bool.
        evaluated: bool,
        /// Current iteration when entering body. Zero when exiting loop.
        iteration: u32,
        /// Chosen next node id.
        next: String,
    },
    /// Subgraph execution result.
    Subgraph {
        /// Subgraph registry key.
        graph: String,
        /// Subgraph terminal node id.
        terminal_node_id: String,
        /// Aggregated subgraph node outputs.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// Batch extraction result.
    Batch {
        /// Source path used for extraction.
        items_path: String,
        /// Number of extracted items.
        item_count: usize,
        /// Next node id.
        next: String,
    },
    /// Filter evaluation result.
    Filter {
        /// Source path used for extraction.
        items_path: String,
        /// Filter predicate expression.
        expression: String,
        /// Number of retained items.
        kept: usize,
        /// Next node id.
        next: String,
    },
    /// Parallel node aggregate result.
    Parallel {
        /// Branch node ids executed by this node.
        branches: Vec<String>,
        /// Branch outputs keyed by branch node id.
        outputs: BTreeMap<String, Value>,
        /// Next node id.
        next: String,
    },
    /// Merge node aggregate output.
    Merge {
        /// Merge policy used.
        policy: MergePolicy,
        /// Source node ids consumed.
        sources: Vec<String>,
        /// Merged value.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// Map node output.
    Map {
        /// Number of input items mapped.
        item_count: usize,
        /// Collected mapped values in source order.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// Reduce node output.
    Reduce {
        /// Reduce operation executed.
        operation: ReduceOperation,
        /// Reduced result.
        output: Value,
        /// Next node id.
        next: String,
    },
    /// End node reached.
    End,
}
461
/// Runtime event stream payload for trace consumers.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowEvent {
    /// Zero-based step index.
    pub step: usize,
    /// Node id associated with this event.
    pub node_id: String,
    /// Event payload.
    pub kind: WorkflowEventKind,
}
472
/// Event kinds emitted by the runtime.
#[derive(Debug, Clone, PartialEq)]
pub enum WorkflowEventKind {
    /// Node execution started.
    NodeStarted,
    /// Node execution completed successfully.
    NodeCompleted {
        /// Node execution payload.
        data: NodeExecutionData,
    },
    /// Node execution failed.
    NodeFailed {
        /// Error message.
        message: String,
    },
}
489
/// Final workflow execution report.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowRunResult {
    /// Workflow name.
    pub workflow_name: String,
    /// Final terminal node id.
    pub terminal_node_id: String,
    /// Ordered node execution records.
    pub node_executions: Vec<NodeExecution>,
    /// Ordered runtime events.
    pub events: Vec<WorkflowEvent>,
    /// Retry diagnostics captured during node execution.
    pub retry_events: Vec<WorkflowRetryEvent>,
    /// Node output map keyed by node id.
    pub node_outputs: BTreeMap<String, Value>,
    /// Optional deterministic trace captured during this run.
    /// `None` when `enable_trace_recording` is off.
    pub trace: Option<WorkflowTrace>,
    /// Replay validation report when replay mode is enabled.
    pub replay_report: Option<ReplayReport>,
}
510
/// Captures one retry decision and the reason it occurred.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkflowRetryEvent {
    /// Zero-based step index for the active node.
    pub step: usize,
    /// Node id that will be retried.
    pub node_id: String,
    /// Operation category (`llm` or `tool`).
    pub operation: String,
    /// Attempt that failed and triggered the retry.
    pub failed_attempt: usize,
    /// Human-readable retry reason.
    pub reason: String,
}
525
/// Runtime failures.
///
/// Field docs were missing on the later variants; added here so every
/// variant documents its payload consistently.
#[derive(Debug, Error)]
pub enum WorkflowRuntimeError {
    /// Workflow failed structural validation.
    #[error(transparent)]
    Validation(#[from] ValidationErrors),
    /// Workflow has no start node.
    #[error("workflow has no start node")]
    MissingStartNode,
    /// Current node id not found in node index.
    #[error("node not found: {node_id}")]
    NodeNotFound {
        /// Missing node id.
        node_id: String,
    },
    /// Loop protection guard triggered.
    #[error("workflow exceeded max step limit ({max_steps})")]
    StepLimitExceeded {
        /// Configured max steps.
        max_steps: usize,
    },
    /// Execution cancelled by caller.
    #[error("workflow execution cancelled")]
    Cancelled,
    /// LLM node failed.
    #[error("llm node '{node_id}' failed: {source}")]
    Llm {
        /// Failing node id.
        node_id: String,
        /// Source error.
        source: LlmExecutionError,
    },
    /// Tool node failed.
    #[error("tool node '{node_id}' failed: {source}")]
    Tool {
        /// Failing node id.
        node_id: String,
        /// Source error.
        source: ToolExecutionError,
    },
    /// LLM node exhausted all retry attempts.
    #[error("llm node '{node_id}' exhausted {attempts} attempt(s): {last_error}")]
    LlmRetryExhausted {
        /// Failing node id.
        node_id: String,
        /// Number of attempts made.
        attempts: usize,
        /// Last attempt error.
        last_error: LlmExecutionError,
    },
    /// Tool node exhausted all retry attempts.
    #[error("tool node '{node_id}' exhausted {attempts} attempt(s): {last_error}")]
    ToolRetryExhausted {
        /// Failing node id.
        node_id: String,
        /// Number of attempts made.
        attempts: usize,
        /// Last attempt error.
        last_error: ToolExecutionError,
    },
    /// LLM node timed out across all attempts.
    #[error(
        "llm node '{node_id}' timed out after {attempts} attempt(s) (timeout: {timeout_ms} ms)"
    )]
    LlmTimeout {
        /// Failing node id.
        node_id: String,
        /// Per-attempt timeout in milliseconds.
        timeout_ms: u128,
        /// Number of attempts made.
        attempts: usize,
    },
    /// Tool node timed out across all attempts.
    #[error(
        "tool node '{node_id}' timed out after {attempts} attempt(s) (timeout: {timeout_ms} ms)"
    )]
    ToolTimeout {
        /// Failing node id.
        node_id: String,
        /// Per-attempt timeout in milliseconds.
        timeout_ms: u128,
        /// Number of attempts made.
        attempts: usize,
    },
    /// Tool node reached without an executor.
    #[error("tool node '{node_id}' requires a tool executor")]
    MissingToolExecutor {
        /// Failing node id.
        node_id: String,
    },
    /// LLM/tool node missing its `next` edge.
    #[error("node '{node_id}' is missing required next edge")]
    MissingNextEdge {
        /// Failing node id.
        node_id: String,
    },
    /// Condition expression could not be evaluated.
    #[error("condition node '{node_id}' has invalid expression '{expression}': {reason}")]
    InvalidCondition {
        /// Failing node id.
        node_id: String,
        /// Condition expression.
        expression: String,
        /// Detailed reason.
        reason: String,
    },
    /// Loop condition expression could not be evaluated.
    #[error("loop node '{node_id}' has invalid condition '{expression}': {reason}")]
    InvalidLoopCondition {
        /// Failing node id.
        node_id: String,
        /// Loop condition expression.
        expression: String,
        /// Detailed reason.
        reason: String,
    },
    /// Loop exceeded configured max iterations.
    #[error("loop node '{node_id}' exceeded max iterations ({max_iterations})")]
    LoopIterationLimitExceeded {
        /// Failing node id.
        node_id: String,
        /// Configured max iterations.
        max_iterations: u32,
    },
    /// Non-terminal node did not return a next transition.
    #[error("node '{node_id}' did not provide a next transition")]
    MissingNextTransition {
        /// Failing node id.
        node_id: String,
    },
    /// Scoped state boundary check failed.
    #[error("scope access failed on node '{node_id}': {source}")]
    ScopeAccess {
        /// Failing node id.
        node_id: String,
        /// Source access error.
        source: ScopeAccessError,
    },
    /// Trace recorder operation failed.
    #[error(transparent)]
    TraceRecording(#[from] TraceRecordError),
    /// Replay trace validation failed.
    #[error(transparent)]
    Replay(#[from] ReplayError),
    /// Replay mode requested without trace recording.
    #[error("replay validation requires trace recording to be enabled")]
    ReplayRequiresTraceRecording,
    /// Parallel branch references unsupported node kind.
    #[error("parallel node '{node_id}' cannot execute branch '{branch_id}': {reason}")]
    ParallelBranchUnsupported {
        /// Parallel node id.
        node_id: String,
        /// Branch node id.
        branch_id: String,
        /// Reason for rejection.
        reason: String,
    },
    /// Merge node cannot resolve one or more sources.
    #[error("merge node '{node_id}' missing source output from '{source_id}'")]
    MissingMergeSource {
        /// Merge node id.
        node_id: String,
        /// Missing source id.
        source_id: String,
    },
    /// Merge quorum policy could not be satisfied.
    #[error("merge node '{node_id}' quorum not met: required {required}, resolved {resolved}")]
    MergeQuorumNotMet {
        /// Merge node id.
        node_id: String,
        /// Required number of sources.
        required: usize,
        /// Number of resolved sources.
        resolved: usize,
    },
    /// Map node items path did not resolve to an array.
    #[error("map node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    MapItemsNotArray {
        /// Map node id.
        node_id: String,
        /// Configured path.
        items_path: String,
    },
    /// Reduce node source value is not reducible.
    #[error("reduce node '{node_id}' source '{source_node}' is not reducible: {reason}")]
    InvalidReduceInput {
        /// Reduce node id.
        node_id: String,
        /// Source node id.
        source_node: String,
        /// Detailed reason.
        reason: String,
    },
    /// Subgraph registry key not found.
    #[error("subgraph node '{node_id}' references unknown graph '{graph}'")]
    SubgraphNotFound {
        /// Subgraph node id.
        node_id: String,
        /// Unresolved registry key.
        graph: String,
    },
    /// Batch source path did not resolve to an array.
    #[error("batch node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    BatchItemsNotArray {
        /// Batch node id.
        node_id: String,
        /// Configured path.
        items_path: String,
    },
    /// Filter source path did not resolve to an array.
    #[error("filter node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    FilterItemsNotArray {
        /// Filter node id.
        node_id: String,
        /// Configured path.
        items_path: String,
    },
    /// Filter expression failed while evaluating one item.
    #[error("filter node '{node_id}' expression '{expression}' failed: {reason}")]
    InvalidFilterExpression {
        /// Filter node id.
        node_id: String,
        /// Predicate expression.
        expression: String,
        /// Detailed reason.
        reason: String,
    },
    /// Serialized expression scope exceeded configured runtime limit.
    #[error(
        "expression scope too large on node '{node_id}': {actual_bytes} bytes exceeds limit {limit_bytes}"
    )]
    ExpressionScopeLimitExceeded {
        /// Failing node id.
        node_id: String,
        /// Serialized scope size in bytes.
        actual_bytes: usize,
        /// Configured byte limit.
        limit_bytes: usize,
    },
    /// Parallel branch fan-out exceeded configured runtime limit.
    #[error(
        "parallel node '{node_id}' has {actual_branches} branches, exceeding limit {max_branches}"
    )]
    ParallelBranchLimitExceeded {
        /// Parallel node id.
        node_id: String,
        /// Declared branch count.
        actual_branches: usize,
        /// Configured branch limit.
        max_branches: usize,
    },
    /// Map item count exceeded configured runtime limit.
    #[error("map node '{node_id}' has {actual_items} items, exceeding limit {max_items}")]
    MapItemLimitExceeded {
        /// Map node id.
        node_id: String,
        /// Resolved item count.
        actual_items: usize,
        /// Configured item limit.
        max_items: usize,
    },
    /// Filter item count exceeded configured runtime limit.
    #[error("filter node '{node_id}' has {actual_items} items, exceeding limit {max_items}")]
    FilterItemLimitExceeded {
        /// Filter node id.
        node_id: String,
        /// Resolved item count.
        actual_items: usize,
        /// Configured item limit.
        max_items: usize,
    },
    /// A node-required path could not be resolved.
    #[error("node '{node_id}' could not resolve required path '{path}'")]
    MissingPath {
        /// Failing node id.
        node_id: String,
        /// Unresolvable path.
        path: String,
    },
    /// A cache key path did not resolve to a string.
    #[error("node '{node_id}' path '{path}' did not resolve to a string cache key")]
    CacheKeyNotString {
        /// Failing node id.
        node_id: String,
        /// Configured key path.
        path: String,
    },
    /// Human decision value is unsupported.
    #[error("human node '{node_id}' has unsupported decision value at '{path}': {value}")]
    InvalidHumanDecision {
        /// Failing node id.
        node_id: String,
        /// Decision value path.
        path: String,
        /// Rejected decision value.
        value: String,
    },
    /// Event value did not resolve to a string.
    #[error("event trigger node '{node_id}' path '{path}' did not resolve to an event string")]
    InvalidEventValue {
        /// Failing node id.
        node_id: String,
        /// Configured event path.
        path: String,
    },
    /// Router expression evaluation failed.
    #[error("router node '{node_id}' route expression '{expression}' failed: {reason}")]
    InvalidRouterExpression {
        /// Router node id.
        node_id: String,
        /// Route expression.
        expression: String,
        /// Detailed reason.
        reason: String,
    },
    /// Transform expression evaluation failed.
    #[error("transform node '{node_id}' expression '{expression}' failed: {reason}")]
    InvalidTransformExpression {
        /// Transform node id.
        node_id: String,
        /// Transform expression.
        expression: String,
        /// Detailed reason.
        reason: String,
    },
    /// Explicit retry/compensate node exhausted primary and compensation failed.
    #[error(
        "retry_compensate node '{node_id}' failed after {attempts} primary attempt(s) and compensation error: {compensation_error}"
    )]
    RetryCompensateFailed {
        /// Failing node id.
        node_id: String,
        /// Primary attempts executed.
        attempts: usize,
        /// Error returned by the compensation path.
        compensation_error: ToolExecutionError,
    },
}
807
/// Deterministic minimal runtime for workflow execution.
///
/// Borrows host-provided adapters for the lifetime of the runtime; the
/// workflow definition and options are owned.
pub struct WorkflowRuntime<'a> {
    // Owned workflow definition; normalized/validated per run.
    definition: WorkflowDefinition,
    // Host LLM adapter, required for llm nodes.
    llm_executor: &'a dyn LlmExecutor,
    // Optional host tool adapter; absence fails tool nodes at run time.
    tool_executor: Option<&'a dyn ToolExecutor>,
    // Execution configuration (limits, policies, tracing).
    options: WorkflowRuntimeOptions,
}
815
816impl<'a> WorkflowRuntime<'a> {
    /// Creates a runtime with host-provided LLM and tool adapters.
    ///
    /// `tool_executor` may be `None`; tool nodes then fail at execution
    /// time with `WorkflowRuntimeError::MissingToolExecutor`.
    pub fn new(
        definition: WorkflowDefinition,
        llm_executor: &'a dyn LlmExecutor,
        tool_executor: Option<&'a dyn ToolExecutor>,
        options: WorkflowRuntimeOptions,
    ) -> Self {
        Self {
            definition,
            llm_executor,
            tool_executor,
            options,
        }
    }
831
    /// Executes a workflow deterministically from `start` to `end`.
    ///
    /// When `validate_before_run` is set the definition is validated and
    /// normalized first; otherwise it is only normalized. Execution starts
    /// at the workflow's start node with `input` seeding the runtime scope
    /// and the step counter at zero.
    pub async fn execute(
        &self,
        input: Value,
        cancellation: Option<&dyn CancellationSignal>,
    ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
        let workflow = if self.options.validate_before_run {
            validate_and_normalize(&self.definition)?
        } else {
            self.definition.normalized()
        };

        let start_id = find_start_node_id(&workflow)?;
        self.execute_from_node(
            workflow,
            RuntimeScope::new(input),
            start_id,
            0, // fresh run: step counter starts at zero
            cancellation,
        )
        .await
    }
854
    /// Resumes execution from a checkpoint created after a node failure.
    ///
    /// The resumed scope is rebuilt from the checkpoint's `input` key when
    /// present; otherwise the whole snapshot is used as the input value.
    /// NOTE(review): the whole-snapshot fallback presumably supports older
    /// checkpoints without an `input` key — confirm against checkpoint
    /// producers before relying on it.
    pub async fn execute_resume_from_failure(
        &self,
        checkpoint: &WorkflowCheckpoint,
        cancellation: Option<&dyn CancellationSignal>,
    ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
        let workflow = if self.options.validate_before_run {
            validate_and_normalize(&self.definition)?
        } else {
            self.definition.normalized()
        };

        let scope_input = checkpoint
            .scope_snapshot
            .get("input")
            .cloned()
            .unwrap_or_else(|| checkpoint.scope_snapshot.clone());

        self.execute_from_node(
            workflow,
            RuntimeScope::new(scope_input),
            checkpoint.next_node_id.clone(),
            checkpoint.step,
            cancellation,
        )
        .await
    }
882
    /// Runs the workflow starting at `start_node_id` with the given scope
    /// and step offset. Thin forwarding shim: the execution loop lives in
    /// the `engine` submodule.
    async fn execute_from_node(
        &self,
        workflow: WorkflowDefinition,
        scope: RuntimeScope,
        start_node_id: String,
        starting_step: usize,
        cancellation: Option<&dyn CancellationSignal>,
    ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
        engine::execute_from_node(
            self,
            workflow,
            scope,
            start_node_id,
            starting_step,
            cancellation,
        )
        .await
    }
901
    /// Executes a single node at `step`, mutating the scope and appending
    /// any retry diagnostics. Thin forwarding shim into the `engine`
    /// submodule, which dispatches on node kind.
    async fn execute_node(
        &self,
        node: &Node,
        node_index: &HashMap<&str, &Node>,
        step: usize,
        scope: &mut RuntimeScope,
        cancellation: Option<&dyn CancellationSignal>,
        retry_events: &mut Vec<WorkflowRetryEvent>,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        engine::execute_node(
            self,
            node,
            node_index,
            step,
            scope,
            cancellation,
            retry_events,
        )
        .await
    }
922
923    fn execute_start_node(&self, step: usize, node: &Node, next: &str) -> NodeExecution {
924        NodeExecution {
925            step,
926            node_id: node.id.clone(),
927            data: NodeExecutionData::Start {
928                next: next.to_string(),
929            },
930        }
931    }
932
933    async fn execute_llm_node(
934        &self,
935        step: usize,
936        node: &Node,
937        spec: LlmNodeSpec<'_>,
938        scope: &mut RuntimeScope,
939        cancellation: Option<&dyn CancellationSignal>,
940        retry_events: &mut Vec<WorkflowRetryEvent>,
941    ) -> Result<NodeExecution, WorkflowRuntimeError> {
942        let next_node = spec
943            .next
944            .clone()
945            .ok_or_else(|| WorkflowRuntimeError::MissingNextEdge {
946                node_id: node.id.clone(),
947            })?;
948
949        let (output, llm_retries) = self
950            .execute_llm_with_policy(step, node, spec.model, spec.prompt, scope, cancellation)
951            .await?;
952        retry_events.extend(llm_retries);
953
954        scope
955            .record_llm_output(&node.id, output.content.clone(), ScopeCapability::LlmWrite)
956            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
957                node_id: node.id.clone(),
958                source,
959            })?;
960
961        Ok(NodeExecution {
962            step,
963            node_id: node.id.clone(),
964            data: NodeExecutionData::Llm {
965                model: spec.model.to_string(),
966                output: output.content,
967                next: next_node,
968            },
969        })
970    }
971
972    async fn execute_tool_node(
973        &self,
974        step: usize,
975        node: &Node,
976        spec: ToolNodeSpec<'_>,
977        scope: &mut RuntimeScope,
978        cancellation: Option<&dyn CancellationSignal>,
979        retry_events: &mut Vec<WorkflowRetryEvent>,
980    ) -> Result<NodeExecution, WorkflowRuntimeError> {
981        let next_node = spec
982            .next
983            .clone()
984            .ok_or_else(|| WorkflowRuntimeError::MissingNextEdge {
985                node_id: node.id.clone(),
986            })?;
987
988        let executor =
989            self.tool_executor
990                .ok_or_else(|| WorkflowRuntimeError::MissingToolExecutor {
991                    node_id: node.id.clone(),
992                })?;
993
994        let scoped_input = scope
995            .scoped_input(ScopeCapability::ToolRead)
996            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
997                node_id: node.id.clone(),
998                source,
999            })?;
1000
1001        let (tool_output, tool_retries) = self
1002            .execute_tool_with_policy_for_scope(ToolPolicyRequest {
1003                step,
1004                node,
1005                tool: spec.tool,
1006                input: spec.input,
1007                executor,
1008                scoped_input,
1009                cancellation,
1010            })
1011            .await?;
1012        retry_events.extend(tool_retries);
1013
1014        scope
1015            .record_tool_output(&node.id, tool_output.clone(), ScopeCapability::ToolWrite)
1016            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1017                node_id: node.id.clone(),
1018                source,
1019            })?;
1020
1021        Ok(NodeExecution {
1022            step,
1023            node_id: node.id.clone(),
1024            data: NodeExecutionData::Tool {
1025                tool: spec.tool.to_string(),
1026                output: tool_output,
1027                next: next_node,
1028            },
1029        })
1030    }
1031
1032    fn execute_condition_node(
1033        &self,
1034        step: usize,
1035        node: &Node,
1036        spec: ConditionNodeSpec<'_>,
1037        scope: &mut RuntimeScope,
1038        cancellation: Option<&dyn CancellationSignal>,
1039    ) -> Result<NodeExecution, WorkflowRuntimeError> {
1040        check_cancelled(cancellation)?;
1041        let scoped_input =
1042            scope
1043                .scoped_input(ScopeCapability::ConditionRead)
1044                .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1045                    node_id: node.id.clone(),
1046                    source,
1047                })?;
1048        enforce_expression_scope_budget(
1049            &node.id,
1050            &scoped_input,
1051            self.options.security_limits.max_expression_scope_bytes,
1052        )?;
1053        let evaluated =
1054            expressions::evaluate_bool(spec.expression, &scoped_input).map_err(|reason| {
1055                WorkflowRuntimeError::InvalidCondition {
1056                    node_id: node.id.clone(),
1057                    expression: spec.expression.to_string(),
1058                    reason: reason.to_string(),
1059                }
1060            })?;
1061        let next = if evaluated {
1062            spec.on_true.to_string()
1063        } else {
1064            spec.on_false.to_string()
1065        };
1066
1067        scope
1068            .record_condition_output(&node.id, evaluated, ScopeCapability::ConditionWrite)
1069            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1070                node_id: node.id.clone(),
1071                source,
1072            })?;
1073
1074        Ok(NodeExecution {
1075            step,
1076            node_id: node.id.clone(),
1077            data: NodeExecutionData::Condition {
1078                expression: spec.expression.to_string(),
1079                evaluated,
1080                next,
1081            },
1082        })
1083    }
1084
    /// Executes a `parallel` node: fans the branch list out through the
    /// bounded `DagScheduler`, records each branch's output individually, and
    /// stores an aggregate object keyed by branch id under this node's id.
    ///
    /// The branch count is capped by `security_limits.max_parallel_branches`;
    /// a per-node `max_in_flight` overrides the global scheduler bound.
    async fn execute_parallel_node(
        &self,
        step: usize,
        node: &Node,
        spec: ParallelNodeSpec<'_>,
        scope: &mut RuntimeScope,
        cancellation: Option<&dyn CancellationSignal>,
        retry_events: &mut Vec<WorkflowRetryEvent>,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        check_cancelled(cancellation)?;
        if spec.branches.len() > self.options.security_limits.max_parallel_branches {
            return Err(WorkflowRuntimeError::ParallelBranchLimitExceeded {
                node_id: node.id.clone(),
                actual_branches: spec.branches.len(),
                max_branches: self.options.security_limits.max_parallel_branches,
            });
        }
        // Every branch receives the same map-readable snapshot of the scope.
        let base_scope = scope
            .scoped_input(ScopeCapability::MapRead)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;
        let scheduler = DagScheduler::new(
            spec.max_in_flight
                .unwrap_or(self.options.scheduler_max_in_flight),
        );
        let parallel_node_id = node.id.clone();

        // NOTE(review): assumes `run_bounded` returns one result per branch;
        // confirm ordering/pairing guarantees in `DagScheduler::run_bounded`.
        let branch_outputs: Vec<(String, Value, Vec<WorkflowRetryEvent>)> = scheduler
            .run_bounded(spec.branches.iter().cloned(), |branch_id| {
                let parallel_node_id = parallel_node_id.clone();
                let base_scope = base_scope.clone();
                async move {
                    let branch_node = spec.node_index.get(branch_id.as_str()).ok_or_else(|| {
                        WorkflowRuntimeError::NodeNotFound {
                            node_id: branch_id.clone(),
                        }
                    })?;
                    self.execute_parallel_branch(
                        step,
                        &parallel_node_id,
                        branch_node,
                        base_scope,
                        cancellation,
                    )
                    .await
                }
            })
            .await?;

        // Record each branch output under its own id so downstream nodes can
        // address individual branches, then store the combined object.
        let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
        for (branch_id, output, branch_retry_events) in branch_outputs {
            retry_events.extend(branch_retry_events);
            scope
                .record_node_output(&branch_id, output.clone(), ScopeCapability::MapWrite)
                .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                    node_id: node.id.clone(),
                    source,
                })?;
            outputs.insert(branch_id, output);
        }

        scope
            .record_node_output(
                &node.id,
                Value::Object(
                    outputs
                        .iter()
                        .map(|(key, value)| (key.clone(), value.clone()))
                        .collect(),
                ),
                ScopeCapability::MapWrite,
            )
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        Ok(NodeExecution {
            step,
            node_id: node.id.clone(),
            data: NodeExecutionData::Parallel {
                branches: spec.branches.to_vec(),
                outputs,
                next: spec.next.to_string(),
            },
        })
    }
1174
1175    fn execute_merge_node(
1176        &self,
1177        step: usize,
1178        node: &Node,
1179        spec: MergeNodeSpec<'_>,
1180        scope: &mut RuntimeScope,
1181    ) -> Result<NodeExecution, WorkflowRuntimeError> {
1182        let mut resolved = Vec::with_capacity(spec.sources.len());
1183        for source in spec.sources {
1184            let Some(value) = scope.node_output(source).cloned() else {
1185                return Err(WorkflowRuntimeError::MissingMergeSource {
1186                    node_id: node.id.clone(),
1187                    source_id: source.clone(),
1188                });
1189            };
1190            resolved.push((source.clone(), value));
1191        }
1192
1193        let output = match spec.policy {
1194            MergePolicy::First => resolved
1195                .first()
1196                .map(|(_, value)| value.clone())
1197                .unwrap_or(Value::Null),
1198            MergePolicy::All => Value::Array(
1199                resolved
1200                    .iter()
1201                    .map(|(_, value)| value.clone())
1202                    .collect::<Vec<_>>(),
1203            ),
1204            MergePolicy::Quorum => {
1205                let required = spec.quorum.unwrap_or_default();
1206                let resolved_count = resolved.len();
1207                if resolved_count < required {
1208                    return Err(WorkflowRuntimeError::MergeQuorumNotMet {
1209                        node_id: node.id.clone(),
1210                        required,
1211                        resolved: resolved_count,
1212                    });
1213                }
1214                Value::Array(
1215                    resolved
1216                        .iter()
1217                        .take(required)
1218                        .map(|(_, value)| value.clone())
1219                        .collect::<Vec<_>>(),
1220                )
1221            }
1222        };
1223
1224        scope
1225            .record_node_output(&node.id, output.clone(), ScopeCapability::ReduceWrite)
1226            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1227                node_id: node.id.clone(),
1228                source,
1229            })?;
1230
1231        Ok(NodeExecution {
1232            step,
1233            node_id: node.id.clone(),
1234            data: NodeExecutionData::Merge {
1235                policy: spec.policy.clone(),
1236                sources: spec.sources.to_vec(),
1237                output,
1238                next: spec.next.to_string(),
1239            },
1240        })
1241    }
1242
    /// Executes a `map` node: resolves `items_path` to an array in the
    /// map-readable scope view, runs `spec.tool` once per item through the
    /// bounded `DagScheduler`, and records the collected outputs as an array
    /// under this node's id.
    ///
    /// Item count is capped by `security_limits.max_map_items`; a per-node
    /// `max_in_flight` overrides the global scheduler bound.
    async fn execute_map_node(
        &self,
        step: usize,
        node: &Node,
        spec: MapNodeSpec<'_>,
        scope: &mut RuntimeScope,
        cancellation: Option<&dyn CancellationSignal>,
        retry_events: &mut Vec<WorkflowRetryEvent>,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        let executor =
            self.tool_executor
                .ok_or_else(|| WorkflowRuntimeError::MissingToolExecutor {
                    node_id: node.id.clone(),
                })?;

        let scoped_input = scope
            .scoped_input(ScopeCapability::MapRead)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;
        // Missing path and non-array value collapse into the same error.
        let items = resolve_path(&scoped_input, spec.items_path)
            .and_then(Value::as_array)
            .ok_or_else(|| WorkflowRuntimeError::MapItemsNotArray {
                node_id: node.id.clone(),
                items_path: spec.items_path.to_string(),
            })?
            .clone();
        if items.len() > self.options.security_limits.max_map_items {
            return Err(WorkflowRuntimeError::MapItemLimitExceeded {
                node_id: node.id.clone(),
                actual_items: items.len(),
                max_items: self.options.security_limits.max_map_items,
            });
        }

        let scheduler = DagScheduler::new(
            spec.max_in_flight
                .unwrap_or(self.options.scheduler_max_in_flight),
        );
        let map_node = node.clone();
        // NOTE(review): assumes `run_bounded` yields results in input order so
        // outputs line up with items — confirm in `DagScheduler::run_bounded`.
        let mapped: Vec<(Value, Vec<WorkflowRetryEvent>)> = scheduler
            .run_bounded(items.into_iter().enumerate(), |(index, item)| {
                let scoped_input = scoped_input.clone();
                let map_node = map_node.clone();
                async move {
                    // Presumably binds the item and its index into the scope
                    // handed to the tool; see `map_item_scoped_input`.
                    let item_scope = map_item_scoped_input(&scoped_input, &item, index);
                    let (output, retries) = self
                        .execute_tool_with_policy_for_scope(ToolPolicyRequest {
                            step,
                            node: &map_node,
                            tool: spec.tool,
                            input: &item,
                            executor,
                            scoped_input: item_scope,
                            cancellation,
                        })
                        .await?;
                    Ok::<(Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError>((output, retries))
                }
            })
            .await?;

        let mut outputs = Vec::with_capacity(mapped.len());
        for (output, local_retries) in mapped {
            outputs.push(output);
            retry_events.extend(local_retries);
        }

        let output = Value::Array(outputs);
        scope
            .record_node_output(&node.id, output.clone(), ScopeCapability::MapWrite)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        Ok(NodeExecution {
            step,
            node_id: node.id.clone(),
            data: NodeExecutionData::Map {
                item_count: output.as_array().map_or(0, Vec::len),
                output,
                next: spec.next.to_string(),
            },
        })
    }
1330
1331    fn execute_reduce_node(
1332        &self,
1333        step: usize,
1334        node: &Node,
1335        source: &str,
1336        operation: &ReduceOperation,
1337        next: &str,
1338        scope: &mut RuntimeScope,
1339    ) -> Result<NodeExecution, WorkflowRuntimeError> {
1340        let source_value = scope.node_output(source).cloned().ok_or_else(|| {
1341            WorkflowRuntimeError::MissingMergeSource {
1342                node_id: node.id.clone(),
1343                source_id: source.to_string(),
1344            }
1345        })?;
1346
1347        let reduced = reduce_value(&node.id, source, operation, source_value)?;
1348        scope
1349            .record_node_output(&node.id, reduced.clone(), ScopeCapability::ReduceWrite)
1350            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1351                node_id: node.id.clone(),
1352                source,
1353            })?;
1354
1355        Ok(NodeExecution {
1356            step,
1357            node_id: node.id.clone(),
1358            data: NodeExecutionData::Reduce {
1359                operation: operation.clone(),
1360                output: reduced,
1361                next: next.to_string(),
1362            },
1363        })
1364    }
1365
1366    fn execute_batch_node(
1367        &self,
1368        step: usize,
1369        node: &Node,
1370        items_path: &str,
1371        next: &str,
1372        scope: &mut RuntimeScope,
1373    ) -> Result<NodeExecution, WorkflowRuntimeError> {
1374        let scoped = scope
1375            .scoped_input(ScopeCapability::MapRead)
1376            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1377                node_id: node.id.clone(),
1378                source,
1379            })?;
1380        let items = resolve_path(&scoped, items_path).ok_or_else(|| {
1381            WorkflowRuntimeError::BatchItemsNotArray {
1382                node_id: node.id.clone(),
1383                items_path: items_path.to_string(),
1384            }
1385        })?;
1386        let array = items
1387            .as_array()
1388            .ok_or_else(|| WorkflowRuntimeError::BatchItemsNotArray {
1389                node_id: node.id.clone(),
1390                items_path: items_path.to_string(),
1391            })?;
1392
1393        let output = Value::Array(array.clone());
1394        scope
1395            .record_node_output(&node.id, output.clone(), ScopeCapability::MapWrite)
1396            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1397                node_id: node.id.clone(),
1398                source,
1399            })?;
1400
1401        Ok(NodeExecution {
1402            step,
1403            node_id: node.id.clone(),
1404            data: NodeExecutionData::Batch {
1405                items_path: items_path.to_string(),
1406                item_count: array.len(),
1407                next: next.to_string(),
1408            },
1409        })
1410    }
1411
1412    fn execute_filter_node(
1413        &self,
1414        step: usize,
1415        node: &Node,
1416        items_path: &str,
1417        expression: &str,
1418        next: &str,
1419        scope: &mut RuntimeScope,
1420    ) -> Result<NodeExecution, WorkflowRuntimeError> {
1421        let scoped = scope
1422            .scoped_input(ScopeCapability::MapRead)
1423            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1424                node_id: node.id.clone(),
1425                source,
1426            })?;
1427        let items_value = resolve_path(&scoped, items_path).ok_or_else(|| {
1428            WorkflowRuntimeError::FilterItemsNotArray {
1429                node_id: node.id.clone(),
1430                items_path: items_path.to_string(),
1431            }
1432        })?;
1433        let array =
1434            items_value
1435                .as_array()
1436                .ok_or_else(|| WorkflowRuntimeError::FilterItemsNotArray {
1437                    node_id: node.id.clone(),
1438                    items_path: items_path.to_string(),
1439                })?;
1440        if array.len() > self.options.security_limits.max_filter_items {
1441            return Err(WorkflowRuntimeError::FilterItemLimitExceeded {
1442                node_id: node.id.clone(),
1443                actual_items: array.len(),
1444                max_items: self.options.security_limits.max_filter_items,
1445            });
1446        }
1447
1448        let mut kept = Vec::new();
1449        for (index, item) in array.iter().enumerate() {
1450            let mut eval_scope = scoped.clone();
1451            if let Some(object) = eval_scope.as_object_mut() {
1452                object.insert("item".to_string(), item.clone());
1453                object.insert("item_index".to_string(), Value::from(index as u64));
1454            }
1455            enforce_expression_scope_budget(
1456                &node.id,
1457                &eval_scope,
1458                self.options.security_limits.max_expression_scope_bytes,
1459            )?;
1460            let include =
1461                expressions::evaluate_bool(expression, &eval_scope).map_err(|reason| {
1462                    WorkflowRuntimeError::InvalidFilterExpression {
1463                        node_id: node.id.clone(),
1464                        expression: expression.to_string(),
1465                        reason: reason.to_string(),
1466                    }
1467                })?;
1468            if include {
1469                kept.push(item.clone());
1470            }
1471        }
1472        let output = Value::Array(kept.clone());
1473        scope
1474            .record_node_output(&node.id, output, ScopeCapability::MapWrite)
1475            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1476                node_id: node.id.clone(),
1477                source,
1478            })?;
1479
1480        Ok(NodeExecution {
1481            step,
1482            node_id: node.id.clone(),
1483            data: NodeExecutionData::Filter {
1484                items_path: items_path.to_string(),
1485                expression: expression.to_string(),
1486                kept: kept.len(),
1487                next: next.to_string(),
1488            },
1489        })
1490    }
1491
1492    async fn execute_llm_with_policy(
1493        &self,
1494        step: usize,
1495        node: &Node,
1496        model: &str,
1497        prompt: &str,
1498        scope: &RuntimeScope,
1499        cancellation: Option<&dyn CancellationSignal>,
1500    ) -> Result<(LlmExecutionOutput, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
1501        let scoped_input = scope
1502            .scoped_input(ScopeCapability::LlmRead)
1503            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1504                node_id: node.id.clone(),
1505                source,
1506            })?;
1507
1508        self.execute_llm_with_policy_for_scope(
1509            step,
1510            node,
1511            model,
1512            prompt,
1513            scoped_input,
1514            cancellation,
1515        )
1516        .await
1517    }
1518
1519    async fn execute_parallel_branch(
1520        &self,
1521        step: usize,
1522        parallel_node_id: &str,
1523        branch_node: &Node,
1524        scoped_input: Value,
1525        cancellation: Option<&dyn CancellationSignal>,
1526    ) -> Result<(String, Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
1527        match &branch_node.kind {
1528            NodeKind::Llm {
1529                model,
1530                prompt,
1531                next: _,
1532            } => {
1533                let (output, retries) = self
1534                    .execute_llm_with_policy_for_scope(
1535                        step,
1536                        branch_node,
1537                        model,
1538                        prompt,
1539                        scoped_input,
1540                        cancellation,
1541                    )
1542                    .await?;
1543                Ok((
1544                    branch_node.id.clone(),
1545                    Value::String(output.content),
1546                    retries,
1547                ))
1548            }
1549            NodeKind::Tool { tool, input, .. } => {
1550                let executor = self.tool_executor.ok_or_else(|| {
1551                    WorkflowRuntimeError::MissingToolExecutor {
1552                        node_id: branch_node.id.clone(),
1553                    }
1554                })?;
1555                let (output, retries) = self
1556                    .execute_tool_with_policy_for_scope(ToolPolicyRequest {
1557                        step,
1558                        node: branch_node,
1559                        tool,
1560                        input,
1561                        executor,
1562                        scoped_input,
1563                        cancellation,
1564                    })
1565                    .await?;
1566                Ok((branch_node.id.clone(), output, retries))
1567            }
1568            _ => Err(WorkflowRuntimeError::ParallelBranchUnsupported {
1569                node_id: parallel_node_id.to_string(),
1570                branch_id: branch_node.id.clone(),
1571                reason: "only llm/tool branches are supported".to_string(),
1572            }),
1573        }
1574    }
1575
    /// Runs the LLM executor for `node` under `llm_node_policy`: up to
    /// `max_retries + 1` attempts, each optionally bounded by the policy
    /// timeout, with cancellation checked before every attempt and after
    /// every failed one.
    ///
    /// Returns the successful output together with one retry event per
    /// failed (errored or timed-out) attempt that preceded it.
    ///
    /// # Errors
    /// * `Cancelled` — the cancellation signal fired between attempts.
    /// * `LlmTimeout` — the final attempt exceeded the policy timeout.
    /// * `LlmRetryExhausted` — the final attempt returned an error.
    async fn execute_llm_with_policy_for_scope(
        &self,
        step: usize,
        node: &Node,
        model: &str,
        prompt: &str,
        scoped_input: Value,
        cancellation: Option<&dyn CancellationSignal>,
    ) -> Result<(LlmExecutionOutput, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
        // max_retries counts *re*-tries; total attempts is one more.
        let max_attempts = self.options.llm_node_policy.max_retries.saturating_add(1);
        let mut retry_events = Vec::new();

        for attempt in 1..=max_attempts {
            check_cancelled(cancellation)?;

            let execution = self.llm_executor.execute(LlmExecutionInput {
                node_id: node.id.clone(),
                model: model.to_string(),
                prompt: prompt.to_string(),
                scoped_input: scoped_input.clone(),
            });

            // Without a policy timeout the attempt may run indefinitely.
            let outcome = if let Some(timeout_duration) = self.options.llm_node_policy.timeout {
                match timeout(timeout_duration, execution).await {
                    Ok(result) => result,
                    Err(_) => {
                        // Timed out: fail hard on the last attempt, otherwise
                        // record a retry event and try again.
                        if attempt == max_attempts {
                            return Err(WorkflowRuntimeError::LlmTimeout {
                                node_id: node.id.clone(),
                                timeout_ms: timeout_duration.as_millis(),
                                attempts: attempt,
                            });
                        }
                        retry_events.push(WorkflowRetryEvent {
                            step,
                            node_id: node.id.clone(),
                            operation: "llm".to_string(),
                            failed_attempt: attempt,
                            reason: format!(
                                "attempt {} timed out after {} ms",
                                attempt,
                                timeout_duration.as_millis()
                            ),
                        });
                        check_cancelled(cancellation)?;
                        continue;
                    }
                }
            } else {
                execution.await
            };

            match outcome {
                Ok(output) => return Ok((output, retry_events)),
                Err(last_error) => {
                    if attempt == max_attempts {
                        return Err(WorkflowRuntimeError::LlmRetryExhausted {
                            node_id: node.id.clone(),
                            attempts: attempt,
                            last_error,
                        });
                    }
                    retry_events.push(WorkflowRetryEvent {
                        step,
                        node_id: node.id.clone(),
                        operation: "llm".to_string(),
                        failed_attempt: attempt,
                        reason: last_error.to_string(),
                    });
                    check_cancelled(cancellation)?;
                }
            }
        }

        // Every path through the final attempt returns above.
        unreachable!("llm attempts loop always returns")
    }
1652
    /// Runs a tool invocation under `tool_node_policy`: up to
    /// `max_retries + 1` attempts, each optionally bounded by the policy
    /// timeout, with cancellation checked before every attempt and after
    /// every failed one. Mirrors `execute_llm_with_policy_for_scope`.
    ///
    /// # Errors
    /// * `Cancelled` — the cancellation signal fired between attempts.
    /// * `ToolTimeout` — the final attempt exceeded the policy timeout.
    /// * `ToolRetryExhausted` — the final attempt returned an error.
    async fn execute_tool_with_policy_for_scope(
        &self,
        request: ToolPolicyRequest<'_>,
    ) -> Result<(Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
        let ToolPolicyRequest {
            step,
            node,
            tool,
            input,
            executor,
            scoped_input,
            cancellation,
        } = request;
        // max_retries counts *re*-tries; total attempts is one more.
        let max_attempts = self.options.tool_node_policy.max_retries.saturating_add(1);
        let mut retry_events = Vec::new();

        for attempt in 1..=max_attempts {
            check_cancelled(cancellation)?;

            let execution = executor.execute_tool(ToolExecutionInput {
                node_id: node.id.clone(),
                tool: tool.to_string(),
                input: input.clone(),
                scoped_input: scoped_input.clone(),
            });

            // Without a policy timeout the attempt may run indefinitely.
            let outcome = if let Some(timeout_duration) = self.options.tool_node_policy.timeout {
                match timeout(timeout_duration, execution).await {
                    Ok(result) => result,
                    Err(_) => {
                        // Timed out: fail hard on the last attempt, otherwise
                        // record a retry event and try again.
                        if attempt == max_attempts {
                            return Err(WorkflowRuntimeError::ToolTimeout {
                                node_id: node.id.clone(),
                                timeout_ms: timeout_duration.as_millis(),
                                attempts: attempt,
                            });
                        }
                        retry_events.push(WorkflowRetryEvent {
                            step,
                            node_id: node.id.clone(),
                            operation: "tool".to_string(),
                            failed_attempt: attempt,
                            reason: format!(
                                "attempt {} timed out after {} ms",
                                attempt,
                                timeout_duration.as_millis()
                            ),
                        });
                        check_cancelled(cancellation)?;
                        continue;
                    }
                }
            } else {
                execution.await
            };

            match outcome {
                Ok(output) => return Ok((output, retry_events)),
                Err(last_error) => {
                    if attempt == max_attempts {
                        return Err(WorkflowRuntimeError::ToolRetryExhausted {
                            node_id: node.id.clone(),
                            attempts: attempt,
                            last_error,
                        });
                    }
                    retry_events.push(WorkflowRetryEvent {
                        step,
                        node_id: node.id.clone(),
                        operation: "tool".to_string(),
                        failed_attempt: attempt,
                        reason: last_error.to_string(),
                    });
                    check_cancelled(cancellation)?;
                }
            }
        }

        // Every path through the final attempt returns above.
        unreachable!("tool attempts loop always returns")
    }
1733}
1734
/// Bundled arguments for `execute_tool_with_policy_for_scope`, grouping the
/// context a policy-governed tool invocation needs.
struct ToolPolicyRequest<'a> {
    // Step counter recorded on any retry events.
    step: usize,
    // Node being executed; its id is used in errors and retry events.
    node: &'a Node,
    // Tool name forwarded to the executor.
    tool: &'a str,
    // Static tool input from the workflow definition.
    input: &'a Value,
    // Executor that actually runs the tool.
    executor: &'a dyn ToolExecutor,
    // Capability-filtered scope snapshot handed to the tool.
    scoped_input: Value,
    // Optional cooperative cancellation signal checked between attempts.
    cancellation: Option<&'a dyn CancellationSignal>,
}
1744
/// Borrowed view of an LLM node's fields.
struct LlmNodeSpec<'a> {
    model: &'a str,
    prompt: &'a str,
    /// Successor node id, if any.
    next: &'a Option<String>,
}
1750
/// Borrowed view of a tool node's fields.
struct ToolNodeSpec<'a> {
    tool: &'a str,
    input: &'a Value,
    /// Successor node id, if any.
    next: &'a Option<String>,
}
1756
/// Borrowed view of a condition node's fields.
struct ConditionNodeSpec<'a> {
    expression: &'a str,
    /// Successor when the expression evaluates truthy.
    on_true: &'a str,
    /// Successor when the expression evaluates falsy.
    on_false: &'a str,
}
1762
/// Borrowed view of a parallel fan-out node's fields.
struct ParallelNodeSpec<'a> {
    /// Id-to-node lookup for resolving branch entry points.
    node_index: &'a HashMap<&'a str, &'a Node>,
    /// Branch entry node ids executed concurrently.
    branches: &'a [String],
    next: &'a str,
    /// Optional per-node cap on concurrent branches.
    max_in_flight: Option<usize>,
}
1769
/// Borrowed view of a merge node's fields.
struct MergeNodeSpec<'a> {
    /// Upstream node ids whose outputs are merged.
    sources: &'a [String],
    policy: &'a MergePolicy,
    /// Minimum source count required by quorum-style policies.
    quorum: Option<usize>,
    next: &'a str,
}
1776
/// Borrowed view of a map node's fields.
struct MapNodeSpec<'a> {
    tool: &'a str,
    /// Dot path into the scope locating the array of items to map over.
    items_path: &'a str,
    next: &'a str,
    /// Optional per-node cap on concurrent item executions.
    max_in_flight: Option<usize>,
}
1783
1784fn check_cancelled(
1785    cancellation: Option<&dyn CancellationSignal>,
1786) -> Result<(), WorkflowRuntimeError> {
1787    if cancellation.is_some_and(CancellationSignal::is_cancelled) {
1788        Err(WorkflowRuntimeError::Cancelled)
1789    } else {
1790        Ok(())
1791    }
1792}
1793
/// Returns the current logical clock value and advances the clock by one,
/// saturating at `u64::MAX` so the counter never wraps.
fn next_trace_timestamp(clock: &mut u64) -> u64 {
    let current = *clock;
    *clock = current.saturating_add(1);
    current
}
1799
1800fn build_prompt_with_scope(prompt: &str, scoped_input: &Value) -> String {
1801    format!("{}\n\nScoped context:\n{}", prompt, scoped_input)
1802}
1803
1804fn build_node_index(workflow: &WorkflowDefinition) -> HashMap<&str, &Node> {
1805    let mut index = HashMap::with_capacity(workflow.nodes.len());
1806    for node in &workflow.nodes {
1807        index.insert(node.id.as_str(), node);
1808    }
1809    index
1810}
1811
1812fn find_start_node_id(workflow: &WorkflowDefinition) -> Result<String, WorkflowRuntimeError> {
1813    workflow
1814        .nodes
1815        .iter()
1816        .find_map(|node| match node.kind {
1817            NodeKind::Start { .. } => Some(node.id.clone()),
1818            _ => None,
1819        })
1820        .ok_or(WorkflowRuntimeError::MissingStartNode)
1821}
1822
/// Returns the statically recorded successor id for an executed node, or
/// `None` for `End` (which terminates traversal).
///
/// The or-pattern is intentionally exhaustive over every variant carrying a
/// `next` field, so adding a variant forces a compile error here.
fn next_node_id(data: &NodeExecutionData) -> Option<String> {
    match data {
        NodeExecutionData::Start { next }
        | NodeExecutionData::Llm { next, .. }
        | NodeExecutionData::Tool { next, .. }
        | NodeExecutionData::Condition { next, .. }
        | NodeExecutionData::Debounce { next, .. }
        | NodeExecutionData::Throttle { next, .. }
        | NodeExecutionData::RetryCompensate { next, .. }
        | NodeExecutionData::HumanInTheLoop { next, .. }
        | NodeExecutionData::CacheRead { next, .. }
        | NodeExecutionData::CacheWrite { next, .. }
        | NodeExecutionData::EventTrigger { next, .. }
        | NodeExecutionData::Router { next, .. }
        | NodeExecutionData::Transform { next, .. }
        | NodeExecutionData::Loop { next, .. }
        | NodeExecutionData::Subgraph { next, .. }
        | NodeExecutionData::Batch { next, .. }
        | NodeExecutionData::Filter { next, .. }
        | NodeExecutionData::Parallel { next, .. }
        | NodeExecutionData::Merge { next, .. }
        | NodeExecutionData::Map { next, .. }
        | NodeExecutionData::Reduce { next, .. } => Some(next.clone()),
        NodeExecutionData::End => None,
    }
}
1849
1850fn resolve_path<'a>(scope: &'a Value, path: &str) -> Option<&'a Value> {
1851    let mut current = scope;
1852    for segment in path.split('.') {
1853        if segment.is_empty() {
1854            continue;
1855        }
1856        current = current.get(segment)?;
1857    }
1858    Some(current)
1859}
1860
1861fn resolve_string_path(scope: &Value, path: &str) -> Option<String> {
1862    resolve_path(scope, path)
1863        .and_then(Value::as_str)
1864        .map(str::to_string)
1865}
1866
1867fn evaluate_human_decision(value: &Value) -> Option<bool> {
1868    match value {
1869        Value::Bool(flag) => Some(*flag),
1870        Value::String(text) => {
1871            let normalized = text.trim().to_ascii_lowercase();
1872            match normalized.as_str() {
1873                "approve" | "approved" | "yes" | "true" => Some(true),
1874                "reject" | "rejected" | "no" | "false" => Some(false),
1875                _ => None,
1876            }
1877        }
1878        _ => None,
1879    }
1880}
1881
1882fn evaluate_transform_expression(expression: &str, scope: &Value) -> Result<Value, String> {
1883    let trimmed = expression.trim();
1884    if trimmed.is_empty() {
1885        return Err("expression is empty".to_string());
1886    }
1887
1888    if let Ok(value) = serde_json::from_str::<Value>(trimmed) {
1889        return Ok(value);
1890    }
1891
1892    let path = trimmed.strip_prefix("$.").unwrap_or(trimmed);
1893    resolve_path(scope, path)
1894        .cloned()
1895        .ok_or_else(|| format!("path '{path}' not found in scoped input"))
1896}
1897
1898fn map_item_scoped_input(base_scope: &Value, item: &Value, index: usize) -> Value {
1899    let mut object = match base_scope {
1900        Value::Object(map) => map.clone(),
1901        _ => Map::new(),
1902    };
1903    object.insert("map_item".to_string(), item.clone());
1904    object.insert("map_index".to_string(), Value::from(index as u64));
1905    Value::Object(object)
1906}
1907
1908fn enforce_expression_scope_budget(
1909    node_id: &str,
1910    scoped_input: &Value,
1911    max_expression_scope_bytes: usize,
1912) -> Result<(), WorkflowRuntimeError> {
1913    let size = serde_json::to_vec(scoped_input)
1914        .map(|bytes| bytes.len())
1915        .unwrap_or(max_expression_scope_bytes.saturating_add(1));
1916    if size > max_expression_scope_bytes {
1917        return Err(WorkflowRuntimeError::ExpressionScopeLimitExceeded {
1918            node_id: node_id.to_string(),
1919            actual_bytes: size,
1920            limit_bytes: max_expression_scope_bytes,
1921        });
1922    }
1923    Ok(())
1924}
1925
1926fn reduce_value(
1927    node_id: &str,
1928    source: &str,
1929    operation: &ReduceOperation,
1930    source_value: Value,
1931) -> Result<Value, WorkflowRuntimeError> {
1932    let items =
1933        source_value
1934            .as_array()
1935            .ok_or_else(|| WorkflowRuntimeError::InvalidReduceInput {
1936                node_id: node_id.to_string(),
1937                source_node: source.to_string(),
1938                reason: "expected source output to be an array".to_string(),
1939            })?;
1940
1941    match operation {
1942        ReduceOperation::Count => Ok(Value::from(items.len() as u64)),
1943        ReduceOperation::Sum => {
1944            let mut sum = 0.0f64;
1945            for value in items {
1946                let number =
1947                    value
1948                        .as_f64()
1949                        .ok_or_else(|| WorkflowRuntimeError::InvalidReduceInput {
1950                            node_id: node_id.to_string(),
1951                            source_node: source.to_string(),
1952                            reason: "sum operation requires numeric array values".to_string(),
1953                        })?;
1954                sum += number;
1955            }
1956            let number = serde_json::Number::from_f64(sum).ok_or_else(|| {
1957                WorkflowRuntimeError::InvalidReduceInput {
1958                    node_id: node_id.to_string(),
1959                    source_node: source.to_string(),
1960                    reason: "sum produced non-finite value".to_string(),
1961                }
1962            })?;
1963            Ok(Value::Number(number))
1964        }
1965    }
1966}
1967
1968#[cfg(test)]
1969mod tests {
1970    use std::sync::atomic::{AtomicUsize, Ordering};
1971    use std::sync::{Arc, Mutex};
1972    use std::time::Duration;
1973
1974    use async_trait::async_trait;
1975    use serde_json::json;
1976    use tokio::time::sleep;
1977
1978    use super::*;
1979    use crate::ir::{MergePolicy, Node, NodeKind, ReduceOperation, WorkflowDefinition};
1980
    /// LLM stub that always succeeds with a fixed output string.
    struct MockLlmExecutor {
        output: String,
    }
1984
1985    #[async_trait]
1986    impl LlmExecutor for MockLlmExecutor {
1987        async fn execute(
1988            &self,
1989            _input: LlmExecutionInput,
1990        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
1991            Ok(LlmExecutionOutput {
1992                content: self.output.clone(),
1993            })
1994        }
1995    }
1996
    /// Tool stub returning a fixed value, or failing when `fail` is set.
    struct MockToolExecutor {
        output: Value,
        fail: bool,
    }
2001
2002    #[async_trait]
2003    impl ToolExecutor for MockToolExecutor {
2004        async fn execute_tool(
2005            &self,
2006            input: ToolExecutionInput,
2007        ) -> Result<Value, ToolExecutionError> {
2008            if self.fail {
2009                return Err(ToolExecutionError::Failed(format!(
2010                    "tool '{}' failed intentionally",
2011                    input.tool
2012                )));
2013            }
2014            Ok(self.output.clone())
2015        }
2016    }
2017
    /// LLM stub that replays a scripted list of responses in order and
    /// counts how many times it was called.
    struct SequencedLlmExecutor {
        responses: Mutex<Vec<Result<LlmExecutionOutput, LlmExecutionError>>>,
        calls: AtomicUsize,
    }
2022
2023    #[async_trait]
2024    impl LlmExecutor for SequencedLlmExecutor {
2025        async fn execute(
2026            &self,
2027            _input: LlmExecutionInput,
2028        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
2029            self.calls.fetch_add(1, Ordering::Relaxed);
2030            self.responses
2031                .lock()
2032                .expect("sequenced llm lock poisoned")
2033                .remove(0)
2034        }
2035    }
2036
    /// Tool stub that sleeps for `delay` before succeeding; used to exercise
    /// timeout policies.
    struct SlowToolExecutor {
        delay: Duration,
    }
2040
2041    #[async_trait]
2042    impl ToolExecutor for SlowToolExecutor {
2043        async fn execute_tool(
2044            &self,
2045            _input: ToolExecutionInput,
2046        ) -> Result<Value, ToolExecutionError> {
2047            sleep(self.delay).await;
2048            Ok(json!({"status": "slow-ok"}))
2049        }
2050    }
2051
    /// Tool stub returning 1, 2, 3, … on successive calls.
    struct IncrementingToolExecutor {
        value: AtomicUsize,
    }
2055
2056    #[async_trait]
2057    impl ToolExecutor for IncrementingToolExecutor {
2058        async fn execute_tool(
2059            &self,
2060            _input: ToolExecutionInput,
2061        ) -> Result<Value, ToolExecutionError> {
2062            let next = self.value.fetch_add(1, Ordering::Relaxed) + 1;
2063            Ok(json!(next))
2064        }
2065    }
2066
    /// Tool stub that returns its input payload unchanged.
    struct EchoInputToolExecutor;
2068
2069    #[async_trait]
2070    impl ToolExecutor for EchoInputToolExecutor {
2071        async fn execute_tool(
2072            &self,
2073            input: ToolExecutionInput,
2074        ) -> Result<Value, ToolExecutionError> {
2075            Ok(input.input)
2076        }
2077    }
2078
    /// Tool stub for retry/compensate flows; tracks primary-tool attempts.
    struct RetryCompensateToolExecutor {
        attempts: AtomicUsize,
    }
2082
2083    #[async_trait]
2084    impl ToolExecutor for RetryCompensateToolExecutor {
2085        async fn execute_tool(
2086            &self,
2087            input: ToolExecutionInput,
2088        ) -> Result<Value, ToolExecutionError> {
2089            match input.tool.as_str() {
2090                "unstable_primary" => {
2091                    let current = self.attempts.fetch_add(1, Ordering::Relaxed) + 1;
2092                    if current <= 1 {
2093                        Err(ToolExecutionError::Failed("primary failed".to_string()))
2094                    } else {
2095                        Ok(json!({"primary_attempt": current}))
2096                    }
2097                }
2098                "always_fail" => Err(ToolExecutionError::Failed("always fail".to_string())),
2099                "compensate" => Ok(json!({"compensated": true})),
2100                _ => Err(ToolExecutionError::NotFound {
2101                    tool: input.tool.clone(),
2102                }),
2103            }
2104        }
2105    }
2106
    /// LLM stub that raises a shared cancellation flag and then fails,
    /// counting each invocation.
    struct CancellingLlmExecutor {
        cancel_flag: Arc<AtomicBool>,
        calls: AtomicUsize,
    }
2111
2112    #[async_trait]
2113    impl LlmExecutor for CancellingLlmExecutor {
2114        async fn execute(
2115            &self,
2116            _input: LlmExecutionInput,
2117        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
2118            self.calls.fetch_add(1, Ordering::Relaxed);
2119            self.cancel_flag.store(true, Ordering::Relaxed);
2120            Err(LlmExecutionError::Client("transient failure".to_string()))
2121        }
2122    }
2123
2124    fn linear_workflow() -> WorkflowDefinition {
2125        WorkflowDefinition {
2126            version: "v0".to_string(),
2127            name: "linear".to_string(),
2128            nodes: vec![
2129                Node {
2130                    id: "start".to_string(),
2131                    kind: NodeKind::Start {
2132                        next: "llm".to_string(),
2133                    },
2134                },
2135                Node {
2136                    id: "llm".to_string(),
2137                    kind: NodeKind::Llm {
2138                        model: "gpt-4".to_string(),
2139                        prompt: "Summarize".to_string(),
2140                        next: Some("tool".to_string()),
2141                    },
2142                },
2143                Node {
2144                    id: "tool".to_string(),
2145                    kind: NodeKind::Tool {
2146                        tool: "extract".to_string(),
2147                        input: json!({"k": "v"}),
2148                        next: Some("end".to_string()),
2149                    },
2150                },
2151                Node {
2152                    id: "end".to_string(),
2153                    kind: NodeKind::End,
2154                },
2155            ],
2156        }
2157    }
2158
2159    fn llm_only_workflow() -> WorkflowDefinition {
2160        WorkflowDefinition {
2161            version: "v0".to_string(),
2162            name: "llm-only".to_string(),
2163            nodes: vec![
2164                Node {
2165                    id: "start".to_string(),
2166                    kind: NodeKind::Start {
2167                        next: "llm".to_string(),
2168                    },
2169                },
2170                Node {
2171                    id: "llm".to_string(),
2172                    kind: NodeKind::Llm {
2173                        model: "gpt-4".to_string(),
2174                        prompt: "Summarize".to_string(),
2175                        next: Some("end".to_string()),
2176                    },
2177                },
2178                Node {
2179                    id: "end".to_string(),
2180                    kind: NodeKind::End,
2181                },
2182            ],
2183        }
2184    }
2185
2186    fn loop_workflow(max_iterations: Option<u32>) -> WorkflowDefinition {
2187        WorkflowDefinition {
2188            version: "v0".to_string(),
2189            name: "loop-workflow".to_string(),
2190            nodes: vec![
2191                Node {
2192                    id: "start".to_string(),
2193                    kind: NodeKind::Start {
2194                        next: "loop".to_string(),
2195                    },
2196                },
2197                Node {
2198                    id: "loop".to_string(),
2199                    kind: NodeKind::Loop {
2200                        condition: "last_tool_output != 3".to_string(),
2201                        body: "counter".to_string(),
2202                        next: "end".to_string(),
2203                        max_iterations,
2204                    },
2205                },
2206                Node {
2207                    id: "counter".to_string(),
2208                    kind: NodeKind::Tool {
2209                        tool: "counter".to_string(),
2210                        input: json!({}),
2211                        next: Some("loop".to_string()),
2212                    },
2213                },
2214                Node {
2215                    id: "end".to_string(),
2216                    kind: NodeKind::End,
2217                },
2218            ],
2219        }
2220    }
2221
2222    fn parallel_merge_workflow(policy: MergePolicy, quorum: Option<usize>) -> WorkflowDefinition {
2223        WorkflowDefinition {
2224            version: "v0".to_string(),
2225            name: "parallel-merge".to_string(),
2226            nodes: vec![
2227                Node {
2228                    id: "start".to_string(),
2229                    kind: NodeKind::Start {
2230                        next: "parallel".to_string(),
2231                    },
2232                },
2233                Node {
2234                    id: "parallel".to_string(),
2235                    kind: NodeKind::Parallel {
2236                        branches: vec!["tool_a".to_string(), "tool_b".to_string()],
2237                        next: "merge".to_string(),
2238                        max_in_flight: Some(2),
2239                    },
2240                },
2241                Node {
2242                    id: "tool_a".to_string(),
2243                    kind: NodeKind::Tool {
2244                        tool: "extract".to_string(),
2245                        input: json!({"value": 1}),
2246                        next: Some("end".to_string()),
2247                    },
2248                },
2249                Node {
2250                    id: "tool_b".to_string(),
2251                    kind: NodeKind::Tool {
2252                        tool: "extract".to_string(),
2253                        input: json!({"value": 2}),
2254                        next: Some("end".to_string()),
2255                    },
2256                },
2257                Node {
2258                    id: "merge".to_string(),
2259                    kind: NodeKind::Merge {
2260                        sources: vec!["tool_a".to_string(), "tool_b".to_string()],
2261                        policy,
2262                        quorum,
2263                        next: "end".to_string(),
2264                    },
2265                },
2266                Node {
2267                    id: "end".to_string(),
2268                    kind: NodeKind::End,
2269                },
2270            ],
2271        }
2272    }
2273
2274    fn map_reduce_workflow(operation: ReduceOperation) -> WorkflowDefinition {
2275        WorkflowDefinition {
2276            version: "v0".to_string(),
2277            name: "map-reduce".to_string(),
2278            nodes: vec![
2279                Node {
2280                    id: "start".to_string(),
2281                    kind: NodeKind::Start {
2282                        next: "map".to_string(),
2283                    },
2284                },
2285                Node {
2286                    id: "map".to_string(),
2287                    kind: NodeKind::Map {
2288                        tool: "counter".to_string(),
2289                        items_path: "input.values".to_string(),
2290                        next: "reduce".to_string(),
2291                        max_in_flight: Some(3),
2292                    },
2293                },
2294                Node {
2295                    id: "reduce".to_string(),
2296                    kind: NodeKind::Reduce {
2297                        source: "map".to_string(),
2298                        operation,
2299                        next: "end".to_string(),
2300                    },
2301                },
2302                Node {
2303                    id: "end".to_string(),
2304                    kind: NodeKind::End,
2305                },
2306            ],
2307        }
2308    }
2309
2310    fn debounce_and_throttle_workflow() -> WorkflowDefinition {
2311        WorkflowDefinition {
2312            version: "v0".to_string(),
2313            name: "debounce-throttle".to_string(),
2314            nodes: vec![
2315                Node {
2316                    id: "start".to_string(),
2317                    kind: NodeKind::Start {
2318                        next: "debounce_a".to_string(),
2319                    },
2320                },
2321                Node {
2322                    id: "debounce_a".to_string(),
2323                    kind: NodeKind::Debounce {
2324                        key_path: "input.key".to_string(),
2325                        window_steps: 3,
2326                        next: "debounce_a".to_string(),
2327                        on_suppressed: Some("throttle_a".to_string()),
2328                    },
2329                },
2330                Node {
2331                    id: "throttle_a".to_string(),
2332                    kind: NodeKind::Throttle {
2333                        key_path: "input.key".to_string(),
2334                        window_steps: 3,
2335                        next: "throttle_a".to_string(),
2336                        on_throttled: Some("end_throttled".to_string()),
2337                    },
2338                },
2339                Node {
2340                    id: "end_throttled".to_string(),
2341                    kind: NodeKind::End,
2342                },
2343            ],
2344        }
2345    }
2346
2347    fn extended_nodes_workflow() -> WorkflowDefinition {
2348        WorkflowDefinition {
2349            version: "v0".to_string(),
2350            name: "extended-nodes".to_string(),
2351            nodes: vec![
2352                Node {
2353                    id: "start".to_string(),
2354                    kind: NodeKind::Start {
2355                        next: "event".to_string(),
2356                    },
2357                },
2358                Node {
2359                    id: "event".to_string(),
2360                    kind: NodeKind::EventTrigger {
2361                        event: "webhook".to_string(),
2362                        event_path: "input.event_type".to_string(),
2363                        next: "cache_write".to_string(),
2364                        on_mismatch: Some("end_mismatch".to_string()),
2365                    },
2366                },
2367                Node {
2368                    id: "cache_write".to_string(),
2369                    kind: NodeKind::CacheWrite {
2370                        key_path: "input.cache_key".to_string(),
2371                        value_path: "input.payload".to_string(),
2372                        next: "cache_read".to_string(),
2373                    },
2374                },
2375                Node {
2376                    id: "cache_read".to_string(),
2377                    kind: NodeKind::CacheRead {
2378                        key_path: "input.cache_key".to_string(),
2379                        next: "router".to_string(),
2380                        on_miss: Some("end_miss".to_string()),
2381                    },
2382                },
2383                Node {
2384                    id: "router".to_string(),
2385                    kind: NodeKind::Router {
2386                        routes: vec![crate::ir::RouterRoute {
2387                            when: "input.mode == 'manual'".to_string(),
2388                            next: "human".to_string(),
2389                        }],
2390                        default: "transform".to_string(),
2391                    },
2392                },
2393                Node {
2394                    id: "human".to_string(),
2395                    kind: NodeKind::HumanInTheLoop {
2396                        decision_path: "input.approval".to_string(),
2397                        response_path: Some("input.review_notes".to_string()),
2398                        on_approve: "transform".to_string(),
2399                        on_reject: "end_rejected".to_string(),
2400                    },
2401                },
2402                Node {
2403                    id: "transform".to_string(),
2404                    kind: NodeKind::Transform {
2405                        expression: "node_outputs.cache_read.value".to_string(),
2406                        next: "end".to_string(),
2407                    },
2408                },
2409                Node {
2410                    id: "end".to_string(),
2411                    kind: NodeKind::End,
2412                },
2413                Node {
2414                    id: "end_mismatch".to_string(),
2415                    kind: NodeKind::End,
2416                },
2417                Node {
2418                    id: "end_miss".to_string(),
2419                    kind: NodeKind::End,
2420                },
2421                Node {
2422                    id: "end_rejected".to_string(),
2423                    kind: NodeKind::End,
2424                },
2425            ],
2426        }
2427    }
2428
2429    fn retry_compensate_workflow(primary_tool: &str) -> WorkflowDefinition {
2430        WorkflowDefinition {
2431            version: "v0".to_string(),
2432            name: "retry-compensate".to_string(),
2433            nodes: vec![
2434                Node {
2435                    id: "start".to_string(),
2436                    kind: NodeKind::Start {
2437                        next: "retry_comp".to_string(),
2438                    },
2439                },
2440                Node {
2441                    id: "retry_comp".to_string(),
2442                    kind: NodeKind::RetryCompensate {
2443                        tool: primary_tool.to_string(),
2444                        input: json!({"job": "run"}),
2445                        max_retries: 1,
2446                        compensate_tool: "compensate".to_string(),
2447                        compensate_input: json!({"job": "rollback"}),
2448                        next: "end".to_string(),
2449                        on_compensated: Some("end_compensated".to_string()),
2450                    },
2451                },
2452                Node {
2453                    id: "end".to_string(),
2454                    kind: NodeKind::End,
2455                },
2456                Node {
2457                    id: "end_compensated".to_string(),
2458                    kind: NodeKind::End,
2459                },
2460            ],
2461        }
2462    }
2463
2464    #[tokio::test]
2465    async fn executes_happy_path_linear_flow() {
2466        let llm = MockLlmExecutor {
2467            output: "ok".to_string(),
2468        };
2469        let tools = MockToolExecutor {
2470            output: json!({"status": "done"}),
2471            fail: false,
2472        };
2473        let runtime = WorkflowRuntime::new(
2474            linear_workflow(),
2475            &llm,
2476            Some(&tools),
2477            WorkflowRuntimeOptions::default(),
2478        );
2479
2480        let result = runtime
2481            .execute(json!({"request_id": "r1"}), None)
2482            .await
2483            .expect("linear workflow should succeed");
2484
2485        assert_eq!(result.workflow_name, "linear");
2486        assert_eq!(result.terminal_node_id, "end");
2487        assert_eq!(result.node_executions.len(), 4);
2488        assert_eq!(
2489            result.node_outputs.get("llm"),
2490            Some(&Value::String("ok".to_string()))
2491        );
2492        assert_eq!(
2493            result.node_outputs.get("tool"),
2494            Some(&json!({"status": "done"}))
2495        );
2496        assert_eq!(result.events.len(), 8);
2497        assert!(result.retry_events.is_empty());
2498        assert!(result.trace.is_some());
2499        assert_eq!(result.replay_report, None);
2500    }
2501
    /// Verifies the structured `NodeExecutionData` payload recorded for each
    /// step of the linear start -> llm -> tool flow.
    #[tokio::test]
    async fn linear_flow_records_expected_node_execution_payloads() {
        let llm = MockLlmExecutor {
            output: "ok".to_string(),
        };
        let tools = MockToolExecutor {
            output: json!({"status": "done"}),
            fail: false,
        };
        let runtime = WorkflowRuntime::new(
            linear_workflow(),
            &llm,
            Some(&tools),
            WorkflowRuntimeOptions::default(),
        );

        let result = runtime
            .execute(json!({"request_id": "r1"}), None)
            .await
            .expect("linear workflow should succeed");

        // Execution records appear in workflow order: start, llm, tool.
        assert!(matches!(
            result.node_executions[0].data,
            NodeExecutionData::Start { ref next } if next == "llm"
        ));
        assert!(matches!(
            result.node_executions[1].data,
            NodeExecutionData::Llm {
                ref model,
                ref output,
                ref next
            } if model == "gpt-4" && output == "ok" && next == "tool"
        ));
        assert!(matches!(
            result.node_executions[2].data,
            NodeExecutionData::Tool {
                ref tool,
                ref next,
                ..
            } if tool == "extract" && next == "end"
        ));
    }
2544
2545    #[tokio::test]
2546    async fn executes_conditional_branching() {
2547        let workflow = WorkflowDefinition {
2548            version: "v0".to_string(),
2549            name: "conditional".to_string(),
2550            nodes: vec![
2551                Node {
2552                    id: "start".to_string(),
2553                    kind: NodeKind::Start {
2554                        next: "condition".to_string(),
2555                    },
2556                },
2557                Node {
2558                    id: "condition".to_string(),
2559                    kind: NodeKind::Condition {
2560                        expression: "input.approved".to_string(),
2561                        on_true: "end_true".to_string(),
2562                        on_false: "end_false".to_string(),
2563                    },
2564                },
2565                Node {
2566                    id: "end_true".to_string(),
2567                    kind: NodeKind::End,
2568                },
2569                Node {
2570                    id: "end_false".to_string(),
2571                    kind: NodeKind::End,
2572                },
2573            ],
2574        };
2575
2576        let llm = MockLlmExecutor {
2577            output: "unused".to_string(),
2578        };
2579        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());
2580
2581        let result = runtime
2582            .execute(json!({"approved": true}), None)
2583            .await
2584            .expect("conditional workflow should succeed");
2585
2586        assert_eq!(result.terminal_node_id, "end_true");
2587    }
2588
2589    #[tokio::test]
2590    async fn executes_conditional_false_branch() {
2591        let workflow = WorkflowDefinition {
2592            version: "v0".to_string(),
2593            name: "conditional-false".to_string(),
2594            nodes: vec![
2595                Node {
2596                    id: "start".to_string(),
2597                    kind: NodeKind::Start {
2598                        next: "condition".to_string(),
2599                    },
2600                },
2601                Node {
2602                    id: "condition".to_string(),
2603                    kind: NodeKind::Condition {
2604                        expression: "input.approved".to_string(),
2605                        on_true: "end_true".to_string(),
2606                        on_false: "end_false".to_string(),
2607                    },
2608                },
2609                Node {
2610                    id: "end_true".to_string(),
2611                    kind: NodeKind::End,
2612                },
2613                Node {
2614                    id: "end_false".to_string(),
2615                    kind: NodeKind::End,
2616                },
2617            ],
2618        };
2619        let llm = MockLlmExecutor {
2620            output: "unused".to_string(),
2621        };
2622        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());
2623
2624        let result = runtime
2625            .execute(json!({"approved": false}), None)
2626            .await
2627            .expect("conditional workflow should take false branch");
2628
2629        assert_eq!(result.terminal_node_id, "end_false");
2630    }
2631
2632    #[tokio::test]
2633    async fn fails_when_tool_executor_is_missing() {
2634        let llm = MockLlmExecutor {
2635            output: "ok".to_string(),
2636        };
2637        let runtime = WorkflowRuntime::new(
2638            linear_workflow(),
2639            &llm,
2640            None,
2641            WorkflowRuntimeOptions::default(),
2642        );
2643
2644        let error = runtime
2645            .execute(json!({}), None)
2646            .await
2647            .expect_err("workflow should fail without tool executor");
2648
2649        assert!(matches!(
2650            error,
2651            WorkflowRuntimeError::MissingToolExecutor { node_id } if node_id == "tool"
2652        ));
2653    }
2654
2655    #[tokio::test]
2656    async fn fails_on_tool_execution_error() {
2657        let llm = MockLlmExecutor {
2658            output: "ok".to_string(),
2659        };
2660        let tools = MockToolExecutor {
2661            output: json!({"status": "unused"}),
2662            fail: true,
2663        };
2664        let runtime = WorkflowRuntime::new(
2665            linear_workflow(),
2666            &llm,
2667            Some(&tools),
2668            WorkflowRuntimeOptions::default(),
2669        );
2670
2671        let error = runtime
2672            .execute(json!({}), None)
2673            .await
2674            .expect_err("workflow should fail on tool error");
2675
2676        assert!(matches!(
2677            error,
2678            WorkflowRuntimeError::ToolRetryExhausted { node_id, attempts: 1, .. }
2679                if node_id == "tool"
2680        ));
2681    }
2682
2683    #[tokio::test]
2684    async fn retries_llm_after_transient_failure() {
2685        let llm = SequencedLlmExecutor {
2686            responses: Mutex::new(vec![
2687                Err(LlmExecutionError::Client("temporary".to_string())),
2688                Ok(LlmExecutionOutput {
2689                    content: "recovered".to_string(),
2690                }),
2691            ]),
2692            calls: AtomicUsize::new(0),
2693        };
2694        let runtime = WorkflowRuntime::new(
2695            llm_only_workflow(),
2696            &llm,
2697            None,
2698            WorkflowRuntimeOptions {
2699                llm_node_policy: NodeExecutionPolicy {
2700                    timeout: None,
2701                    max_retries: 1,
2702                },
2703                ..WorkflowRuntimeOptions::default()
2704            },
2705        );
2706
2707        let result = runtime
2708            .execute(json!({"request_id": "r2"}), None)
2709            .await
2710            .expect("llm retry should recover");
2711
2712        assert_eq!(result.terminal_node_id, "end");
2713        assert_eq!(result.node_outputs.get("llm"), Some(&json!("recovered")));
2714        assert_eq!(result.retry_events.len(), 1);
2715        assert_eq!(result.retry_events[0].operation, "llm");
2716        assert_eq!(llm.calls.load(Ordering::Relaxed), 2);
2717    }
2718
2719    #[tokio::test]
2720    async fn times_out_tool_execution_per_policy() {
2721        let llm = MockLlmExecutor {
2722            output: "ok".to_string(),
2723        };
2724        let tool = SlowToolExecutor {
2725            delay: Duration::from_millis(50),
2726        };
2727        let runtime = WorkflowRuntime::new(
2728            linear_workflow(),
2729            &llm,
2730            Some(&tool),
2731            WorkflowRuntimeOptions {
2732                tool_node_policy: NodeExecutionPolicy {
2733                    timeout: Some(Duration::from_millis(5)),
2734                    max_retries: 0,
2735                },
2736                ..WorkflowRuntimeOptions::default()
2737            },
2738        );
2739
2740        let error = runtime
2741            .execute(json!({}), None)
2742            .await
2743            .expect_err("tool execution should time out");
2744
2745        assert!(matches!(
2746            error,
2747            WorkflowRuntimeError::ToolTimeout {
2748                node_id,
2749                timeout_ms: 5,
2750                attempts: 1,
2751            } if node_id == "tool"
2752        ));
2753    }
2754
2755    #[tokio::test]
2756    async fn cancels_between_retry_attempts() {
2757        let cancel_flag = Arc::new(AtomicBool::new(false));
2758        let llm = CancellingLlmExecutor {
2759            cancel_flag: Arc::clone(&cancel_flag),
2760            calls: AtomicUsize::new(0),
2761        };
2762        let runtime = WorkflowRuntime::new(
2763            llm_only_workflow(),
2764            &llm,
2765            None,
2766            WorkflowRuntimeOptions {
2767                llm_node_policy: NodeExecutionPolicy {
2768                    timeout: None,
2769                    max_retries: 3,
2770                },
2771                ..WorkflowRuntimeOptions::default()
2772            },
2773        );
2774
2775        let error = runtime
2776            .execute(json!({}), Some(cancel_flag.as_ref()))
2777            .await
2778            .expect_err("workflow should stop when cancellation is observed");
2779
2780        assert!(matches!(error, WorkflowRuntimeError::Cancelled));
2781        assert_eq!(llm.calls.load(Ordering::Relaxed), 1);
2782    }
2783
2784    #[tokio::test]
2785    async fn enforces_step_limit_guard() {
2786        let workflow = WorkflowDefinition {
2787            version: "v0".to_string(),
2788            name: "loop".to_string(),
2789            nodes: vec![
2790                Node {
2791                    id: "start".to_string(),
2792                    kind: NodeKind::Start {
2793                        next: "condition".to_string(),
2794                    },
2795                },
2796                Node {
2797                    id: "condition".to_string(),
2798                    kind: NodeKind::Condition {
2799                        expression: "true".to_string(),
2800                        on_true: "condition".to_string(),
2801                        on_false: "end".to_string(),
2802                    },
2803                },
2804                Node {
2805                    id: "end".to_string(),
2806                    kind: NodeKind::End,
2807                },
2808            ],
2809        };
2810
2811        let llm = MockLlmExecutor {
2812            output: "unused".to_string(),
2813        };
2814        let runtime = WorkflowRuntime::new(
2815            workflow,
2816            &llm,
2817            None,
2818            WorkflowRuntimeOptions {
2819                max_steps: 3,
2820                ..WorkflowRuntimeOptions::default()
2821            },
2822        );
2823
2824        let error = runtime
2825            .execute(json!({}), None)
2826            .await
2827            .expect_err("workflow should fail on step limit");
2828
2829        assert!(matches!(
2830            error,
2831            WorkflowRuntimeError::StepLimitExceeded { max_steps: 3 }
2832        ));
2833    }
2834
2835    #[tokio::test]
2836    async fn validates_recorded_trace_in_replay_mode() {
2837        let llm = MockLlmExecutor {
2838            output: "ok".to_string(),
2839        };
2840        let tools = MockToolExecutor {
2841            output: json!({"status": "done"}),
2842            fail: false,
2843        };
2844        let runtime = WorkflowRuntime::new(
2845            linear_workflow(),
2846            &llm,
2847            Some(&tools),
2848            WorkflowRuntimeOptions {
2849                replay_mode: WorkflowReplayMode::ValidateRecordedTrace,
2850                ..WorkflowRuntimeOptions::default()
2851            },
2852        );
2853
2854        let result = runtime
2855            .execute(json!({"request_id": "r1"}), None)
2856            .await
2857            .expect("replay validation should pass");
2858
2859        assert!(result.trace.is_some());
2860        assert_eq!(
2861            result.replay_report.as_ref().map(|r| r.total_events),
2862            Some(9)
2863        );
2864    }
2865
2866    #[tokio::test]
2867    async fn executes_loop_until_condition_fails() {
2868        let llm = MockLlmExecutor {
2869            output: "unused".to_string(),
2870        };
2871        let counter = IncrementingToolExecutor {
2872            value: AtomicUsize::new(0),
2873        };
2874        let runtime = WorkflowRuntime::new(
2875            loop_workflow(Some(8)),
2876            &llm,
2877            Some(&counter),
2878            WorkflowRuntimeOptions::default(),
2879        );
2880
2881        let result = runtime
2882            .execute(json!({}), None)
2883            .await
2884            .expect("loop workflow should terminate at end");
2885
2886        assert_eq!(result.terminal_node_id, "end");
2887        assert!(result.node_executions.iter().any(|step| matches!(
2888            step.data,
2889            NodeExecutionData::Loop {
2890                evaluated: false,
2891                ..
2892            }
2893        )));
2894    }
2895
2896    #[tokio::test]
2897    async fn fails_when_loop_exceeds_max_iterations() {
2898        let llm = MockLlmExecutor {
2899            output: "unused".to_string(),
2900        };
2901        let counter = MockToolExecutor {
2902            output: json!(0),
2903            fail: false,
2904        };
2905        let runtime = WorkflowRuntime::new(
2906            loop_workflow(Some(2)),
2907            &llm,
2908            Some(&counter),
2909            WorkflowRuntimeOptions {
2910                max_steps: 20,
2911                ..WorkflowRuntimeOptions::default()
2912            },
2913        );
2914
2915        let error = runtime
2916            .execute(json!({}), None)
2917            .await
2918            .expect_err("loop should fail once max iterations are exceeded");
2919
2920        assert!(matches!(
2921            error,
2922            WorkflowRuntimeError::LoopIterationLimitExceeded {
2923                node_id,
2924                max_iterations: 2
2925            } if node_id == "loop"
2926        ));
2927    }
2928
2929    #[tokio::test]
2930    async fn executes_parallel_then_merge_all() {
2931        let llm = MockLlmExecutor {
2932            output: "unused".to_string(),
2933        };
2934        let tool = EchoInputToolExecutor;
2935        let runtime = WorkflowRuntime::new(
2936            parallel_merge_workflow(MergePolicy::All, None),
2937            &llm,
2938            Some(&tool),
2939            WorkflowRuntimeOptions::default(),
2940        );
2941
2942        let result = runtime
2943            .execute(json!({}), None)
2944            .await
2945            .expect("parallel merge workflow should succeed");
2946
2947        assert_eq!(result.terminal_node_id, "end");
2948        assert_eq!(
2949            result.node_outputs.get("tool_a"),
2950            Some(&json!({"value": 1}))
2951        );
2952        assert_eq!(
2953            result.node_outputs.get("tool_b"),
2954            Some(&json!({"value": 2}))
2955        );
2956        assert_eq!(
2957            result.node_outputs.get("merge"),
2958            Some(&json!([{"value": 1}, {"value": 2}]))
2959        );
2960    }
2961
2962    #[tokio::test]
2963    async fn executes_map_reduce_sum() {
2964        let llm = MockLlmExecutor {
2965            output: "unused".to_string(),
2966        };
2967        let tool = EchoInputToolExecutor;
2968        let runtime = WorkflowRuntime::new(
2969            map_reduce_workflow(ReduceOperation::Sum),
2970            &llm,
2971            Some(&tool),
2972            WorkflowRuntimeOptions::default(),
2973        );
2974
2975        let result = runtime
2976            .execute(json!({"values": [1, 2, 3]}), None)
2977            .await
2978            .expect("map reduce workflow should succeed");
2979
2980        assert_eq!(result.node_outputs.get("map"), Some(&json!([1, 2, 3])));
2981        assert_eq!(result.node_outputs.get("reduce"), Some(&json!(6.0)));
2982    }
2983
2984    #[tokio::test]
2985    async fn fails_map_when_items_path_is_not_array() {
2986        let llm = MockLlmExecutor {
2987            output: "unused".to_string(),
2988        };
2989        let tool = EchoInputToolExecutor;
2990        let runtime = WorkflowRuntime::new(
2991            map_reduce_workflow(ReduceOperation::Count),
2992            &llm,
2993            Some(&tool),
2994            WorkflowRuntimeOptions::default(),
2995        );
2996
2997        let error = runtime
2998            .execute(json!({"values": {"not": "array"}}), None)
2999            .await
3000            .expect_err("map node should fail on non-array path");
3001
3002        assert!(matches!(
3003            error,
3004            WorkflowRuntimeError::MapItemsNotArray {
3005                node_id,
3006                items_path
3007            } if node_id == "map" && items_path == "input.values"
3008        ));
3009    }
3010
3011    #[tokio::test]
3012    async fn executes_subgraph_via_registry() {
3013        let llm = MockLlmExecutor {
3014            output: "nested-ok".to_string(),
3015        };
3016        let subgraph = WorkflowDefinition {
3017            version: "v0".to_string(),
3018            name: "child".to_string(),
3019            nodes: vec![
3020                Node {
3021                    id: "start".to_string(),
3022                    kind: NodeKind::Start {
3023                        next: "llm".to_string(),
3024                    },
3025                },
3026                Node {
3027                    id: "llm".to_string(),
3028                    kind: NodeKind::Llm {
3029                        model: "gpt-4".to_string(),
3030                        prompt: "child".to_string(),
3031                        next: Some("end".to_string()),
3032                    },
3033                },
3034                Node {
3035                    id: "end".to_string(),
3036                    kind: NodeKind::End,
3037                },
3038            ],
3039        };
3040        let parent = WorkflowDefinition {
3041            version: "v0".to_string(),
3042            name: "parent".to_string(),
3043            nodes: vec![
3044                Node {
3045                    id: "start".to_string(),
3046                    kind: NodeKind::Start {
3047                        next: "sub".to_string(),
3048                    },
3049                },
3050                Node {
3051                    id: "sub".to_string(),
3052                    kind: NodeKind::Subgraph {
3053                        graph: "child_graph".to_string(),
3054                        next: Some("end".to_string()),
3055                    },
3056                },
3057                Node {
3058                    id: "end".to_string(),
3059                    kind: NodeKind::End,
3060                },
3061            ],
3062        };
3063
3064        let mut registry = BTreeMap::new();
3065        registry.insert("child_graph".to_string(), subgraph);
3066        let runtime = WorkflowRuntime::new(
3067            parent,
3068            &llm,
3069            None,
3070            WorkflowRuntimeOptions {
3071                subgraph_registry: registry,
3072                ..WorkflowRuntimeOptions::default()
3073            },
3074        );
3075
3076        let result = runtime
3077            .execute(json!({"approved": true}), None)
3078            .await
3079            .expect("subgraph workflow should execute");
3080
3081        assert_eq!(result.terminal_node_id, "end");
3082        assert!(matches!(
3083            result.node_executions[1].data,
3084            NodeExecutionData::Subgraph { .. }
3085        ));
3086    }
3087
3088    #[tokio::test]
3089    async fn executes_batch_and_filter_nodes() {
3090        let llm = MockLlmExecutor {
3091            output: "unused".to_string(),
3092        };
3093        let workflow = WorkflowDefinition {
3094            version: "v0".to_string(),
3095            name: "batch-filter".to_string(),
3096            nodes: vec![
3097                Node {
3098                    id: "start".to_string(),
3099                    kind: NodeKind::Start {
3100                        next: "batch".to_string(),
3101                    },
3102                },
3103                Node {
3104                    id: "batch".to_string(),
3105                    kind: NodeKind::Batch {
3106                        items_path: "input.items".to_string(),
3107                        next: "filter".to_string(),
3108                    },
3109                },
3110                Node {
3111                    id: "filter".to_string(),
3112                    kind: NodeKind::Filter {
3113                        items_path: "node_outputs.batch".to_string(),
3114                        expression: "item.keep == true".to_string(),
3115                        next: "end".to_string(),
3116                    },
3117                },
3118                Node {
3119                    id: "end".to_string(),
3120                    kind: NodeKind::End,
3121                },
3122            ],
3123        };
3124
3125        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());
3126        let result = runtime
3127            .execute(
3128                json!({
3129                    "items": [
3130                        {"id": 1, "keep": true},
3131                        {"id": 2, "keep": false},
3132                        {"id": 3, "keep": true}
3133                    ]
3134                }),
3135                None,
3136            )
3137            .await
3138            .expect("batch/filter workflow should execute");
3139
3140        assert_eq!(
3141            result
3142                .node_outputs
3143                .get("batch")
3144                .and_then(Value::as_array)
3145                .map(Vec::len),
3146            Some(3)
3147        );
3148        assert_eq!(
3149            result
3150                .node_outputs
3151                .get("filter")
3152                .and_then(Value::as_array)
3153                .map(Vec::len),
3154            Some(2)
3155        );
3156    }
3157
3158    #[tokio::test]
3159    async fn executes_debounce_and_throttle_nodes() {
3160        let llm = MockLlmExecutor {
3161            output: "unused".to_string(),
3162        };
3163        let runtime = WorkflowRuntime::new(
3164            debounce_and_throttle_workflow(),
3165            &llm,
3166            None,
3167            WorkflowRuntimeOptions::default(),
3168        );
3169
3170        let result = runtime
3171            .execute(json!({"key": "k1"}), None)
3172            .await
3173            .expect("debounce/throttle workflow should execute");
3174
3175        assert_eq!(result.terminal_node_id, "end_throttled");
3176        assert_eq!(
3177            result.node_outputs.get("debounce_a"),
3178            Some(&json!({"key": "k1", "suppressed": true}))
3179        );
3180        assert_eq!(
3181            result.node_outputs.get("throttle_a"),
3182            Some(&json!({"key": "k1", "throttled": true}))
3183        );
3184    }
3185
3186    #[tokio::test]
3187    async fn executes_retry_compensate_successfully_without_compensation() {
3188        let llm = MockLlmExecutor {
3189            output: "unused".to_string(),
3190        };
3191        let tools = RetryCompensateToolExecutor {
3192            attempts: AtomicUsize::new(0),
3193        };
3194        let runtime = WorkflowRuntime::new(
3195            retry_compensate_workflow("unstable_primary"),
3196            &llm,
3197            Some(&tools),
3198            WorkflowRuntimeOptions::default(),
3199        );
3200
3201        let result = runtime
3202            .execute(json!({}), None)
3203            .await
3204            .expect("retry_compensate should recover by retry");
3205
3206        assert_eq!(result.terminal_node_id, "end");
3207        assert_eq!(tools.attempts.load(Ordering::Relaxed), 2);
3208        assert_eq!(result.retry_events.len(), 1);
3209    }
3210
3211    #[tokio::test]
3212    async fn executes_retry_compensate_with_compensation_route() {
3213        let llm = MockLlmExecutor {
3214            output: "unused".to_string(),
3215        };
3216        let tools = RetryCompensateToolExecutor {
3217            attempts: AtomicUsize::new(0),
3218        };
3219        let runtime = WorkflowRuntime::new(
3220            retry_compensate_workflow("always_fail"),
3221            &llm,
3222            Some(&tools),
3223            WorkflowRuntimeOptions::default(),
3224        );
3225
3226        let result = runtime
3227            .execute(json!({}), None)
3228            .await
3229            .expect("retry_compensate should use compensation");
3230
3231        assert_eq!(result.terminal_node_id, "end_compensated");
3232        assert_eq!(result.retry_events.len(), 1);
3233        assert_eq!(
3234            result
3235                .node_outputs
3236                .get("retry_comp")
3237                .and_then(|value| value.get("status")),
3238            Some(&json!("compensated"))
3239        );
3240    }
3241
3242    #[tokio::test]
3243    async fn executes_event_cache_router_human_transform_nodes() {
3244        let llm = MockLlmExecutor {
3245            output: "unused".to_string(),
3246        };
3247        let runtime = WorkflowRuntime::new(
3248            extended_nodes_workflow(),
3249            &llm,
3250            None,
3251            WorkflowRuntimeOptions::default(),
3252        );
3253
3254        let result = runtime
3255            .execute(
3256                json!({
3257                    "event_type": "webhook",
3258                    "cache_key": "customer-1",
3259                    "payload": {"value": 99},
3260                    "mode": "manual",
3261                    "approval": "approve",
3262                    "review_notes": {"editor": "ops"}
3263                }),
3264                None,
3265            )
3266            .await
3267            .expect("extended nodes workflow should execute");
3268
3269        assert_eq!(result.terminal_node_id, "end");
3270        assert_eq!(
3271            result.node_outputs.get("cache_read"),
3272            Some(&json!({"key": "customer-1", "hit": true, "value": {"value": 99}}))
3273        );
3274        assert_eq!(
3275            result.node_outputs.get("router"),
3276            Some(&json!({"selected": "human"}))
3277        );
3278        assert_eq!(
3279            result.node_outputs.get("transform"),
3280            Some(&json!({"value": 99}))
3281        );
3282    }
3283
3284    #[tokio::test]
3285    async fn routes_event_trigger_mismatch_to_fallback() {
3286        let llm = MockLlmExecutor {
3287            output: "unused".to_string(),
3288        };
3289        let runtime = WorkflowRuntime::new(
3290            extended_nodes_workflow(),
3291            &llm,
3292            None,
3293            WorkflowRuntimeOptions::default(),
3294        );
3295
3296        let result = runtime
3297            .execute(
3298                json!({
3299                    "event_type": "cron",
3300                    "cache_key": "customer-1",
3301                    "payload": {"value": 99},
3302                    "mode": "manual",
3303                    "approval": "approve"
3304                }),
3305                None,
3306            )
3307            .await
3308            .expect("event mismatch should route to fallback");
3309
3310        assert_eq!(result.terminal_node_id, "end_mismatch");
3311    }
3312
3313    #[tokio::test]
3314    async fn cache_read_routes_to_on_miss_when_value_absent() {
3315        let llm = MockLlmExecutor {
3316            output: "unused".to_string(),
3317        };
3318        let workflow = WorkflowDefinition {
3319            version: "v0".to_string(),
3320            name: "cache-read-miss".to_string(),
3321            nodes: vec![
3322                Node {
3323                    id: "start".to_string(),
3324                    kind: NodeKind::Start {
3325                        next: "cache_read".to_string(),
3326                    },
3327                },
3328                Node {
3329                    id: "cache_read".to_string(),
3330                    kind: NodeKind::CacheRead {
3331                        key_path: "input.cache_key".to_string(),
3332                        next: "end_hit".to_string(),
3333                        on_miss: Some("end_miss".to_string()),
3334                    },
3335                },
3336                Node {
3337                    id: "end_hit".to_string(),
3338                    kind: NodeKind::End,
3339                },
3340                Node {
3341                    id: "end_miss".to_string(),
3342                    kind: NodeKind::End,
3343                },
3344            ],
3345        };
3346        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());
3347
3348        let result = runtime
3349            .execute(json!({"cache_key": "new-customer"}), None)
3350            .await
3351            .expect("cache read miss should still execute via on_miss route");
3352
3353        assert_eq!(
3354            result.node_outputs.get("cache_read"),
3355            Some(&json!({"key": "new-customer", "hit": false, "value": null}))
3356        );
3357        assert_eq!(result.terminal_node_id, "end_miss");
3358    }
3359
3360    #[tokio::test]
3361    async fn rejects_condition_when_expression_scope_exceeds_limit() {
3362        let llm = MockLlmExecutor {
3363            output: "unused".to_string(),
3364        };
3365        let workflow = WorkflowDefinition {
3366            version: "v0".to_string(),
3367            name: "scope-limit".to_string(),
3368            nodes: vec![
3369                Node {
3370                    id: "start".to_string(),
3371                    kind: NodeKind::Start {
3372                        next: "condition".to_string(),
3373                    },
3374                },
3375                Node {
3376                    id: "condition".to_string(),
3377                    kind: NodeKind::Condition {
3378                        expression: "input.flag == true".to_string(),
3379                        on_true: "end".to_string(),
3380                        on_false: "end".to_string(),
3381                    },
3382                },
3383                Node {
3384                    id: "end".to_string(),
3385                    kind: NodeKind::End,
3386                },
3387            ],
3388        };
3389
3390        let runtime = WorkflowRuntime::new(
3391            workflow,
3392            &llm,
3393            None,
3394            WorkflowRuntimeOptions {
3395                security_limits: RuntimeSecurityLimits {
3396                    max_expression_scope_bytes: 32,
3397                    ..RuntimeSecurityLimits::default()
3398                },
3399                ..WorkflowRuntimeOptions::default()
3400            },
3401        );
3402
3403        let error = runtime
3404            .execute(
3405                json!({"flag": true, "payload": "this-is-too-large-for-limit"}),
3406                None,
3407            )
3408            .await
3409            .expect_err("condition should fail when scope budget is exceeded");
3410        assert!(matches!(
3411            error,
3412            WorkflowRuntimeError::ExpressionScopeLimitExceeded { node_id, .. }
3413                if node_id == "condition"
3414        ));
3415    }
3416
3417    #[tokio::test]
3418    async fn rejects_map_when_item_count_exceeds_limit() {
3419        let llm = MockLlmExecutor {
3420            output: "unused".to_string(),
3421        };
3422        let tool = EchoInputToolExecutor;
3423        let runtime = WorkflowRuntime::new(
3424            map_reduce_workflow(ReduceOperation::Count),
3425            &llm,
3426            Some(&tool),
3427            WorkflowRuntimeOptions {
3428                security_limits: RuntimeSecurityLimits {
3429                    max_map_items: 2,
3430                    ..RuntimeSecurityLimits::default()
3431                },
3432                ..WorkflowRuntimeOptions::default()
3433            },
3434        );
3435
3436        let error = runtime
3437            .execute(json!({"values": [1, 2, 3]}), None)
3438            .await
3439            .expect_err("map should fail when item guard is exceeded");
3440        assert!(matches!(
3441            error,
3442            WorkflowRuntimeError::MapItemLimitExceeded {
3443                node_id,
3444                actual_items: 3,
3445                max_items: 2,
3446            } if node_id == "map"
3447        ));
3448    }
3449
3450    #[tokio::test]
3451    async fn resumes_from_checkpoint() {
3452        let llm = MockLlmExecutor {
3453            output: "unused".to_string(),
3454        };
3455        let tool = MockToolExecutor {
3456            output: json!({"ok": true}),
3457            fail: false,
3458        };
3459        let runtime = WorkflowRuntime::new(
3460            linear_workflow(),
3461            &llm,
3462            Some(&tool),
3463            WorkflowRuntimeOptions::default(),
3464        );
3465
3466        let checkpoint = WorkflowCheckpoint {
3467            run_id: "run-1".to_string(),
3468            workflow_name: "linear".to_string(),
3469            step: 2,
3470            next_node_id: "tool".to_string(),
3471            scope_snapshot: json!({"input": {"request_id": "r-resume"}}),
3472        };
3473
3474        let resumed = runtime
3475            .execute_resume_from_failure(&checkpoint, None)
3476            .await
3477            .expect("resume should continue from checkpoint node");
3478        assert_eq!(resumed.terminal_node_id, "end");
3479        assert_eq!(resumed.node_executions[0].node_id, "tool");
3480    }
3481
3482    #[test]
3483    fn scope_capabilities_enforce_read_write_boundaries() {
3484        let mut scope = RuntimeScope::new(json!({"k": "v"}));
3485
3486        let read_error = scope
3487            .scoped_input(ScopeCapability::LlmWrite)
3488            .expect_err("write capability should not read scope");
3489        assert!(matches!(read_error, ScopeAccessError::ReadDenied { .. }));
3490
3491        let write_error = scope
3492            .record_tool_output("tool", json!({"ok": true}), ScopeCapability::LlmWrite)
3493            .expect_err("llm write capability should not write tool output");
3494        assert!(matches!(write_error, ScopeAccessError::WriteDenied { .. }));
3495    }
3496}