1use std::collections::{BTreeMap, HashMap};
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use async_trait::async_trait;
6use serde_json::{json, Map, Value};
7use simple_agent_type::message::Message;
8use simple_agent_type::request::CompletionRequest;
9use simple_agents_core::{CompletionOptions, CompletionOutcome, SimpleAgentsClient};
10use thiserror::Error;
11use tokio::time::timeout;
12
13use crate::checkpoint::WorkflowCheckpoint;
14use crate::expressions;
15use crate::ir::{MergePolicy, Node, NodeKind, ReduceOperation, WorkflowDefinition};
16use crate::recorder::{TraceRecordError, TraceRecorder};
17use crate::replay::{replay_trace, ReplayError, ReplayReport};
18use crate::scheduler::DagScheduler;
19use crate::trace::{TraceTerminalStatus, WorkflowTrace, WorkflowTraceMetadata};
20use crate::validation::{validate_and_normalize, ValidationErrors};
21
/// Configuration for a single workflow run.
#[derive(Debug, Clone)]
pub struct WorkflowRuntimeOptions {
    /// Hard cap on node executions; exceeding it aborts the run with
    /// `WorkflowRuntimeError::StepLimitExceeded`.
    pub max_steps: usize,
    /// When true, the definition is validated and normalized before running;
    /// otherwise it is only normalized.
    pub validate_before_run: bool,
    /// Timeout/retry policy applied to LLM nodes.
    pub llm_node_policy: NodeExecutionPolicy,
    /// Timeout/retry policy applied to tool nodes.
    pub tool_node_policy: NodeExecutionPolicy,
    /// When true, a `TraceRecorder` captures node enter/exit/terminal events.
    pub enable_trace_recording: bool,
    /// Whether the recorded trace is replay-validated after completion.
    /// Requires `enable_trace_recording` when set to `ValidateRecordedTrace`.
    pub replay_mode: WorkflowReplayMode,
    /// Maximum in-flight work items.
    /// NOTE(review): not referenced in this chunk — presumably consumed by
    /// `DagScheduler`; confirm.
    pub scheduler_max_in_flight: usize,
    /// Named workflow definitions available to subgraph nodes.
    pub subgraph_registry: BTreeMap<String, WorkflowDefinition>,
    /// Resource ceilings (scope size, fan-out, item counts) enforced at runtime.
    pub security_limits: RuntimeSecurityLimits,
}
44
/// Resource ceilings enforced during execution to bound memory and fan-out.
#[derive(Debug, Clone)]
pub struct RuntimeSecurityLimits {
    /// Maximum serialized size of the scope handed to expression evaluation
    /// (`ExpressionScopeLimitExceeded` when exceeded).
    pub max_expression_scope_bytes: usize,
    /// Maximum items a map node may process (`MapItemLimitExceeded`).
    pub max_map_items: usize,
    /// Maximum branches a parallel node may fan out to
    /// (`ParallelBranchLimitExceeded`).
    pub max_parallel_branches: usize,
    /// Maximum items a filter node may process (`FilterItemLimitExceeded`).
    pub max_filter_items: usize,
}
57
/// Controls whether a completed run's trace is re-validated by replay.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkflowReplayMode {
    /// No replay validation is performed.
    Disabled,
    /// After a successful run, the finalized trace is passed through
    /// `replay_trace`; requires trace recording to be enabled.
    ValidateRecordedTrace,
}
66
/// Per-node-kind execution policy (applied to LLM and tool nodes).
#[derive(Debug, Clone, Default)]
pub struct NodeExecutionPolicy {
    /// Optional wall-clock timeout per attempt; `None` (the default) disables
    /// the timeout.
    pub timeout: Option<Duration>,
    /// Number of retries after the first attempt; defaults to 0 (single try).
    pub max_retries: usize,
}
75
impl Default for WorkflowRuntimeOptions {
    /// Conservative defaults: validation and tracing on, replay off,
    /// no subgraphs registered.
    fn default() -> Self {
        Self {
            max_steps: 256,
            validate_before_run: true,
            llm_node_policy: NodeExecutionPolicy::default(),
            tool_node_policy: NodeExecutionPolicy::default(),
            enable_trace_recording: true,
            replay_mode: WorkflowReplayMode::Disabled,
            scheduler_max_in_flight: 8,
            subgraph_registry: BTreeMap::new(),
            security_limits: RuntimeSecurityLimits::default(),
        }
    }
}
91
impl Default for RuntimeSecurityLimits {
    /// Generous but bounded defaults: 128 KiB expression scope, 4096 map
    /// items, 128 parallel branches, 8192 filter items.
    fn default() -> Self {
        Self {
            max_expression_scope_bytes: 128 * 1024,
            max_map_items: 4096,
            max_parallel_branches: 128,
            max_filter_items: 8192,
        }
    }
}
102
/// Cooperative cancellation probe polled by the runtime between node steps.
pub trait CancellationSignal: Send + Sync {
    /// Returns true when the run should stop with
    /// `WorkflowRuntimeError::Cancelled`.
    fn is_cancelled(&self) -> bool;
}
108
impl CancellationSignal for AtomicBool {
    fn is_cancelled(&self) -> bool {
        // Relaxed is used: the flag is only polled, no ordering with other
        // memory operations is implied.
        self.load(Ordering::Relaxed)
    }
}
114
impl CancellationSignal for bool {
    // A plain bool is a fixed signal: it can never flip mid-run. Useful for
    // tests or an unconditional "never cancel" / "already cancelled".
    fn is_cancelled(&self) -> bool {
        *self
    }
}
120
/// Input handed to an [`LlmExecutor`] for one LLM node invocation.
#[derive(Debug, Clone, PartialEq)]
pub struct LlmExecutionInput {
    /// Id of the workflow node being executed.
    pub node_id: String,
    /// Model identifier to request.
    pub model: String,
    /// Prompt template/text from the node definition.
    pub prompt: String,
    /// Scope snapshot the node is allowed to read.
    pub scoped_input: Value,
}
133
/// Result of a successful LLM node invocation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LlmExecutionOutput {
    /// Plain-text content of the model response.
    pub content: String,
}
140
/// Failure modes of an [`LlmExecutor`].
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum LlmExecutionError {
    /// The completion request could not be built.
    #[error("invalid completion request: {0}")]
    InvalidRequest(String),
    /// The underlying client call failed.
    #[error("llm client execution failed: {0}")]
    Client(String),
    /// The client returned an outcome kind the runtime does not accept
    /// (stream / healed_json / coerced_schema).
    #[error("unexpected completion outcome: {0}")]
    UnexpectedOutcome(&'static str),
    /// A response was returned but it carried no content.
    #[error("llm response had no content")]
    EmptyResponse,
}
157
/// Abstraction over the LLM backend used by LLM nodes.
#[async_trait]
pub trait LlmExecutor: Send + Sync {
    /// Performs one completion call for the given node input.
    async fn execute(
        &self,
        input: LlmExecutionInput,
    ) -> Result<LlmExecutionOutput, LlmExecutionError>;
}
167
168#[async_trait]
169impl LlmExecutor for SimpleAgentsClient {
170 async fn execute(
171 &self,
172 input: LlmExecutionInput,
173 ) -> Result<LlmExecutionOutput, LlmExecutionError> {
174 let user_prompt = build_prompt_with_scope(&input.prompt, &input.scoped_input);
175 let request = CompletionRequest::builder()
176 .model(input.model)
177 .message(Message::user(user_prompt))
178 .build()
179 .map_err(|error| LlmExecutionError::InvalidRequest(error.to_string()))?;
180
181 let outcome = self
182 .complete(&request, CompletionOptions::default())
183 .await
184 .map_err(|error| LlmExecutionError::Client(error.to_string()))?;
185
186 let response = match outcome {
187 CompletionOutcome::Response(response) => response,
188 CompletionOutcome::Stream(_) => {
189 return Err(LlmExecutionError::UnexpectedOutcome("stream"));
190 }
191 CompletionOutcome::HealedJson(_) => {
192 return Err(LlmExecutionError::UnexpectedOutcome("healed_json"));
193 }
194 CompletionOutcome::CoercedSchema(_) => {
195 return Err(LlmExecutionError::UnexpectedOutcome("coerced_schema"));
196 }
197 };
198
199 let content = response
200 .content()
201 .ok_or(LlmExecutionError::EmptyResponse)?
202 .to_string();
203
204 Ok(LlmExecutionOutput { content })
205 }
206}
207
/// Input handed to a [`ToolExecutor`] for one tool node invocation.
#[derive(Debug, Clone, PartialEq)]
pub struct ToolExecutionInput {
    /// Id of the workflow node being executed.
    pub node_id: String,
    /// Name of the tool to run.
    pub tool: String,
    /// Static input payload from the node definition.
    pub input: Value,
    /// Scope snapshot the node is allowed to read.
    pub scoped_input: Value,
}
220
/// Failure modes of a [`ToolExecutor`].
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ToolExecutionError {
    /// No handler is registered for the requested tool name.
    #[error("tool handler not found: {tool}")]
    NotFound {
        tool: String,
    },
    /// The tool ran but failed.
    #[error("tool execution failed: {0}")]
    Failed(String),
}
234
/// Abstraction over tool handlers invoked by tool-based nodes.
#[async_trait]
pub trait ToolExecutor: Send + Sync {
    /// Runs one tool invocation and returns its JSON result.
    async fn execute_tool(&self, input: ToolExecutionInput) -> Result<Value, ToolExecutionError>;
}
241
/// Record of one executed node within a run.
#[derive(Debug, Clone, PartialEq)]
pub struct NodeExecution {
    /// Zero-based step index at which the node ran.
    pub step: usize,
    /// Id of the executed node.
    pub node_id: String,
    /// Kind-specific execution payload, including the chosen next node.
    pub data: NodeExecutionData,
}
252
/// Kind-specific payload of a [`NodeExecution`].
///
/// Every variant except `End` carries `next`: the node id the run
/// transitioned to after this node.
#[derive(Debug, Clone, PartialEq)]
pub enum NodeExecutionData {
    /// Entry node; simply forwards to `next`.
    Start {
        next: String,
    },
    /// LLM call: model used and its textual output.
    Llm {
        model: String,
        output: String,
        next: String,
    },
    /// Tool invocation and its JSON output.
    Tool {
        tool: String,
        output: Value,
        next: String,
    },
    /// Boolean branch: the expression and which way it evaluated.
    Condition {
        expression: String,
        evaluated: bool,
        next: String,
    },
    /// Debounce gate keyed by `key`; `suppressed` is true when the event
    /// fell inside the debounce window.
    Debounce {
        key: String,
        suppressed: bool,
        next: String,
    },
    /// Throttle gate keyed by `key`.
    Throttle {
        key: String,
        throttled: bool,
        next: String,
    },
    /// Tool with retry + compensation: `compensated` is true when the primary
    /// tool exhausted its attempts and the compensation tool ran instead.
    RetryCompensate {
        tool: String,
        attempts: usize,
        compensated: bool,
        output: Value,
        next: String,
    },
    /// Human approval gate: the decision and optional response payload.
    HumanInTheLoop {
        approved: bool,
        response: Value,
        next: String,
    },
    /// Cache lookup result; on a miss `value` is null and `hit` is false.
    CacheRead {
        key: String,
        hit: bool,
        value: Value,
        next: String,
    },
    /// Cache store of `value` under `key`.
    CacheWrite {
        key: String,
        value: Value,
        next: String,
    },
    /// Event gate: whether the observed event matched the expected one.
    EventTrigger {
        event: String,
        matched: bool,
        next: String,
    },
    /// Router decision; `selected` is the winning route (or the default).
    Router {
        selected: String,
        next: String,
    },
    /// Expression transform and its computed output.
    Transform {
        expression: String,
        output: Value,
        next: String,
    },
    /// Loop check: condition result and the current iteration counter.
    Loop {
        condition: String,
        evaluated: bool,
        iteration: u32,
        next: String,
    },
    /// Nested workflow run: which graph ran, where it terminated, its output.
    Subgraph {
        graph: String,
        terminal_node_id: String,
        output: Value,
        next: String,
    },
    /// Batch over the array at `items_path`.
    Batch {
        items_path: String,
        item_count: usize,
        next: String,
    },
    /// Filter over the array at `items_path`; `kept` items survived.
    Filter {
        items_path: String,
        expression: String,
        kept: usize,
        next: String,
    },
    /// Parallel fan-out: branch ids and each branch's output.
    Parallel {
        branches: Vec<String>,
        outputs: BTreeMap<String, Value>,
        next: String,
    },
    /// Merge of upstream outputs under the given policy.
    Merge {
        policy: MergePolicy,
        sources: Vec<String>,
        output: Value,
        next: String,
    },
    /// Per-item map producing `output` from `item_count` items.
    Map {
        item_count: usize,
        output: Value,
        next: String,
    },
    /// Reduction of upstream values with the given operation.
    Reduce {
        operation: ReduceOperation,
        output: Value,
        next: String,
    },
    /// Terminal node; the run stops here.
    End,
}
456
/// Observable lifecycle event emitted while a workflow runs.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowEvent {
    /// Step index the event belongs to.
    pub step: usize,
    /// Node the event refers to.
    pub node_id: String,
    /// What happened (started / completed / failed).
    pub kind: WorkflowEventKind,
}
467
/// Discriminant of a [`WorkflowEvent`].
#[derive(Debug, Clone, PartialEq)]
pub enum WorkflowEventKind {
    /// Node execution began.
    NodeStarted,
    /// Node finished; carries its execution payload.
    NodeCompleted {
        data: NodeExecutionData,
    },
    /// Node failed; carries the rendered error message.
    NodeFailed {
        message: String,
    },
}
484
/// Final outcome of a successful workflow run.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowRunResult {
    /// Name of the executed workflow.
    pub workflow_name: String,
    /// Id of the node at which the run terminated.
    pub terminal_node_id: String,
    /// Ordered record of every node execution.
    pub node_executions: Vec<NodeExecution>,
    /// Lifecycle events emitted during the run.
    pub events: Vec<WorkflowEvent>,
    /// Retry attempts that failed before an eventual success/compensation.
    pub retry_events: Vec<WorkflowRetryEvent>,
    /// Outputs recorded per node id into the runtime scope.
    pub node_outputs: BTreeMap<String, Value>,
    /// Recorded trace, present when trace recording was enabled.
    pub trace: Option<WorkflowTrace>,
    /// Replay validation report, present when replay mode was enabled.
    pub replay_report: Option<ReplayReport>,
}
505
/// Record of one failed attempt that was followed by a retry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkflowRetryEvent {
    /// Step index at which the attempt failed.
    pub step: usize,
    /// Node whose operation was retried.
    pub node_id: String,
    /// Label of the retried operation (e.g. "retry_compensate").
    pub operation: String,
    /// 1-based index of the failed attempt.
    pub failed_attempt: usize,
    /// Rendered error message of the failure.
    pub reason: String,
}
520
/// Capability a node must hold to read from or write to the runtime scope.
/// Checked by `RuntimeScope` accessors; denials surface as
/// [`ScopeAccessError`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ScopeCapability {
    LlmRead,
    ToolRead,
    ConditionRead,
    LlmWrite,
    ToolWrite,
    ConditionWrite,
    MapRead,
    MapWrite,
    ReduceWrite,
}
534
535impl ScopeCapability {
536 fn as_str(self) -> &'static str {
537 match self {
538 Self::LlmRead => "llm_read",
539 Self::ToolRead => "tool_read",
540 Self::ConditionRead => "condition_read",
541 Self::LlmWrite => "llm_write",
542 Self::ToolWrite => "tool_write",
543 Self::ConditionWrite => "condition_write",
544 Self::MapRead => "map_read",
545 Self::MapWrite => "map_write",
546 Self::ReduceWrite => "reduce_write",
547 }
548 }
549}
550
/// Raised when a node attempts a scope operation its capability set forbids.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum ScopeAccessError {
    /// Read was denied for the named capability label.
    #[error("scope read denied for capability '{capability}'")]
    ReadDenied {
        capability: &'static str,
    },
    /// Write was denied for the named capability label.
    #[error("scope write denied for capability '{capability}'")]
    WriteDenied {
        capability: &'static str,
    },
}
567
/// All failure modes of the workflow runtime.
///
/// Variants are grouped loosely by phase: definition/lookup errors, LLM/tool
/// execution (including retry/timeout exhaustion), per-node-kind validation
/// failures, trace/replay errors, and security-limit violations.
#[derive(Debug, Error)]
pub enum WorkflowRuntimeError {
    // --- definition / lookup ---
    #[error(transparent)]
    Validation(#[from] ValidationErrors),
    #[error("workflow has no start node")]
    MissingStartNode,
    #[error("node not found: {node_id}")]
    NodeNotFound {
        node_id: String,
    },
    #[error("workflow exceeded max step limit ({max_steps})")]
    StepLimitExceeded {
        max_steps: usize,
    },
    #[error("workflow execution cancelled")]
    Cancelled,
    // --- LLM / tool execution ---
    #[error("llm node '{node_id}' failed: {source}")]
    Llm {
        node_id: String,
        source: LlmExecutionError,
    },
    #[error("tool node '{node_id}' failed: {source}")]
    Tool {
        node_id: String,
        source: ToolExecutionError,
    },
    #[error("llm node '{node_id}' exhausted {attempts} attempt(s): {last_error}")]
    LlmRetryExhausted {
        node_id: String,
        attempts: usize,
        last_error: LlmExecutionError,
    },
    #[error("tool node '{node_id}' exhausted {attempts} attempt(s): {last_error}")]
    ToolRetryExhausted {
        node_id: String,
        attempts: usize,
        last_error: ToolExecutionError,
    },
    #[error(
        "llm node '{node_id}' timed out after {attempts} attempt(s) (timeout: {timeout_ms} ms)"
    )]
    LlmTimeout {
        node_id: String,
        timeout_ms: u128,
        attempts: usize,
    },
    #[error(
        "tool node '{node_id}' timed out after {attempts} attempt(s) (timeout: {timeout_ms} ms)"
    )]
    ToolTimeout {
        node_id: String,
        timeout_ms: u128,
        attempts: usize,
    },
    #[error("tool node '{node_id}' requires a tool executor")]
    MissingToolExecutor {
        node_id: String,
    },
    // --- routing / per-node-kind validation ---
    #[error("node '{node_id}' is missing required next edge")]
    MissingNextEdge {
        node_id: String,
    },
    #[error("condition node '{node_id}' has invalid expression '{expression}': {reason}")]
    InvalidCondition {
        node_id: String,
        expression: String,
        reason: String,
    },
    #[error("loop node '{node_id}' has invalid condition '{expression}': {reason}")]
    InvalidLoopCondition {
        node_id: String,
        expression: String,
        reason: String,
    },
    #[error("loop node '{node_id}' exceeded max iterations ({max_iterations})")]
    LoopIterationLimitExceeded {
        node_id: String,
        max_iterations: u32,
    },
    #[error("node '{node_id}' did not provide a next transition")]
    MissingNextTransition {
        node_id: String,
    },
    #[error("scope access failed on node '{node_id}': {source}")]
    ScopeAccess {
        node_id: String,
        source: ScopeAccessError,
    },
    // --- trace recording / replay ---
    #[error(transparent)]
    TraceRecording(#[from] TraceRecordError),
    #[error(transparent)]
    Replay(#[from] ReplayError),
    #[error("replay validation requires trace recording to be enabled")]
    ReplayRequiresTraceRecording,
    // --- parallel / merge / map / reduce / subgraph ---
    #[error("parallel node '{node_id}' cannot execute branch '{branch_id}': {reason}")]
    ParallelBranchUnsupported {
        node_id: String,
        branch_id: String,
        reason: String,
    },
    #[error("merge node '{node_id}' missing source output from '{source_id}'")]
    MissingMergeSource {
        node_id: String,
        source_id: String,
    },
    #[error("merge node '{node_id}' quorum not met: required {required}, resolved {resolved}")]
    MergeQuorumNotMet {
        node_id: String,
        required: usize,
        resolved: usize,
    },
    #[error("map node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    MapItemsNotArray {
        node_id: String,
        items_path: String,
    },
    #[error("reduce node '{node_id}' source '{source_node}' is not reducible: {reason}")]
    InvalidReduceInput {
        node_id: String,
        source_node: String,
        reason: String,
    },
    #[error("subgraph node '{node_id}' references unknown graph '{graph}'")]
    SubgraphNotFound { node_id: String, graph: String },
    #[error("batch node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    BatchItemsNotArray { node_id: String, items_path: String },
    #[error("filter node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    FilterItemsNotArray { node_id: String, items_path: String },
    #[error("filter node '{node_id}' expression '{expression}' failed: {reason}")]
    InvalidFilterExpression {
        node_id: String,
        expression: String,
        reason: String,
    },
    // --- security-limit violations (see RuntimeSecurityLimits) ---
    #[error(
        "expression scope too large on node '{node_id}': {actual_bytes} bytes exceeds limit {limit_bytes}"
    )]
    ExpressionScopeLimitExceeded {
        node_id: String,
        actual_bytes: usize,
        limit_bytes: usize,
    },
    #[error(
        "parallel node '{node_id}' has {actual_branches} branches, exceeding limit {max_branches}"
    )]
    ParallelBranchLimitExceeded {
        node_id: String,
        actual_branches: usize,
        max_branches: usize,
    },
    #[error("map node '{node_id}' has {actual_items} items, exceeding limit {max_items}")]
    MapItemLimitExceeded {
        node_id: String,
        actual_items: usize,
        max_items: usize,
    },
    #[error("filter node '{node_id}' has {actual_items} items, exceeding limit {max_items}")]
    FilterItemLimitExceeded {
        node_id: String,
        actual_items: usize,
        max_items: usize,
    },
    // --- path / value resolution ---
    #[error("node '{node_id}' could not resolve required path '{path}'")]
    MissingPath { node_id: String, path: String },
    #[error("node '{node_id}' path '{path}' did not resolve to a string cache key")]
    CacheKeyNotString { node_id: String, path: String },
    #[error("human node '{node_id}' has unsupported decision value at '{path}': {value}")]
    InvalidHumanDecision {
        node_id: String,
        path: String,
        value: String,
    },
    #[error("event trigger node '{node_id}' path '{path}' did not resolve to an event string")]
    InvalidEventValue { node_id: String, path: String },
    #[error("router node '{node_id}' route expression '{expression}' failed: {reason}")]
    InvalidRouterExpression {
        node_id: String,
        expression: String,
        reason: String,
    },
    #[error("transform node '{node_id}' expression '{expression}' failed: {reason}")]
    InvalidTransformExpression {
        node_id: String,
        expression: String,
        reason: String,
    },
    #[error(
        "retry_compensate node '{node_id}' failed after {attempts} primary attempt(s) and compensation error: {compensation_error}"
    )]
    RetryCompensateFailed {
        node_id: String,
        attempts: usize,
        compensation_error: ToolExecutionError,
    },
}
849
/// Executes a [`WorkflowDefinition`] against pluggable LLM and tool backends.
pub struct WorkflowRuntime<'a> {
    // The workflow to run; validated/normalized at execution time.
    definition: WorkflowDefinition,
    // Backend for LLM nodes.
    llm_executor: &'a dyn LlmExecutor,
    // Backend for tool nodes; None makes tool-based nodes fail with
    // `MissingToolExecutor`.
    tool_executor: Option<&'a dyn ToolExecutor>,
    options: WorkflowRuntimeOptions,
}
857
858impl<'a> WorkflowRuntime<'a> {
    /// Builds a runtime over `definition` with the given backends and options.
    ///
    /// `tool_executor` may be `None`; nodes that need a tool then fail at
    /// execution time with `WorkflowRuntimeError::MissingToolExecutor`.
    pub fn new(
        definition: WorkflowDefinition,
        llm_executor: &'a dyn LlmExecutor,
        tool_executor: Option<&'a dyn ToolExecutor>,
        options: WorkflowRuntimeOptions,
    ) -> Self {
        Self {
            definition,
            llm_executor,
            tool_executor,
            options,
        }
    }
873
874 pub async fn execute(
876 &self,
877 input: Value,
878 cancellation: Option<&dyn CancellationSignal>,
879 ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
880 let workflow = if self.options.validate_before_run {
881 validate_and_normalize(&self.definition)?
882 } else {
883 self.definition.normalized()
884 };
885
886 let start_id = find_start_node_id(&workflow)?;
887 self.execute_from_node(
888 workflow,
889 RuntimeScope::new(input),
890 start_id,
891 0,
892 cancellation,
893 )
894 .await
895 }
896
897 pub async fn execute_resume_from_failure(
899 &self,
900 checkpoint: &WorkflowCheckpoint,
901 cancellation: Option<&dyn CancellationSignal>,
902 ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
903 let workflow = if self.options.validate_before_run {
904 validate_and_normalize(&self.definition)?
905 } else {
906 self.definition.normalized()
907 };
908
909 let scope_input = checkpoint
910 .scope_snapshot
911 .get("input")
912 .cloned()
913 .unwrap_or_else(|| checkpoint.scope_snapshot.clone());
914
915 self.execute_from_node(
916 workflow,
917 RuntimeScope::new(scope_input),
918 checkpoint.next_node_id.clone(),
919 checkpoint.step,
920 cancellation,
921 )
922 .await
923 }
924
925 async fn execute_from_node(
926 &self,
927 workflow: WorkflowDefinition,
928 mut scope: RuntimeScope,
929 start_node_id: String,
930 starting_step: usize,
931 cancellation: Option<&dyn CancellationSignal>,
932 ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
933 let node_index = build_node_index(&workflow);
934
935 if matches!(
936 self.options.replay_mode,
937 WorkflowReplayMode::ValidateRecordedTrace
938 ) && !self.options.enable_trace_recording
939 {
940 return Err(WorkflowRuntimeError::ReplayRequiresTraceRecording);
941 }
942
943 let trace_recorder = self.options.enable_trace_recording.then(|| {
944 TraceRecorder::new(WorkflowTraceMetadata {
945 trace_id: format!("{}-{}-trace", workflow.name, workflow.version),
946 workflow_name: workflow.name.clone(),
947 workflow_version: workflow.version.clone(),
948 started_at_unix_ms: 0,
949 finished_at_unix_ms: None,
950 })
951 });
952 let mut trace_clock = 0u64;
953 let mut events = Vec::new();
954 let mut retry_events = Vec::new();
955 let mut node_executions = Vec::new();
956 let mut current_id = start_node_id;
957
958 for step in starting_step..self.options.max_steps {
959 check_cancelled(cancellation)?;
960 let node = node_index.get(current_id.as_str()).ok_or_else(|| {
961 WorkflowRuntimeError::NodeNotFound {
962 node_id: current_id.clone(),
963 }
964 })?;
965
966 events.push(WorkflowEvent {
967 step,
968 node_id: current_id.clone(),
969 kind: WorkflowEventKind::NodeStarted,
970 });
971
972 if let Some(recorder) = &trace_recorder {
973 recorder.record_node_enter(next_trace_timestamp(&mut trace_clock), ¤t_id)?;
974 }
975
976 let execution_result = self
977 .execute_node(
978 node,
979 &node_index,
980 step,
981 &mut scope,
982 cancellation,
983 &mut retry_events,
984 )
985 .await;
986 let execution = match execution_result {
987 Ok(execution) => execution,
988 Err(error) => {
989 if let Some(recorder) = &trace_recorder {
990 recorder.record_node_error(
991 next_trace_timestamp(&mut trace_clock),
992 ¤t_id,
993 error.to_string(),
994 )?;
995 recorder.record_terminal(
996 next_trace_timestamp(&mut trace_clock),
997 TraceTerminalStatus::Failed,
998 )?;
999 let _ = recorder.finalize(next_trace_timestamp(&mut trace_clock))?;
1000 }
1001 events.push(WorkflowEvent {
1002 step,
1003 node_id: current_id,
1004 kind: WorkflowEventKind::NodeFailed {
1005 message: error.to_string(),
1006 },
1007 });
1008 return Err(error);
1009 }
1010 };
1011
1012 events.push(WorkflowEvent {
1013 step,
1014 node_id: execution.node_id.clone(),
1015 kind: WorkflowEventKind::NodeCompleted {
1016 data: execution.data.clone(),
1017 },
1018 });
1019
1020 if let Some(recorder) = &trace_recorder {
1021 recorder
1022 .record_node_exit(next_trace_timestamp(&mut trace_clock), &execution.node_id)?;
1023 }
1024
1025 let is_terminal = matches!(execution.data, NodeExecutionData::End);
1026 let next_node = next_node_id(&execution.data);
1027 let executed_node_id = execution.node_id.clone();
1028 node_executions.push(execution);
1029
1030 if is_terminal {
1031 let (trace, replay_report) = if let Some(recorder) = &trace_recorder {
1032 recorder.record_terminal(
1033 next_trace_timestamp(&mut trace_clock),
1034 TraceTerminalStatus::Completed,
1035 )?;
1036 let finalized_trace =
1037 recorder.finalize(next_trace_timestamp(&mut trace_clock))?;
1038 let replay_report = match self.options.replay_mode {
1039 WorkflowReplayMode::Disabled => None,
1040 WorkflowReplayMode::ValidateRecordedTrace => {
1041 Some(replay_trace(&finalized_trace)?)
1042 }
1043 };
1044 (Some(finalized_trace), replay_report)
1045 } else {
1046 (None, None)
1047 };
1048
1049 return Ok(WorkflowRunResult {
1050 workflow_name: workflow.name,
1051 terminal_node_id: executed_node_id,
1052 node_executions,
1053 events,
1054 retry_events,
1055 node_outputs: scope.node_outputs,
1056 trace,
1057 replay_report,
1058 });
1059 }
1060
1061 current_id = next_node.ok_or(WorkflowRuntimeError::MissingNextTransition {
1062 node_id: executed_node_id,
1063 })?;
1064 }
1065
1066 Err(WorkflowRuntimeError::StepLimitExceeded {
1067 max_steps: self.options.max_steps,
1068 })
1069 }
1070
1071 async fn execute_node(
1072 &self,
1073 node: &Node,
1074 node_index: &HashMap<&str, &Node>,
1075 step: usize,
1076 scope: &mut RuntimeScope,
1077 cancellation: Option<&dyn CancellationSignal>,
1078 retry_events: &mut Vec<WorkflowRetryEvent>,
1079 ) -> Result<NodeExecution, WorkflowRuntimeError> {
1080 match &node.kind {
1081 NodeKind::Start { next } => Ok(self.execute_start_node(step, node, next)),
1082 NodeKind::Llm {
1083 model,
1084 prompt,
1085 next,
1086 } => {
1087 self.execute_llm_node(
1088 step,
1089 node,
1090 LlmNodeSpec {
1091 model,
1092 prompt,
1093 next,
1094 },
1095 scope,
1096 cancellation,
1097 retry_events,
1098 )
1099 .await
1100 }
1101 NodeKind::Tool { tool, input, next } => {
1102 self.execute_tool_node(
1103 step,
1104 node,
1105 ToolNodeSpec { tool, input, next },
1106 scope,
1107 cancellation,
1108 retry_events,
1109 )
1110 .await
1111 }
1112 NodeKind::Condition {
1113 expression,
1114 on_true,
1115 on_false,
1116 } => self.execute_condition_node(
1117 step,
1118 node,
1119 ConditionNodeSpec {
1120 expression,
1121 on_true,
1122 on_false,
1123 },
1124 scope,
1125 cancellation,
1126 ),
1127 NodeKind::Debounce {
1128 key_path,
1129 window_steps,
1130 next,
1131 on_suppressed,
1132 } => {
1133 let scoped_input =
1134 scope
1135 .scoped_input(ScopeCapability::ConditionRead)
1136 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1137 node_id: node.id.clone(),
1138 source,
1139 })?;
1140 let key = resolve_string_path(&scoped_input, key_path).ok_or_else(|| {
1141 WorkflowRuntimeError::CacheKeyNotString {
1142 node_id: node.id.clone(),
1143 path: key_path.clone(),
1144 }
1145 })?;
1146 let suppressed = scope.debounce(&node.id, &key, step, *window_steps);
1147 let chosen_next = if suppressed {
1148 on_suppressed.clone().unwrap_or_else(|| next.clone())
1149 } else {
1150 next.clone()
1151 };
1152
1153 scope
1154 .record_node_output(
1155 &node.id,
1156 json!({"key": key.clone(), "suppressed": suppressed}),
1157 ScopeCapability::MapWrite,
1158 )
1159 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1160 node_id: node.id.clone(),
1161 source,
1162 })?;
1163
1164 Ok(NodeExecution {
1165 step,
1166 node_id: node.id.clone(),
1167 data: NodeExecutionData::Debounce {
1168 key,
1169 suppressed,
1170 next: chosen_next,
1171 },
1172 })
1173 }
1174 NodeKind::Throttle {
1175 key_path,
1176 window_steps,
1177 next,
1178 on_throttled,
1179 } => {
1180 let scoped_input =
1181 scope
1182 .scoped_input(ScopeCapability::ConditionRead)
1183 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1184 node_id: node.id.clone(),
1185 source,
1186 })?;
1187 let key = resolve_string_path(&scoped_input, key_path).ok_or_else(|| {
1188 WorkflowRuntimeError::CacheKeyNotString {
1189 node_id: node.id.clone(),
1190 path: key_path.clone(),
1191 }
1192 })?;
1193 let throttled = scope.throttle(&node.id, &key, step, *window_steps);
1194 let chosen_next = if throttled {
1195 on_throttled.clone().unwrap_or_else(|| next.clone())
1196 } else {
1197 next.clone()
1198 };
1199
1200 scope
1201 .record_node_output(
1202 &node.id,
1203 json!({"key": key.clone(), "throttled": throttled}),
1204 ScopeCapability::MapWrite,
1205 )
1206 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1207 node_id: node.id.clone(),
1208 source,
1209 })?;
1210
1211 Ok(NodeExecution {
1212 step,
1213 node_id: node.id.clone(),
1214 data: NodeExecutionData::Throttle {
1215 key,
1216 throttled,
1217 next: chosen_next,
1218 },
1219 })
1220 }
1221 NodeKind::RetryCompensate {
1222 tool,
1223 input,
1224 max_retries,
1225 compensate_tool,
1226 compensate_input,
1227 next,
1228 on_compensated,
1229 } => {
1230 let executor = self.tool_executor.ok_or_else(|| {
1231 WorkflowRuntimeError::MissingToolExecutor {
1232 node_id: node.id.clone(),
1233 }
1234 })?;
1235 let scoped_input =
1236 scope
1237 .scoped_input(ScopeCapability::ToolRead)
1238 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1239 node_id: node.id.clone(),
1240 source,
1241 })?;
1242 let total_attempts = max_retries.saturating_add(1);
1243 let mut last_error = None;
1244 let mut output = Value::Null;
1245 let mut compensated = false;
1246
1247 for attempt in 1..=total_attempts {
1248 check_cancelled(cancellation)?;
1249 match executor
1250 .execute_tool(ToolExecutionInput {
1251 node_id: node.id.clone(),
1252 tool: tool.clone(),
1253 input: input.clone(),
1254 scoped_input: scoped_input.clone(),
1255 })
1256 .await
1257 {
1258 Ok(value) => {
1259 output = json!({"status": "ok", "attempt": attempt, "value": value});
1260 break;
1261 }
1262 Err(error) => {
1263 last_error = Some(error.clone());
1264 if attempt < total_attempts {
1265 retry_events.push(WorkflowRetryEvent {
1266 step,
1267 node_id: node.id.clone(),
1268 operation: "retry_compensate".to_string(),
1269 failed_attempt: attempt,
1270 reason: error.to_string(),
1271 });
1272 }
1273 }
1274 }
1275 }
1276
1277 if output.is_null() {
1278 compensated = true;
1279 let compensation = executor
1280 .execute_tool(ToolExecutionInput {
1281 node_id: node.id.clone(),
1282 tool: compensate_tool.clone(),
1283 input: compensate_input.clone(),
1284 scoped_input,
1285 })
1286 .await
1287 .map_err(|compensation_error| {
1288 WorkflowRuntimeError::RetryCompensateFailed {
1289 node_id: node.id.clone(),
1290 attempts: total_attempts,
1291 compensation_error,
1292 }
1293 })?;
1294 output = json!({
1295 "status": "compensated",
1296 "attempts": total_attempts,
1297 "last_error": last_error.map(|error| error.to_string()).unwrap_or_default(),
1298 "compensation": compensation
1299 });
1300 }
1301
1302 let chosen_next = if compensated {
1303 on_compensated.clone().unwrap_or_else(|| next.clone())
1304 } else {
1305 next.clone()
1306 };
1307
1308 scope
1309 .record_node_output(&node.id, output.clone(), ScopeCapability::MapWrite)
1310 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1311 node_id: node.id.clone(),
1312 source,
1313 })?;
1314
1315 Ok(NodeExecution {
1316 step,
1317 node_id: node.id.clone(),
1318 data: NodeExecutionData::RetryCompensate {
1319 tool: tool.clone(),
1320 attempts: total_attempts,
1321 compensated,
1322 output,
1323 next: chosen_next,
1324 },
1325 })
1326 }
1327 NodeKind::HumanInTheLoop {
1328 decision_path,
1329 response_path,
1330 on_approve,
1331 on_reject,
1332 } => {
1333 let scoped_input =
1334 scope
1335 .scoped_input(ScopeCapability::ConditionRead)
1336 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1337 node_id: node.id.clone(),
1338 source,
1339 })?;
1340 let decision_value =
1341 resolve_path(&scoped_input, decision_path).ok_or_else(|| {
1342 WorkflowRuntimeError::MissingPath {
1343 node_id: node.id.clone(),
1344 path: decision_path.clone(),
1345 }
1346 })?;
1347 let approved = evaluate_human_decision(decision_value).ok_or_else(|| {
1348 WorkflowRuntimeError::InvalidHumanDecision {
1349 node_id: node.id.clone(),
1350 path: decision_path.clone(),
1351 value: decision_value.to_string(),
1352 }
1353 })?;
1354 let response = if let Some(path) = response_path {
1355 resolve_path(&scoped_input, path).cloned().ok_or_else(|| {
1356 WorkflowRuntimeError::MissingPath {
1357 node_id: node.id.clone(),
1358 path: path.clone(),
1359 }
1360 })?
1361 } else {
1362 Value::Null
1363 };
1364 let chosen_next = if approved {
1365 on_approve.clone()
1366 } else {
1367 on_reject.clone()
1368 };
1369
1370 scope
1371 .record_node_output(
1372 &node.id,
1373 json!({"approved": approved, "response": response.clone()}),
1374 ScopeCapability::MapWrite,
1375 )
1376 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1377 node_id: node.id.clone(),
1378 source,
1379 })?;
1380
1381 Ok(NodeExecution {
1382 step,
1383 node_id: node.id.clone(),
1384 data: NodeExecutionData::HumanInTheLoop {
1385 approved,
1386 response,
1387 next: chosen_next,
1388 },
1389 })
1390 }
1391 NodeKind::CacheWrite {
1392 key_path,
1393 value_path,
1394 next,
1395 } => {
1396 let scoped_input =
1397 scope
1398 .scoped_input(ScopeCapability::ConditionRead)
1399 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1400 node_id: node.id.clone(),
1401 source,
1402 })?;
1403 let key = resolve_string_path(&scoped_input, key_path).ok_or_else(|| {
1404 WorkflowRuntimeError::CacheKeyNotString {
1405 node_id: node.id.clone(),
1406 path: key_path.clone(),
1407 }
1408 })?;
1409 let value = resolve_path(&scoped_input, value_path)
1410 .cloned()
1411 .ok_or_else(|| WorkflowRuntimeError::MissingPath {
1412 node_id: node.id.clone(),
1413 path: value_path.clone(),
1414 })?;
1415
1416 scope.put_cache(&key, value.clone());
1417 scope
1418 .record_node_output(
1419 &node.id,
1420 json!({"key": key.clone(), "value": value.clone()}),
1421 ScopeCapability::MapWrite,
1422 )
1423 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1424 node_id: node.id.clone(),
1425 source,
1426 })?;
1427
1428 Ok(NodeExecution {
1429 step,
1430 node_id: node.id.clone(),
1431 data: NodeExecutionData::CacheWrite {
1432 key,
1433 value,
1434 next: next.clone(),
1435 },
1436 })
1437 }
1438 NodeKind::CacheRead {
1439 key_path,
1440 next,
1441 on_miss,
1442 } => {
1443 let scoped_input =
1444 scope
1445 .scoped_input(ScopeCapability::ConditionRead)
1446 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1447 node_id: node.id.clone(),
1448 source,
1449 })?;
1450 let key = resolve_string_path(&scoped_input, key_path).ok_or_else(|| {
1451 WorkflowRuntimeError::CacheKeyNotString {
1452 node_id: node.id.clone(),
1453 path: key_path.clone(),
1454 }
1455 })?;
1456 let value = scope.cache_value(&key).cloned().unwrap_or(Value::Null);
1457 let hit = !value.is_null();
1458 let chosen_next = if hit {
1459 next.clone()
1460 } else {
1461 on_miss.clone().unwrap_or_else(|| next.clone())
1462 };
1463
1464 scope
1465 .record_node_output(
1466 &node.id,
1467 json!({"key": key.clone(), "hit": hit, "value": value.clone()}),
1468 ScopeCapability::MapWrite,
1469 )
1470 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1471 node_id: node.id.clone(),
1472 source,
1473 })?;
1474
1475 Ok(NodeExecution {
1476 step,
1477 node_id: node.id.clone(),
1478 data: NodeExecutionData::CacheRead {
1479 key,
1480 hit,
1481 value,
1482 next: chosen_next,
1483 },
1484 })
1485 }
1486 NodeKind::EventTrigger {
1487 event,
1488 event_path,
1489 next,
1490 on_mismatch,
1491 } => {
1492 let scoped_input =
1493 scope
1494 .scoped_input(ScopeCapability::ConditionRead)
1495 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1496 node_id: node.id.clone(),
1497 source,
1498 })?;
1499 let actual = resolve_path(&scoped_input, event_path)
1500 .and_then(Value::as_str)
1501 .ok_or_else(|| WorkflowRuntimeError::InvalidEventValue {
1502 node_id: node.id.clone(),
1503 path: event_path.clone(),
1504 })?;
1505 let matched = actual == event;
1506 let chosen_next = if matched {
1507 next.clone()
1508 } else {
1509 on_mismatch.clone().unwrap_or_else(|| next.clone())
1510 };
1511
1512 scope
1513 .record_node_output(
1514 &node.id,
1515 json!({"event": event.clone(), "matched": matched, "actual": actual}),
1516 ScopeCapability::MapWrite,
1517 )
1518 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1519 node_id: node.id.clone(),
1520 source,
1521 })?;
1522
1523 Ok(NodeExecution {
1524 step,
1525 node_id: node.id.clone(),
1526 data: NodeExecutionData::EventTrigger {
1527 event: event.clone(),
1528 matched,
1529 next: chosen_next,
1530 },
1531 })
1532 }
1533 NodeKind::Router { routes, default } => {
1534 let scoped_input =
1535 scope
1536 .scoped_input(ScopeCapability::ConditionRead)
1537 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1538 node_id: node.id.clone(),
1539 source,
1540 })?;
1541 enforce_expression_scope_budget(
1542 &node.id,
1543 &scoped_input,
1544 self.options.security_limits.max_expression_scope_bytes,
1545 )?;
1546 let mut selected = default.clone();
1547 for route in routes {
1548 let matched = expressions::evaluate_bool(&route.when, &scoped_input).map_err(
1549 |reason| WorkflowRuntimeError::InvalidRouterExpression {
1550 node_id: node.id.clone(),
1551 expression: route.when.clone(),
1552 reason: reason.to_string(),
1553 },
1554 )?;
1555 if matched {
1556 selected = route.next.clone();
1557 break;
1558 }
1559 }
1560
1561 scope
1562 .record_node_output(
1563 &node.id,
1564 json!({"selected": selected.clone()}),
1565 ScopeCapability::MapWrite,
1566 )
1567 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1568 node_id: node.id.clone(),
1569 source,
1570 })?;
1571
1572 Ok(NodeExecution {
1573 step,
1574 node_id: node.id.clone(),
1575 data: NodeExecutionData::Router {
1576 selected: selected.clone(),
1577 next: selected,
1578 },
1579 })
1580 }
1581 NodeKind::Transform { expression, next } => {
1582 let scoped_input =
1583 scope
1584 .scoped_input(ScopeCapability::ConditionRead)
1585 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1586 node_id: node.id.clone(),
1587 source,
1588 })?;
1589 let output =
1590 evaluate_transform_expression(expression, &scoped_input).map_err(|reason| {
1591 WorkflowRuntimeError::InvalidTransformExpression {
1592 node_id: node.id.clone(),
1593 expression: expression.clone(),
1594 reason,
1595 }
1596 })?;
1597
1598 scope
1599 .record_node_output(&node.id, output.clone(), ScopeCapability::MapWrite)
1600 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1601 node_id: node.id.clone(),
1602 source,
1603 })?;
1604
1605 Ok(NodeExecution {
1606 step,
1607 node_id: node.id.clone(),
1608 data: NodeExecutionData::Transform {
1609 expression: expression.clone(),
1610 output,
1611 next: next.clone(),
1612 },
1613 })
1614 }
1615 NodeKind::Loop {
1616 condition,
1617 body,
1618 next,
1619 max_iterations,
1620 } => {
1621 check_cancelled(cancellation)?;
1622 let scoped_input =
1623 scope
1624 .scoped_input(ScopeCapability::ConditionRead)
1625 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1626 node_id: node.id.clone(),
1627 source,
1628 })?;
1629 enforce_expression_scope_budget(
1630 &node.id,
1631 &scoped_input,
1632 self.options.security_limits.max_expression_scope_bytes,
1633 )?;
1634 let evaluated =
1635 expressions::evaluate_bool(condition, &scoped_input).map_err(|reason| {
1636 WorkflowRuntimeError::InvalidLoopCondition {
1637 node_id: node.id.clone(),
1638 expression: condition.clone(),
1639 reason: reason.to_string(),
1640 }
1641 })?;
1642
1643 let (iteration, chosen_next) = if evaluated {
1644 let iteration = scope.loop_iteration(&node.id).saturating_add(1);
1645 if let Some(limit) = max_iterations {
1646 if iteration > *limit {
1647 return Err(WorkflowRuntimeError::LoopIterationLimitExceeded {
1648 node_id: node.id.clone(),
1649 max_iterations: *limit,
1650 });
1651 }
1652 }
1653 scope.set_loop_iteration(&node.id, iteration);
1654 (iteration, body.clone())
1655 } else {
1656 scope.clear_loop_iteration(&node.id);
1657 (0, next.clone())
1658 };
1659
1660 scope
1661 .record_condition_output(&node.id, evaluated, ScopeCapability::ConditionWrite)
1662 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1663 node_id: node.id.clone(),
1664 source,
1665 })?;
1666
1667 Ok(NodeExecution {
1668 step,
1669 node_id: node.id.clone(),
1670 data: NodeExecutionData::Loop {
1671 condition: condition.clone(),
1672 evaluated,
1673 iteration,
1674 next: chosen_next,
1675 },
1676 })
1677 }
1678 NodeKind::Parallel {
1679 branches,
1680 next,
1681 max_in_flight,
1682 } => {
1683 self.execute_parallel_node(
1684 step,
1685 node,
1686 ParallelNodeSpec {
1687 node_index,
1688 branches,
1689 next,
1690 max_in_flight: *max_in_flight,
1691 },
1692 scope,
1693 cancellation,
1694 retry_events,
1695 )
1696 .await
1697 }
1698 NodeKind::Merge {
1699 sources,
1700 policy,
1701 quorum,
1702 next,
1703 } => self.execute_merge_node(
1704 step,
1705 node,
1706 MergeNodeSpec {
1707 sources,
1708 policy,
1709 quorum: *quorum,
1710 next,
1711 },
1712 scope,
1713 ),
1714 NodeKind::Map {
1715 tool,
1716 items_path,
1717 next,
1718 max_in_flight,
1719 } => {
1720 self.execute_map_node(
1721 step,
1722 node,
1723 MapNodeSpec {
1724 tool,
1725 items_path,
1726 next,
1727 max_in_flight: *max_in_flight,
1728 },
1729 scope,
1730 cancellation,
1731 retry_events,
1732 )
1733 .await
1734 }
1735 NodeKind::Reduce {
1736 source,
1737 operation,
1738 next,
1739 } => self.execute_reduce_node(step, node, source, operation, next, scope),
1740 NodeKind::Subgraph { graph, next } => {
1741 check_cancelled(cancellation)?;
1742 let next_node =
1743 next.clone()
1744 .ok_or_else(|| WorkflowRuntimeError::MissingNextEdge {
1745 node_id: node.id.clone(),
1746 })?;
1747 let subgraph = self.options.subgraph_registry.get(graph).ok_or_else(|| {
1748 WorkflowRuntimeError::SubgraphNotFound {
1749 node_id: node.id.clone(),
1750 graph: graph.clone(),
1751 }
1752 })?;
1753
1754 let subgraph_runtime = WorkflowRuntime::new(
1755 subgraph.clone(),
1756 self.llm_executor,
1757 self.tool_executor,
1758 WorkflowRuntimeOptions {
1759 replay_mode: WorkflowReplayMode::Disabled,
1760 enable_trace_recording: false,
1761 ..self.options.clone()
1762 },
1763 );
1764 let subgraph_result = Box::pin(
1765 subgraph_runtime.execute(
1766 scope
1767 .scoped_input(ScopeCapability::ConditionRead)
1768 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1769 node_id: node.id.clone(),
1770 source,
1771 })?,
1772 cancellation,
1773 ),
1774 )
1775 .await?;
1776
1777 let subgraph_output = json!({
1778 "terminal_node_id": subgraph_result.terminal_node_id,
1779 "node_outputs": subgraph_result.node_outputs,
1780 });
1781 scope
1782 .record_node_output(
1783 &node.id,
1784 subgraph_output.clone(),
1785 ScopeCapability::MapWrite,
1786 )
1787 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1788 node_id: node.id.clone(),
1789 source,
1790 })?;
1791
1792 Ok(NodeExecution {
1793 step,
1794 node_id: node.id.clone(),
1795 data: NodeExecutionData::Subgraph {
1796 graph: graph.clone(),
1797 terminal_node_id: subgraph_result.terminal_node_id,
1798 output: subgraph_output,
1799 next: next_node,
1800 },
1801 })
1802 }
1803 NodeKind::Batch { items_path, next } => {
1804 self.execute_batch_node(step, node, items_path, next, scope)
1805 }
1806 NodeKind::Filter {
1807 items_path,
1808 expression,
1809 next,
1810 } => self.execute_filter_node(step, node, items_path, expression, next, scope),
1811 NodeKind::End => Ok(NodeExecution {
1812 step,
1813 node_id: node.id.clone(),
1814 data: NodeExecutionData::End,
1815 }),
1816 }
1817 }
1818
1819 fn execute_start_node(&self, step: usize, node: &Node, next: &str) -> NodeExecution {
1820 NodeExecution {
1821 step,
1822 node_id: node.id.clone(),
1823 data: NodeExecutionData::Start {
1824 next: next.to_string(),
1825 },
1826 }
1827 }
1828
    /// Executes an `Llm` node: resolves the outgoing edge, runs the prompt
    /// through the LLM executor under the runtime's timeout/retry policy,
    /// records the text output into the scope, and returns an execution
    /// pointing at `spec.next`.
    ///
    /// # Errors
    /// - `MissingNextEdge` when the node has no outgoing edge.
    /// - Timeout/retry-exhaustion/cancellation errors propagated from
    ///   `execute_llm_with_policy`.
    /// - `ScopeAccess` when recording the output is denied.
    async fn execute_llm_node(
        &self,
        step: usize,
        node: &Node,
        spec: LlmNodeSpec<'_>,
        scope: &mut RuntimeScope,
        cancellation: Option<&dyn CancellationSignal>,
        retry_events: &mut Vec<WorkflowRetryEvent>,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        // Resolve the outgoing edge first so a malformed node fails before
        // any (potentially expensive) LLM call is made.
        let next_node = spec
            .next
            .clone()
            .ok_or_else(|| WorkflowRuntimeError::MissingNextEdge {
                node_id: node.id.clone(),
            })?;

        let (output, llm_retries) = self
            .execute_llm_with_policy(step, node, spec.model, spec.prompt, scope, cancellation)
            .await?;
        // Surface per-attempt retry events into the caller's aggregate list.
        retry_events.extend(llm_retries);

        scope
            .record_llm_output(&node.id, output.content.clone(), ScopeCapability::LlmWrite)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        Ok(NodeExecution {
            step,
            node_id: node.id.clone(),
            data: NodeExecutionData::Llm {
                model: spec.model.to_string(),
                output: output.content,
                next: next_node,
            },
        })
    }
1867
    /// Executes a `Tool` node: resolves the outgoing edge, snapshots the scope
    /// with `ToolRead` capability, invokes the tool executor under the
    /// timeout/retry policy, records the tool output, and returns an execution
    /// pointing at `spec.next`.
    ///
    /// # Errors
    /// - `MissingNextEdge` when the node has no outgoing edge.
    /// - `MissingToolExecutor` when no tool executor was configured.
    /// - `ScopeAccess` when reading or writing the scope is denied.
    /// - Timeout/retry-exhaustion/cancellation errors propagated from the
    ///   tool policy loop.
    async fn execute_tool_node(
        &self,
        step: usize,
        node: &Node,
        spec: ToolNodeSpec<'_>,
        scope: &mut RuntimeScope,
        cancellation: Option<&dyn CancellationSignal>,
        retry_events: &mut Vec<WorkflowRetryEvent>,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        // Validate the edge before doing any work.
        let next_node = spec
            .next
            .clone()
            .ok_or_else(|| WorkflowRuntimeError::MissingNextEdge {
                node_id: node.id.clone(),
            })?;

        let executor =
            self.tool_executor
                .ok_or_else(|| WorkflowRuntimeError::MissingToolExecutor {
                    node_id: node.id.clone(),
                })?;

        let scoped_input = scope
            .scoped_input(ScopeCapability::ToolRead)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        let (tool_output, tool_retries) = self
            .execute_tool_with_policy_for_scope(ToolPolicyRequest {
                step,
                node,
                tool: spec.tool,
                input: spec.input,
                executor,
                scoped_input,
                cancellation,
            })
            .await?;
        // Surface per-attempt retry events into the caller's aggregate list.
        retry_events.extend(tool_retries);

        scope
            .record_tool_output(&node.id, tool_output.clone(), ScopeCapability::ToolWrite)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        Ok(NodeExecution {
            step,
            node_id: node.id.clone(),
            data: NodeExecutionData::Tool {
                tool: spec.tool.to_string(),
                output: tool_output,
                next: next_node,
            },
        })
    }
1927
1928 fn execute_condition_node(
1929 &self,
1930 step: usize,
1931 node: &Node,
1932 spec: ConditionNodeSpec<'_>,
1933 scope: &mut RuntimeScope,
1934 cancellation: Option<&dyn CancellationSignal>,
1935 ) -> Result<NodeExecution, WorkflowRuntimeError> {
1936 check_cancelled(cancellation)?;
1937 let scoped_input =
1938 scope
1939 .scoped_input(ScopeCapability::ConditionRead)
1940 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1941 node_id: node.id.clone(),
1942 source,
1943 })?;
1944 enforce_expression_scope_budget(
1945 &node.id,
1946 &scoped_input,
1947 self.options.security_limits.max_expression_scope_bytes,
1948 )?;
1949 let evaluated =
1950 expressions::evaluate_bool(spec.expression, &scoped_input).map_err(|reason| {
1951 WorkflowRuntimeError::InvalidCondition {
1952 node_id: node.id.clone(),
1953 expression: spec.expression.to_string(),
1954 reason: reason.to_string(),
1955 }
1956 })?;
1957 let next = if evaluated {
1958 spec.on_true.to_string()
1959 } else {
1960 spec.on_false.to_string()
1961 };
1962
1963 scope
1964 .record_condition_output(&node.id, evaluated, ScopeCapability::ConditionWrite)
1965 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1966 node_id: node.id.clone(),
1967 source,
1968 })?;
1969
1970 Ok(NodeExecution {
1971 step,
1972 node_id: node.id.clone(),
1973 data: NodeExecutionData::Condition {
1974 expression: spec.expression.to_string(),
1975 evaluated,
1976 next,
1977 },
1978 })
1979 }
1980
    /// Executes a `Parallel` node: runs each branch node (LLM or tool only)
    /// concurrently through the bounded DAG scheduler, records every branch's
    /// output under its own node id plus a combined object under this node's
    /// id, and continues to `spec.next`.
    ///
    /// # Errors
    /// - `ParallelBranchLimitExceeded` when the branch count exceeds the
    ///   security limit.
    /// - `NodeNotFound` when a branch id is missing from the node index.
    /// - `ScopeAccess`, branch execution, and cancellation errors are
    ///   propagated.
    async fn execute_parallel_node(
        &self,
        step: usize,
        node: &Node,
        spec: ParallelNodeSpec<'_>,
        scope: &mut RuntimeScope,
        cancellation: Option<&dyn CancellationSignal>,
        retry_events: &mut Vec<WorkflowRetryEvent>,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        check_cancelled(cancellation)?;
        // Enforce the fan-out limit before any branch is scheduled.
        if spec.branches.len() > self.options.security_limits.max_parallel_branches {
            return Err(WorkflowRuntimeError::ParallelBranchLimitExceeded {
                node_id: node.id.clone(),
                actual_branches: spec.branches.len(),
                max_branches: self.options.security_limits.max_parallel_branches,
            });
        }
        // One shared snapshot of the scope; each branch gets its own clone.
        let base_scope = scope
            .scoped_input(ScopeCapability::MapRead)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;
        // Per-node in-flight override falls back to the runtime-wide limit.
        let scheduler = DagScheduler::new(
            spec.max_in_flight
                .unwrap_or(self.options.scheduler_max_in_flight),
        );
        let parallel_node_id = node.id.clone();

        let branch_outputs: Vec<(String, Value, Vec<WorkflowRetryEvent>)> = scheduler
            .run_bounded(spec.branches.iter().cloned(), |branch_id| {
                let parallel_node_id = parallel_node_id.clone();
                let base_scope = base_scope.clone();
                async move {
                    let branch_node = spec.node_index.get(branch_id.as_str()).ok_or_else(|| {
                        WorkflowRuntimeError::NodeNotFound {
                            node_id: branch_id.clone(),
                        }
                    })?;
                    self.execute_parallel_branch(
                        step,
                        &parallel_node_id,
                        branch_node,
                        base_scope,
                        cancellation,
                    )
                    .await
                }
            })
            .await?;

        // BTreeMap keeps the combined output object deterministic by branch id.
        let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
        for (branch_id, output, branch_retry_events) in branch_outputs {
            retry_events.extend(branch_retry_events);
            // Each branch's output is also visible under its own node id.
            scope
                .record_node_output(&branch_id, output.clone(), ScopeCapability::MapWrite)
                .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                    node_id: node.id.clone(),
                    source,
                })?;
            outputs.insert(branch_id, output);
        }

        scope
            .record_node_output(
                &node.id,
                Value::Object(
                    outputs
                        .iter()
                        .map(|(key, value)| (key.clone(), value.clone()))
                        .collect(),
                ),
                ScopeCapability::MapWrite,
            )
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        Ok(NodeExecution {
            step,
            node_id: node.id.clone(),
            data: NodeExecutionData::Parallel {
                branches: spec.branches.to_vec(),
                outputs,
                next: spec.next.to_string(),
            },
        })
    }
2070
2071 fn execute_merge_node(
2072 &self,
2073 step: usize,
2074 node: &Node,
2075 spec: MergeNodeSpec<'_>,
2076 scope: &mut RuntimeScope,
2077 ) -> Result<NodeExecution, WorkflowRuntimeError> {
2078 let mut resolved = Vec::with_capacity(spec.sources.len());
2079 for source in spec.sources {
2080 let Some(value) = scope.node_output(source).cloned() else {
2081 return Err(WorkflowRuntimeError::MissingMergeSource {
2082 node_id: node.id.clone(),
2083 source_id: source.clone(),
2084 });
2085 };
2086 resolved.push((source.clone(), value));
2087 }
2088
2089 let output = match spec.policy {
2090 MergePolicy::First => resolved
2091 .first()
2092 .map(|(_, value)| value.clone())
2093 .unwrap_or(Value::Null),
2094 MergePolicy::All => Value::Array(
2095 resolved
2096 .iter()
2097 .map(|(_, value)| value.clone())
2098 .collect::<Vec<_>>(),
2099 ),
2100 MergePolicy::Quorum => {
2101 let required = spec.quorum.unwrap_or_default();
2102 let resolved_count = resolved.len();
2103 if resolved_count < required {
2104 return Err(WorkflowRuntimeError::MergeQuorumNotMet {
2105 node_id: node.id.clone(),
2106 required,
2107 resolved: resolved_count,
2108 });
2109 }
2110 Value::Array(
2111 resolved
2112 .iter()
2113 .take(required)
2114 .map(|(_, value)| value.clone())
2115 .collect::<Vec<_>>(),
2116 )
2117 }
2118 };
2119
2120 scope
2121 .record_node_output(&node.id, output.clone(), ScopeCapability::ReduceWrite)
2122 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
2123 node_id: node.id.clone(),
2124 source,
2125 })?;
2126
2127 Ok(NodeExecution {
2128 step,
2129 node_id: node.id.clone(),
2130 data: NodeExecutionData::Merge {
2131 policy: spec.policy.clone(),
2132 sources: spec.sources.to_vec(),
2133 output,
2134 next: spec.next.to_string(),
2135 },
2136 })
2137 }
2138
2139 async fn execute_map_node(
2140 &self,
2141 step: usize,
2142 node: &Node,
2143 spec: MapNodeSpec<'_>,
2144 scope: &mut RuntimeScope,
2145 cancellation: Option<&dyn CancellationSignal>,
2146 retry_events: &mut Vec<WorkflowRetryEvent>,
2147 ) -> Result<NodeExecution, WorkflowRuntimeError> {
2148 let executor =
2149 self.tool_executor
2150 .ok_or_else(|| WorkflowRuntimeError::MissingToolExecutor {
2151 node_id: node.id.clone(),
2152 })?;
2153
2154 let scoped_input = scope
2155 .scoped_input(ScopeCapability::MapRead)
2156 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
2157 node_id: node.id.clone(),
2158 source,
2159 })?;
2160 let items = resolve_path(&scoped_input, spec.items_path)
2161 .and_then(Value::as_array)
2162 .ok_or_else(|| WorkflowRuntimeError::MapItemsNotArray {
2163 node_id: node.id.clone(),
2164 items_path: spec.items_path.to_string(),
2165 })?
2166 .clone();
2167 if items.len() > self.options.security_limits.max_map_items {
2168 return Err(WorkflowRuntimeError::MapItemLimitExceeded {
2169 node_id: node.id.clone(),
2170 actual_items: items.len(),
2171 max_items: self.options.security_limits.max_map_items,
2172 });
2173 }
2174
2175 let scheduler = DagScheduler::new(
2176 spec.max_in_flight
2177 .unwrap_or(self.options.scheduler_max_in_flight),
2178 );
2179 let map_node = node.clone();
2180 let mapped: Vec<(Value, Vec<WorkflowRetryEvent>)> = scheduler
2181 .run_bounded(items.into_iter().enumerate(), |(index, item)| {
2182 let scoped_input = scoped_input.clone();
2183 let map_node = map_node.clone();
2184 async move {
2185 let item_scope = map_item_scoped_input(&scoped_input, &item, index);
2186 let (output, retries) = self
2187 .execute_tool_with_policy_for_scope(ToolPolicyRequest {
2188 step,
2189 node: &map_node,
2190 tool: spec.tool,
2191 input: &item,
2192 executor,
2193 scoped_input: item_scope,
2194 cancellation,
2195 })
2196 .await?;
2197 Ok::<(Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError>((output, retries))
2198 }
2199 })
2200 .await?;
2201
2202 let mut outputs = Vec::with_capacity(mapped.len());
2203 for (output, local_retries) in mapped {
2204 outputs.push(output);
2205 retry_events.extend(local_retries);
2206 }
2207
2208 let output = Value::Array(outputs);
2209 scope
2210 .record_node_output(&node.id, output.clone(), ScopeCapability::MapWrite)
2211 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
2212 node_id: node.id.clone(),
2213 source,
2214 })?;
2215
2216 Ok(NodeExecution {
2217 step,
2218 node_id: node.id.clone(),
2219 data: NodeExecutionData::Map {
2220 item_count: output.as_array().map_or(0, Vec::len),
2221 output,
2222 next: spec.next.to_string(),
2223 },
2224 })
2225 }
2226
2227 fn execute_reduce_node(
2228 &self,
2229 step: usize,
2230 node: &Node,
2231 source: &str,
2232 operation: &ReduceOperation,
2233 next: &str,
2234 scope: &mut RuntimeScope,
2235 ) -> Result<NodeExecution, WorkflowRuntimeError> {
2236 let source_value = scope.node_output(source).cloned().ok_or_else(|| {
2237 WorkflowRuntimeError::MissingMergeSource {
2238 node_id: node.id.clone(),
2239 source_id: source.to_string(),
2240 }
2241 })?;
2242
2243 let reduced = reduce_value(&node.id, source, operation, source_value)?;
2244 scope
2245 .record_node_output(&node.id, reduced.clone(), ScopeCapability::ReduceWrite)
2246 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
2247 node_id: node.id.clone(),
2248 source,
2249 })?;
2250
2251 Ok(NodeExecution {
2252 step,
2253 node_id: node.id.clone(),
2254 data: NodeExecutionData::Reduce {
2255 operation: operation.clone(),
2256 output: reduced,
2257 next: next.to_string(),
2258 },
2259 })
2260 }
2261
2262 fn execute_batch_node(
2263 &self,
2264 step: usize,
2265 node: &Node,
2266 items_path: &str,
2267 next: &str,
2268 scope: &mut RuntimeScope,
2269 ) -> Result<NodeExecution, WorkflowRuntimeError> {
2270 let scoped = scope
2271 .scoped_input(ScopeCapability::MapRead)
2272 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
2273 node_id: node.id.clone(),
2274 source,
2275 })?;
2276 let items = resolve_path(&scoped, items_path).ok_or_else(|| {
2277 WorkflowRuntimeError::BatchItemsNotArray {
2278 node_id: node.id.clone(),
2279 items_path: items_path.to_string(),
2280 }
2281 })?;
2282 let array = items
2283 .as_array()
2284 .ok_or_else(|| WorkflowRuntimeError::BatchItemsNotArray {
2285 node_id: node.id.clone(),
2286 items_path: items_path.to_string(),
2287 })?;
2288
2289 let output = Value::Array(array.clone());
2290 scope
2291 .record_node_output(&node.id, output.clone(), ScopeCapability::MapWrite)
2292 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
2293 node_id: node.id.clone(),
2294 source,
2295 })?;
2296
2297 Ok(NodeExecution {
2298 step,
2299 node_id: node.id.clone(),
2300 data: NodeExecutionData::Batch {
2301 items_path: items_path.to_string(),
2302 item_count: array.len(),
2303 next: next.to_string(),
2304 },
2305 })
2306 }
2307
2308 fn execute_filter_node(
2309 &self,
2310 step: usize,
2311 node: &Node,
2312 items_path: &str,
2313 expression: &str,
2314 next: &str,
2315 scope: &mut RuntimeScope,
2316 ) -> Result<NodeExecution, WorkflowRuntimeError> {
2317 let scoped = scope
2318 .scoped_input(ScopeCapability::MapRead)
2319 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
2320 node_id: node.id.clone(),
2321 source,
2322 })?;
2323 let items_value = resolve_path(&scoped, items_path).ok_or_else(|| {
2324 WorkflowRuntimeError::FilterItemsNotArray {
2325 node_id: node.id.clone(),
2326 items_path: items_path.to_string(),
2327 }
2328 })?;
2329 let array =
2330 items_value
2331 .as_array()
2332 .ok_or_else(|| WorkflowRuntimeError::FilterItemsNotArray {
2333 node_id: node.id.clone(),
2334 items_path: items_path.to_string(),
2335 })?;
2336 if array.len() > self.options.security_limits.max_filter_items {
2337 return Err(WorkflowRuntimeError::FilterItemLimitExceeded {
2338 node_id: node.id.clone(),
2339 actual_items: array.len(),
2340 max_items: self.options.security_limits.max_filter_items,
2341 });
2342 }
2343
2344 let mut kept = Vec::new();
2345 for (index, item) in array.iter().enumerate() {
2346 let mut eval_scope = scoped.clone();
2347 if let Some(object) = eval_scope.as_object_mut() {
2348 object.insert("item".to_string(), item.clone());
2349 object.insert("item_index".to_string(), Value::from(index as u64));
2350 }
2351 enforce_expression_scope_budget(
2352 &node.id,
2353 &eval_scope,
2354 self.options.security_limits.max_expression_scope_bytes,
2355 )?;
2356 let include =
2357 expressions::evaluate_bool(expression, &eval_scope).map_err(|reason| {
2358 WorkflowRuntimeError::InvalidFilterExpression {
2359 node_id: node.id.clone(),
2360 expression: expression.to_string(),
2361 reason: reason.to_string(),
2362 }
2363 })?;
2364 if include {
2365 kept.push(item.clone());
2366 }
2367 }
2368 let output = Value::Array(kept.clone());
2369 scope
2370 .record_node_output(&node.id, output, ScopeCapability::MapWrite)
2371 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
2372 node_id: node.id.clone(),
2373 source,
2374 })?;
2375
2376 Ok(NodeExecution {
2377 step,
2378 node_id: node.id.clone(),
2379 data: NodeExecutionData::Filter {
2380 items_path: items_path.to_string(),
2381 expression: expression.to_string(),
2382 kept: kept.len(),
2383 next: next.to_string(),
2384 },
2385 })
2386 }
2387
2388 async fn execute_llm_with_policy(
2389 &self,
2390 step: usize,
2391 node: &Node,
2392 model: &str,
2393 prompt: &str,
2394 scope: &RuntimeScope,
2395 cancellation: Option<&dyn CancellationSignal>,
2396 ) -> Result<(LlmExecutionOutput, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
2397 let scoped_input = scope
2398 .scoped_input(ScopeCapability::LlmRead)
2399 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
2400 node_id: node.id.clone(),
2401 source,
2402 })?;
2403
2404 self.execute_llm_with_policy_for_scope(
2405 step,
2406 node,
2407 model,
2408 prompt,
2409 scoped_input,
2410 cancellation,
2411 )
2412 .await
2413 }
2414
    /// Runs a single branch of a `Parallel` node against a pre-snapshotted
    /// scope. Only `Llm` and `Tool` branch nodes are supported; anything else
    /// yields `ParallelBranchUnsupported`. Returns the branch node id, its
    /// output value, and any retry events produced along the way.
    async fn execute_parallel_branch(
        &self,
        step: usize,
        parallel_node_id: &str,
        branch_node: &Node,
        scoped_input: Value,
        cancellation: Option<&dyn CancellationSignal>,
    ) -> Result<(String, Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
        match &branch_node.kind {
            NodeKind::Llm {
                model,
                prompt,
                next: _,
            } => {
                let (output, retries) = self
                    .execute_llm_with_policy_for_scope(
                        step,
                        branch_node,
                        model,
                        prompt,
                        scoped_input,
                        cancellation,
                    )
                    .await?;
                // LLM branches surface their text content as a JSON string.
                Ok((
                    branch_node.id.clone(),
                    Value::String(output.content),
                    retries,
                ))
            }
            NodeKind::Tool { tool, input, .. } => {
                let executor = self.tool_executor.ok_or_else(|| {
                    WorkflowRuntimeError::MissingToolExecutor {
                        node_id: branch_node.id.clone(),
                    }
                })?;
                let (output, retries) = self
                    .execute_tool_with_policy_for_scope(ToolPolicyRequest {
                        step,
                        node: branch_node,
                        tool,
                        input,
                        executor,
                        scoped_input,
                        cancellation,
                    })
                    .await?;
                Ok((branch_node.id.clone(), output, retries))
            }
            _ => Err(WorkflowRuntimeError::ParallelBranchUnsupported {
                node_id: parallel_node_id.to_string(),
                branch_id: branch_node.id.clone(),
                reason: "only llm/tool branches are supported".to_string(),
            }),
        }
    }
2471
    /// Runs the LLM executor for one node under the runtime's LLM policy:
    /// up to `max_retries + 1` attempts, each optionally bounded by the
    /// policy's timeout, with cancellation checked before and after every
    /// attempt. Failed (non-final) attempts are recorded as retry events and
    /// returned alongside the successful output.
    ///
    /// # Errors
    /// - `LlmTimeout` when the final attempt exceeds the policy timeout.
    /// - `LlmRetryExhausted` when the final attempt fails.
    /// - Cancellation errors from `check_cancelled`.
    async fn execute_llm_with_policy_for_scope(
        &self,
        step: usize,
        node: &Node,
        model: &str,
        prompt: &str,
        scoped_input: Value,
        cancellation: Option<&dyn CancellationSignal>,
    ) -> Result<(LlmExecutionOutput, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
        // max_retries is the number of *re*-tries; +1 gives total attempts.
        let max_attempts = self.options.llm_node_policy.max_retries.saturating_add(1);
        let mut retry_events = Vec::new();

        for attempt in 1..=max_attempts {
            check_cancelled(cancellation)?;

            let execution = self.llm_executor.execute(LlmExecutionInput {
                node_id: node.id.clone(),
                model: model.to_string(),
                prompt: prompt.to_string(),
                scoped_input: scoped_input.clone(),
            });

            // Only wrap in a timeout when the policy configures one.
            let outcome = if let Some(timeout_duration) = self.options.llm_node_policy.timeout {
                match timeout(timeout_duration, execution).await {
                    Ok(result) => result,
                    Err(_) => {
                        // Timed out: fail hard on the last attempt, otherwise
                        // record a retry event and try again.
                        if attempt == max_attempts {
                            return Err(WorkflowRuntimeError::LlmTimeout {
                                node_id: node.id.clone(),
                                timeout_ms: timeout_duration.as_millis(),
                                attempts: attempt,
                            });
                        }
                        retry_events.push(WorkflowRetryEvent {
                            step,
                            node_id: node.id.clone(),
                            operation: "llm".to_string(),
                            failed_attempt: attempt,
                            reason: format!(
                                "attempt {} timed out after {} ms",
                                attempt,
                                timeout_duration.as_millis()
                            ),
                        });
                        check_cancelled(cancellation)?;
                        continue;
                    }
                }
            } else {
                execution.await
            };

            match outcome {
                Ok(output) => return Ok((output, retry_events)),
                Err(last_error) => {
                    // Executor failure: same last-attempt / retry split.
                    if attempt == max_attempts {
                        return Err(WorkflowRuntimeError::LlmRetryExhausted {
                            node_id: node.id.clone(),
                            attempts: attempt,
                            last_error,
                        });
                    }
                    retry_events.push(WorkflowRetryEvent {
                        step,
                        node_id: node.id.clone(),
                        operation: "llm".to_string(),
                        failed_attempt: attempt,
                        reason: last_error.to_string(),
                    });
                    check_cancelled(cancellation)?;
                }
            }
        }

        // Every iteration either returns or continues, and the final attempt
        // always returns; max_attempts >= 1, so the loop body always runs.
        unreachable!("llm attempts loop always returns")
    }
2548
    /// Runs a tool executor call under the runtime's tool policy: up to
    /// `max_retries + 1` attempts, each optionally bounded by the policy's
    /// timeout, with cancellation checked before and after every attempt.
    /// Failed (non-final) attempts are recorded as retry events and returned
    /// alongside the successful output. Mirrors
    /// `execute_llm_with_policy_for_scope`.
    ///
    /// # Errors
    /// - `ToolTimeout` when the final attempt exceeds the policy timeout.
    /// - `ToolRetryExhausted` when the final attempt fails.
    /// - Cancellation errors from `check_cancelled`.
    async fn execute_tool_with_policy_for_scope(
        &self,
        request: ToolPolicyRequest<'_>,
    ) -> Result<(Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
        let ToolPolicyRequest {
            step,
            node,
            tool,
            input,
            executor,
            scoped_input,
            cancellation,
        } = request;
        // max_retries is the number of *re*-tries; +1 gives total attempts.
        let max_attempts = self.options.tool_node_policy.max_retries.saturating_add(1);
        let mut retry_events = Vec::new();

        for attempt in 1..=max_attempts {
            check_cancelled(cancellation)?;

            let execution = executor.execute_tool(ToolExecutionInput {
                node_id: node.id.clone(),
                tool: tool.to_string(),
                input: input.clone(),
                scoped_input: scoped_input.clone(),
            });

            // Only wrap in a timeout when the policy configures one.
            let outcome = if let Some(timeout_duration) = self.options.tool_node_policy.timeout {
                match timeout(timeout_duration, execution).await {
                    Ok(result) => result,
                    Err(_) => {
                        // Timed out: fail hard on the last attempt, otherwise
                        // record a retry event and try again.
                        if attempt == max_attempts {
                            return Err(WorkflowRuntimeError::ToolTimeout {
                                node_id: node.id.clone(),
                                timeout_ms: timeout_duration.as_millis(),
                                attempts: attempt,
                            });
                        }
                        retry_events.push(WorkflowRetryEvent {
                            step,
                            node_id: node.id.clone(),
                            operation: "tool".to_string(),
                            failed_attempt: attempt,
                            reason: format!(
                                "attempt {} timed out after {} ms",
                                attempt,
                                timeout_duration.as_millis()
                            ),
                        });
                        check_cancelled(cancellation)?;
                        continue;
                    }
                }
            } else {
                execution.await
            };

            match outcome {
                Ok(output) => return Ok((output, retry_events)),
                Err(last_error) => {
                    // Executor failure: same last-attempt / retry split.
                    if attempt == max_attempts {
                        return Err(WorkflowRuntimeError::ToolRetryExhausted {
                            node_id: node.id.clone(),
                            attempts: attempt,
                            last_error,
                        });
                    }
                    retry_events.push(WorkflowRetryEvent {
                        step,
                        node_id: node.id.clone(),
                        operation: "tool".to_string(),
                        failed_attempt: attempt,
                        reason: last_error.to_string(),
                    });
                    check_cancelled(cancellation)?;
                }
            }
        }

        // Every iteration either returns or continues, and the final attempt
        // always returns; max_attempts >= 1, so the loop body always runs.
        unreachable!("tool attempts loop always returns")
    }
2629}
2630
/// Mutable per-run state shared across node executions: the workflow input,
/// accumulated node outputs, loop/rate-limit bookkeeping, the cache, and the
/// most recent LLM/tool outputs.
#[derive(Debug)]
struct RuntimeScope {
    // The input value the workflow was started with.
    workflow_input: Value,
    // Output of each executed node, keyed by node id (BTreeMap keeps the
    // serialized `node_outputs` object deterministic).
    node_outputs: BTreeMap<String, Value>,
    // Current iteration count per Loop node id (see the Loop arm of the
    // node-execution match).
    loop_iterations: HashMap<String, u32>,
    // Presumably the last step at which each debounce node saw input —
    // the updating code is outside this view; TODO confirm.
    debounce_last_seen: HashMap<String, usize>,
    // Presumably the last step at which each throttle node let a value
    // through — updating code not visible here; TODO confirm.
    throttle_last_pass: HashMap<String, usize>,
    // Key/value entries written by CacheWrite nodes and read by CacheRead.
    cache_entries: BTreeMap<String, Value>,
    // Content of the most recent LLM node output, if any.
    last_llm_output: Option<String>,
    // Value of the most recent tool node output, if any.
    last_tool_output: Option<Value>,
}
2642
/// Bundled arguments for `execute_tool_with_policy_for_scope`: all per-call
/// context for a single tool invocation under the retry/timeout policy.
struct ToolPolicyRequest<'a> {
    // Workflow step at which the invocation happens (used in retry events).
    step: usize,
    // Node requesting the tool call.
    node: &'a Node,
    // Tool name passed through to the executor.
    tool: &'a str,
    // Static input declared on the node.
    input: &'a Value,
    // Executor that performs the actual tool call.
    executor: &'a dyn ToolExecutor,
    // Snapshot of the runtime scope visible to the tool.
    scoped_input: Value,
    // Optional cooperative cancellation signal checked between attempts.
    cancellation: Option<&'a dyn CancellationSignal>,
}
2652
/// Borrowed fields of an `Llm` node, grouped for `execute_llm_node`.
struct LlmNodeSpec<'a> {
    model: &'a str,
    prompt: &'a str,
    // Outgoing edge; `None` triggers `MissingNextEdge` at execution time.
    next: &'a Option<String>,
}
2658
/// Borrowed fields of a `Tool` node, grouped for `execute_tool_node`.
struct ToolNodeSpec<'a> {
    tool: &'a str,
    input: &'a Value,
    // Outgoing edge; `None` triggers `MissingNextEdge` at execution time.
    next: &'a Option<String>,
}
2664
/// Borrowed fields of a `Condition` node, grouped for
/// `execute_condition_node`.
struct ConditionNodeSpec<'a> {
    // Boolean expression evaluated against the scoped input.
    expression: &'a str,
    // Successor node ids for each verdict.
    on_true: &'a str,
    on_false: &'a str,
}
2670
/// Borrowed fields of a `Parallel` node, grouped for
/// `execute_parallel_node`.
struct ParallelNodeSpec<'a> {
    // Lookup table from node id to node, used to resolve branch ids.
    node_index: &'a HashMap<&'a str, &'a Node>,
    // Ids of the branch nodes to run concurrently.
    branches: &'a [String],
    next: &'a str,
    // Per-node concurrency cap; falls back to the runtime-wide limit.
    max_in_flight: Option<usize>,
}
2677
/// Borrowed fields of a `Merge` node, grouped for `execute_merge_node`.
struct MergeNodeSpec<'a> {
    // Node ids whose recorded outputs are combined.
    sources: &'a [String],
    policy: &'a MergePolicy,
    // Minimum resolved-source count for `MergePolicy::Quorum`.
    quorum: Option<usize>,
    next: &'a str,
}
2684
/// Borrowed fields of a `Map` node, grouped for `execute_map_node`.
struct MapNodeSpec<'a> {
    // Tool invoked once per item.
    tool: &'a str,
    // Path (within the scoped input) to the array of items.
    items_path: &'a str,
    next: &'a str,
    // Per-node concurrency cap; falls back to the runtime-wide limit.
    max_in_flight: Option<usize>,
}
2691
2692impl RuntimeScope {
2693 fn new(workflow_input: Value) -> Self {
2694 Self {
2695 workflow_input,
2696 node_outputs: BTreeMap::new(),
2697 loop_iterations: HashMap::new(),
2698 debounce_last_seen: HashMap::new(),
2699 throttle_last_pass: HashMap::new(),
2700 cache_entries: BTreeMap::new(),
2701 last_llm_output: None,
2702 last_tool_output: None,
2703 }
2704 }
2705
2706 fn scoped_input(&self, capability: ScopeCapability) -> Result<Value, ScopeAccessError> {
2707 if !matches!(
2708 capability,
2709 ScopeCapability::LlmRead
2710 | ScopeCapability::ToolRead
2711 | ScopeCapability::ConditionRead
2712 | ScopeCapability::MapRead
2713 ) {
2714 return Err(ScopeAccessError::ReadDenied {
2715 capability: capability.as_str(),
2716 });
2717 }
2718
2719 let mut object = Map::new();
2720 object.insert("input".to_string(), self.workflow_input.clone());
2721 object.insert(
2722 "last_llm_output".to_string(),
2723 self.last_llm_output
2724 .as_ref()
2725 .map_or(Value::Null, |value| Value::String(value.clone())),
2726 );
2727 object.insert(
2728 "last_tool_output".to_string(),
2729 self.last_tool_output.clone().unwrap_or(Value::Null),
2730 );
2731 object.insert(
2732 "node_outputs".to_string(),
2733 Value::Object(
2734 self.node_outputs
2735 .iter()
2736 .map(|(key, value)| (key.clone(), value.clone()))
2737 .collect(),
2738 ),
2739 );
2740 Ok(Value::Object(object))
2741 }
2742
2743 fn record_llm_output(
2744 &mut self,
2745 node_id: &str,
2746 output: String,
2747 capability: ScopeCapability,
2748 ) -> Result<(), ScopeAccessError> {
2749 if capability != ScopeCapability::LlmWrite {
2750 return Err(ScopeAccessError::WriteDenied {
2751 capability: capability.as_str(),
2752 });
2753 }
2754
2755 self.last_llm_output = Some(output.clone());
2756 self.node_outputs
2757 .insert(node_id.to_string(), Value::String(output));
2758 Ok(())
2759 }
2760
2761 fn record_tool_output(
2762 &mut self,
2763 node_id: &str,
2764 output: Value,
2765 capability: ScopeCapability,
2766 ) -> Result<(), ScopeAccessError> {
2767 if capability != ScopeCapability::ToolWrite {
2768 return Err(ScopeAccessError::WriteDenied {
2769 capability: capability.as_str(),
2770 });
2771 }
2772
2773 self.last_tool_output = Some(output.clone());
2774 self.node_outputs.insert(node_id.to_string(), output);
2775 Ok(())
2776 }
2777
2778 fn record_condition_output(
2779 &mut self,
2780 node_id: &str,
2781 evaluated: bool,
2782 capability: ScopeCapability,
2783 ) -> Result<(), ScopeAccessError> {
2784 if capability != ScopeCapability::ConditionWrite {
2785 return Err(ScopeAccessError::WriteDenied {
2786 capability: capability.as_str(),
2787 });
2788 }
2789
2790 self.node_outputs
2791 .insert(node_id.to_string(), Value::Bool(evaluated));
2792 Ok(())
2793 }
2794
2795 fn record_node_output(
2796 &mut self,
2797 node_id: &str,
2798 output: Value,
2799 capability: ScopeCapability,
2800 ) -> Result<(), ScopeAccessError> {
2801 if !matches!(
2802 capability,
2803 ScopeCapability::MapWrite | ScopeCapability::ReduceWrite
2804 ) {
2805 return Err(ScopeAccessError::WriteDenied {
2806 capability: capability.as_str(),
2807 });
2808 }
2809
2810 self.node_outputs.insert(node_id.to_string(), output);
2811 Ok(())
2812 }
2813
2814 fn node_output(&self, node_id: &str) -> Option<&Value> {
2815 self.node_outputs.get(node_id)
2816 }
2817
2818 fn loop_iteration(&self, node_id: &str) -> u32 {
2819 self.loop_iterations.get(node_id).copied().unwrap_or(0)
2820 }
2821
2822 fn set_loop_iteration(&mut self, node_id: &str, iteration: u32) {
2823 self.loop_iterations.insert(node_id.to_string(), iteration);
2824 }
2825
2826 fn clear_loop_iteration(&mut self, node_id: &str) {
2827 self.loop_iterations.remove(node_id);
2828 }
2829
2830 fn debounce(&mut self, node_id: &str, key: &str, step: usize, window_steps: u32) -> bool {
2831 let namespaced = format!("{node_id}:{key}");
2832 let window = window_steps as usize;
2833 let suppressed = self
2834 .debounce_last_seen
2835 .get(&namespaced)
2836 .is_some_and(|last| step.saturating_sub(*last) < window);
2837 self.debounce_last_seen.insert(namespaced, step);
2838 suppressed
2839 }
2840
2841 fn throttle(&mut self, node_id: &str, key: &str, step: usize, window_steps: u32) -> bool {
2842 let namespaced = format!("{node_id}:{key}");
2843 let window = window_steps as usize;
2844 let throttled = self
2845 .throttle_last_pass
2846 .get(&namespaced)
2847 .is_some_and(|last| step.saturating_sub(*last) < window);
2848 if !throttled {
2849 self.throttle_last_pass.insert(namespaced, step);
2850 }
2851 throttled
2852 }
2853
2854 fn put_cache(&mut self, key: &str, value: Value) {
2855 self.cache_entries.insert(key.to_string(), value);
2856 }
2857
2858 fn cache_value(&self, key: &str) -> Option<&Value> {
2859 self.cache_entries.get(key)
2860 }
2861}
2862
2863fn check_cancelled(
2864 cancellation: Option<&dyn CancellationSignal>,
2865) -> Result<(), WorkflowRuntimeError> {
2866 if cancellation.is_some_and(CancellationSignal::is_cancelled) {
2867 Err(WorkflowRuntimeError::Cancelled)
2868 } else {
2869 Ok(())
2870 }
2871}
2872
/// Issues the current logical timestamp and advances the clock by one,
/// saturating at `u64::MAX` instead of wrapping.
fn next_trace_timestamp(clock: &mut u64) -> u64 {
    let bumped = clock.saturating_add(1);
    std::mem::replace(clock, bumped)
}
2878
2879fn build_prompt_with_scope(prompt: &str, scoped_input: &Value) -> String {
2880 format!("{}\n\nScoped context:\n{}", prompt, scoped_input)
2881}
2882
2883fn build_node_index(workflow: &WorkflowDefinition) -> HashMap<&str, &Node> {
2884 let mut index = HashMap::with_capacity(workflow.nodes.len());
2885 for node in &workflow.nodes {
2886 index.insert(node.id.as_str(), node);
2887 }
2888 index
2889}
2890
2891fn find_start_node_id(workflow: &WorkflowDefinition) -> Result<String, WorkflowRuntimeError> {
2892 workflow
2893 .nodes
2894 .iter()
2895 .find_map(|node| match node.kind {
2896 NodeKind::Start { .. } => Some(node.id.clone()),
2897 _ => None,
2898 })
2899 .ok_or(WorkflowRuntimeError::MissingStartNode)
2900}
2901
/// Extracts the successor node id from an executed node's recorded data.
///
/// Every non-terminal variant carries a `next` field; for branching variants
/// (condition, router, loop, …) this appears to hold the successor that was
/// actually chosen during execution — confirm at the recording sites.
/// `End` is the only variant with no successor.
fn next_node_id(data: &NodeExecutionData) -> Option<String> {
    match data {
        NodeExecutionData::Start { next }
        | NodeExecutionData::Llm { next, .. }
        | NodeExecutionData::Tool { next, .. }
        | NodeExecutionData::Condition { next, .. }
        | NodeExecutionData::Debounce { next, .. }
        | NodeExecutionData::Throttle { next, .. }
        | NodeExecutionData::RetryCompensate { next, .. }
        | NodeExecutionData::HumanInTheLoop { next, .. }
        | NodeExecutionData::CacheRead { next, .. }
        | NodeExecutionData::CacheWrite { next, .. }
        | NodeExecutionData::EventTrigger { next, .. }
        | NodeExecutionData::Router { next, .. }
        | NodeExecutionData::Transform { next, .. }
        | NodeExecutionData::Loop { next, .. }
        | NodeExecutionData::Subgraph { next, .. }
        | NodeExecutionData::Batch { next, .. }
        | NodeExecutionData::Filter { next, .. }
        | NodeExecutionData::Parallel { next, .. }
        | NodeExecutionData::Merge { next, .. }
        | NodeExecutionData::Map { next, .. }
        | NodeExecutionData::Reduce { next, .. } => Some(next.clone()),
        NodeExecutionData::End => None,
    }
}
2928
2929fn resolve_path<'a>(scope: &'a Value, path: &str) -> Option<&'a Value> {
2930 let mut current = scope;
2931 for segment in path.split('.') {
2932 if segment.is_empty() {
2933 continue;
2934 }
2935 current = current.get(segment)?;
2936 }
2937 Some(current)
2938}
2939
2940fn resolve_string_path(scope: &Value, path: &str) -> Option<String> {
2941 resolve_path(scope, path)
2942 .and_then(Value::as_str)
2943 .map(str::to_string)
2944}
2945
2946fn evaluate_human_decision(value: &Value) -> Option<bool> {
2947 match value {
2948 Value::Bool(flag) => Some(*flag),
2949 Value::String(text) => {
2950 let normalized = text.trim().to_ascii_lowercase();
2951 match normalized.as_str() {
2952 "approve" | "approved" | "yes" | "true" => Some(true),
2953 "reject" | "rejected" | "no" | "false" => Some(false),
2954 _ => None,
2955 }
2956 }
2957 _ => None,
2958 }
2959}
2960
2961fn evaluate_transform_expression(expression: &str, scope: &Value) -> Result<Value, String> {
2962 let trimmed = expression.trim();
2963 if trimmed.is_empty() {
2964 return Err("expression is empty".to_string());
2965 }
2966
2967 if let Ok(value) = serde_json::from_str::<Value>(trimmed) {
2968 return Ok(value);
2969 }
2970
2971 let path = trimmed.strip_prefix("$.").unwrap_or(trimmed);
2972 resolve_path(scope, path)
2973 .cloned()
2974 .ok_or_else(|| format!("path '{path}' not found in scoped input"))
2975}
2976
2977fn map_item_scoped_input(base_scope: &Value, item: &Value, index: usize) -> Value {
2978 let mut object = match base_scope {
2979 Value::Object(map) => map.clone(),
2980 _ => Map::new(),
2981 };
2982 object.insert("map_item".to_string(), item.clone());
2983 object.insert("map_index".to_string(), Value::from(index as u64));
2984 Value::Object(object)
2985}
2986
2987fn enforce_expression_scope_budget(
2988 node_id: &str,
2989 scoped_input: &Value,
2990 max_expression_scope_bytes: usize,
2991) -> Result<(), WorkflowRuntimeError> {
2992 let size = serde_json::to_vec(scoped_input)
2993 .map(|bytes| bytes.len())
2994 .unwrap_or(max_expression_scope_bytes.saturating_add(1));
2995 if size > max_expression_scope_bytes {
2996 return Err(WorkflowRuntimeError::ExpressionScopeLimitExceeded {
2997 node_id: node_id.to_string(),
2998 actual_bytes: size,
2999 limit_bytes: max_expression_scope_bytes,
3000 });
3001 }
3002 Ok(())
3003}
3004
3005fn reduce_value(
3006 node_id: &str,
3007 source: &str,
3008 operation: &ReduceOperation,
3009 source_value: Value,
3010) -> Result<Value, WorkflowRuntimeError> {
3011 let items =
3012 source_value
3013 .as_array()
3014 .ok_or_else(|| WorkflowRuntimeError::InvalidReduceInput {
3015 node_id: node_id.to_string(),
3016 source_node: source.to_string(),
3017 reason: "expected source output to be an array".to_string(),
3018 })?;
3019
3020 match operation {
3021 ReduceOperation::Count => Ok(Value::from(items.len() as u64)),
3022 ReduceOperation::Sum => {
3023 let mut sum = 0.0f64;
3024 for value in items {
3025 let number =
3026 value
3027 .as_f64()
3028 .ok_or_else(|| WorkflowRuntimeError::InvalidReduceInput {
3029 node_id: node_id.to_string(),
3030 source_node: source.to_string(),
3031 reason: "sum operation requires numeric array values".to_string(),
3032 })?;
3033 sum += number;
3034 }
3035 let number = serde_json::Number::from_f64(sum).ok_or_else(|| {
3036 WorkflowRuntimeError::InvalidReduceInput {
3037 node_id: node_id.to_string(),
3038 source_node: source.to_string(),
3039 reason: "sum produced non-finite value".to_string(),
3040 }
3041 })?;
3042 Ok(Value::Number(number))
3043 }
3044 }
3045}
3046
3047#[cfg(test)]
3048mod tests {
3049 use std::sync::atomic::{AtomicUsize, Ordering};
3050 use std::sync::{Arc, Mutex};
3051 use std::time::Duration;
3052
3053 use async_trait::async_trait;
3054 use serde_json::json;
3055 use tokio::time::sleep;
3056
3057 use super::*;
3058 use crate::ir::{MergePolicy, Node, NodeKind, ReduceOperation, WorkflowDefinition};
3059
3060 struct MockLlmExecutor {
3061 output: String,
3062 }
3063
3064 #[async_trait]
3065 impl LlmExecutor for MockLlmExecutor {
3066 async fn execute(
3067 &self,
3068 _input: LlmExecutionInput,
3069 ) -> Result<LlmExecutionOutput, LlmExecutionError> {
3070 Ok(LlmExecutionOutput {
3071 content: self.output.clone(),
3072 })
3073 }
3074 }
3075
3076 struct MockToolExecutor {
3077 output: Value,
3078 fail: bool,
3079 }
3080
3081 #[async_trait]
3082 impl ToolExecutor for MockToolExecutor {
3083 async fn execute_tool(
3084 &self,
3085 input: ToolExecutionInput,
3086 ) -> Result<Value, ToolExecutionError> {
3087 if self.fail {
3088 return Err(ToolExecutionError::Failed(format!(
3089 "tool '{}' failed intentionally",
3090 input.tool
3091 )));
3092 }
3093 Ok(self.output.clone())
3094 }
3095 }
3096
3097 struct SequencedLlmExecutor {
3098 responses: Mutex<Vec<Result<LlmExecutionOutput, LlmExecutionError>>>,
3099 calls: AtomicUsize,
3100 }
3101
3102 #[async_trait]
3103 impl LlmExecutor for SequencedLlmExecutor {
3104 async fn execute(
3105 &self,
3106 _input: LlmExecutionInput,
3107 ) -> Result<LlmExecutionOutput, LlmExecutionError> {
3108 self.calls.fetch_add(1, Ordering::Relaxed);
3109 self.responses
3110 .lock()
3111 .expect("sequenced llm lock poisoned")
3112 .remove(0)
3113 }
3114 }
3115
3116 struct SlowToolExecutor {
3117 delay: Duration,
3118 }
3119
3120 #[async_trait]
3121 impl ToolExecutor for SlowToolExecutor {
3122 async fn execute_tool(
3123 &self,
3124 _input: ToolExecutionInput,
3125 ) -> Result<Value, ToolExecutionError> {
3126 sleep(self.delay).await;
3127 Ok(json!({"status": "slow-ok"}))
3128 }
3129 }
3130
3131 struct IncrementingToolExecutor {
3132 value: AtomicUsize,
3133 }
3134
3135 #[async_trait]
3136 impl ToolExecutor for IncrementingToolExecutor {
3137 async fn execute_tool(
3138 &self,
3139 _input: ToolExecutionInput,
3140 ) -> Result<Value, ToolExecutionError> {
3141 let next = self.value.fetch_add(1, Ordering::Relaxed) + 1;
3142 Ok(json!(next))
3143 }
3144 }
3145
3146 struct EchoInputToolExecutor;
3147
3148 #[async_trait]
3149 impl ToolExecutor for EchoInputToolExecutor {
3150 async fn execute_tool(
3151 &self,
3152 input: ToolExecutionInput,
3153 ) -> Result<Value, ToolExecutionError> {
3154 Ok(input.input)
3155 }
3156 }
3157
3158 struct RetryCompensateToolExecutor {
3159 attempts: AtomicUsize,
3160 }
3161
3162 #[async_trait]
3163 impl ToolExecutor for RetryCompensateToolExecutor {
3164 async fn execute_tool(
3165 &self,
3166 input: ToolExecutionInput,
3167 ) -> Result<Value, ToolExecutionError> {
3168 match input.tool.as_str() {
3169 "unstable_primary" => {
3170 let current = self.attempts.fetch_add(1, Ordering::Relaxed) + 1;
3171 if current <= 1 {
3172 Err(ToolExecutionError::Failed("primary failed".to_string()))
3173 } else {
3174 Ok(json!({"primary_attempt": current}))
3175 }
3176 }
3177 "always_fail" => Err(ToolExecutionError::Failed("always fail".to_string())),
3178 "compensate" => Ok(json!({"compensated": true})),
3179 _ => Err(ToolExecutionError::NotFound {
3180 tool: input.tool.clone(),
3181 }),
3182 }
3183 }
3184 }
3185
3186 struct CancellingLlmExecutor {
3187 cancel_flag: Arc<AtomicBool>,
3188 calls: AtomicUsize,
3189 }
3190
3191 #[async_trait]
3192 impl LlmExecutor for CancellingLlmExecutor {
3193 async fn execute(
3194 &self,
3195 _input: LlmExecutionInput,
3196 ) -> Result<LlmExecutionOutput, LlmExecutionError> {
3197 self.calls.fetch_add(1, Ordering::Relaxed);
3198 self.cancel_flag.store(true, Ordering::Relaxed);
3199 Err(LlmExecutionError::Client("transient failure".to_string()))
3200 }
3201 }
3202
3203 fn linear_workflow() -> WorkflowDefinition {
3204 WorkflowDefinition {
3205 version: "v0".to_string(),
3206 name: "linear".to_string(),
3207 nodes: vec![
3208 Node {
3209 id: "start".to_string(),
3210 kind: NodeKind::Start {
3211 next: "llm".to_string(),
3212 },
3213 },
3214 Node {
3215 id: "llm".to_string(),
3216 kind: NodeKind::Llm {
3217 model: "gpt-4".to_string(),
3218 prompt: "Summarize".to_string(),
3219 next: Some("tool".to_string()),
3220 },
3221 },
3222 Node {
3223 id: "tool".to_string(),
3224 kind: NodeKind::Tool {
3225 tool: "extract".to_string(),
3226 input: json!({"k": "v"}),
3227 next: Some("end".to_string()),
3228 },
3229 },
3230 Node {
3231 id: "end".to_string(),
3232 kind: NodeKind::End,
3233 },
3234 ],
3235 }
3236 }
3237
3238 fn llm_only_workflow() -> WorkflowDefinition {
3239 WorkflowDefinition {
3240 version: "v0".to_string(),
3241 name: "llm-only".to_string(),
3242 nodes: vec![
3243 Node {
3244 id: "start".to_string(),
3245 kind: NodeKind::Start {
3246 next: "llm".to_string(),
3247 },
3248 },
3249 Node {
3250 id: "llm".to_string(),
3251 kind: NodeKind::Llm {
3252 model: "gpt-4".to_string(),
3253 prompt: "Summarize".to_string(),
3254 next: Some("end".to_string()),
3255 },
3256 },
3257 Node {
3258 id: "end".to_string(),
3259 kind: NodeKind::End,
3260 },
3261 ],
3262 }
3263 }
3264
3265 fn loop_workflow(max_iterations: Option<u32>) -> WorkflowDefinition {
3266 WorkflowDefinition {
3267 version: "v0".to_string(),
3268 name: "loop-workflow".to_string(),
3269 nodes: vec![
3270 Node {
3271 id: "start".to_string(),
3272 kind: NodeKind::Start {
3273 next: "loop".to_string(),
3274 },
3275 },
3276 Node {
3277 id: "loop".to_string(),
3278 kind: NodeKind::Loop {
3279 condition: "last_tool_output != 3".to_string(),
3280 body: "counter".to_string(),
3281 next: "end".to_string(),
3282 max_iterations,
3283 },
3284 },
3285 Node {
3286 id: "counter".to_string(),
3287 kind: NodeKind::Tool {
3288 tool: "counter".to_string(),
3289 input: json!({}),
3290 next: Some("loop".to_string()),
3291 },
3292 },
3293 Node {
3294 id: "end".to_string(),
3295 kind: NodeKind::End,
3296 },
3297 ],
3298 }
3299 }
3300
3301 fn parallel_merge_workflow(policy: MergePolicy, quorum: Option<usize>) -> WorkflowDefinition {
3302 WorkflowDefinition {
3303 version: "v0".to_string(),
3304 name: "parallel-merge".to_string(),
3305 nodes: vec![
3306 Node {
3307 id: "start".to_string(),
3308 kind: NodeKind::Start {
3309 next: "parallel".to_string(),
3310 },
3311 },
3312 Node {
3313 id: "parallel".to_string(),
3314 kind: NodeKind::Parallel {
3315 branches: vec!["tool_a".to_string(), "tool_b".to_string()],
3316 next: "merge".to_string(),
3317 max_in_flight: Some(2),
3318 },
3319 },
3320 Node {
3321 id: "tool_a".to_string(),
3322 kind: NodeKind::Tool {
3323 tool: "extract".to_string(),
3324 input: json!({"value": 1}),
3325 next: Some("end".to_string()),
3326 },
3327 },
3328 Node {
3329 id: "tool_b".to_string(),
3330 kind: NodeKind::Tool {
3331 tool: "extract".to_string(),
3332 input: json!({"value": 2}),
3333 next: Some("end".to_string()),
3334 },
3335 },
3336 Node {
3337 id: "merge".to_string(),
3338 kind: NodeKind::Merge {
3339 sources: vec!["tool_a".to_string(), "tool_b".to_string()],
3340 policy,
3341 quorum,
3342 next: "end".to_string(),
3343 },
3344 },
3345 Node {
3346 id: "end".to_string(),
3347 kind: NodeKind::End,
3348 },
3349 ],
3350 }
3351 }
3352
3353 fn map_reduce_workflow(operation: ReduceOperation) -> WorkflowDefinition {
3354 WorkflowDefinition {
3355 version: "v0".to_string(),
3356 name: "map-reduce".to_string(),
3357 nodes: vec![
3358 Node {
3359 id: "start".to_string(),
3360 kind: NodeKind::Start {
3361 next: "map".to_string(),
3362 },
3363 },
3364 Node {
3365 id: "map".to_string(),
3366 kind: NodeKind::Map {
3367 tool: "counter".to_string(),
3368 items_path: "input.values".to_string(),
3369 next: "reduce".to_string(),
3370 max_in_flight: Some(3),
3371 },
3372 },
3373 Node {
3374 id: "reduce".to_string(),
3375 kind: NodeKind::Reduce {
3376 source: "map".to_string(),
3377 operation,
3378 next: "end".to_string(),
3379 },
3380 },
3381 Node {
3382 id: "end".to_string(),
3383 kind: NodeKind::End,
3384 },
3385 ],
3386 }
3387 }
3388
3389 fn debounce_and_throttle_workflow() -> WorkflowDefinition {
3390 WorkflowDefinition {
3391 version: "v0".to_string(),
3392 name: "debounce-throttle".to_string(),
3393 nodes: vec![
3394 Node {
3395 id: "start".to_string(),
3396 kind: NodeKind::Start {
3397 next: "debounce_a".to_string(),
3398 },
3399 },
3400 Node {
3401 id: "debounce_a".to_string(),
3402 kind: NodeKind::Debounce {
3403 key_path: "input.key".to_string(),
3404 window_steps: 3,
3405 next: "debounce_a".to_string(),
3406 on_suppressed: Some("throttle_a".to_string()),
3407 },
3408 },
3409 Node {
3410 id: "throttle_a".to_string(),
3411 kind: NodeKind::Throttle {
3412 key_path: "input.key".to_string(),
3413 window_steps: 3,
3414 next: "throttle_a".to_string(),
3415 on_throttled: Some("end_throttled".to_string()),
3416 },
3417 },
3418 Node {
3419 id: "end_throttled".to_string(),
3420 kind: NodeKind::End,
3421 },
3422 ],
3423 }
3424 }
3425
3426 fn extended_nodes_workflow() -> WorkflowDefinition {
3427 WorkflowDefinition {
3428 version: "v0".to_string(),
3429 name: "extended-nodes".to_string(),
3430 nodes: vec![
3431 Node {
3432 id: "start".to_string(),
3433 kind: NodeKind::Start {
3434 next: "event".to_string(),
3435 },
3436 },
3437 Node {
3438 id: "event".to_string(),
3439 kind: NodeKind::EventTrigger {
3440 event: "webhook".to_string(),
3441 event_path: "input.event_type".to_string(),
3442 next: "cache_write".to_string(),
3443 on_mismatch: Some("end_mismatch".to_string()),
3444 },
3445 },
3446 Node {
3447 id: "cache_write".to_string(),
3448 kind: NodeKind::CacheWrite {
3449 key_path: "input.cache_key".to_string(),
3450 value_path: "input.payload".to_string(),
3451 next: "cache_read".to_string(),
3452 },
3453 },
3454 Node {
3455 id: "cache_read".to_string(),
3456 kind: NodeKind::CacheRead {
3457 key_path: "input.cache_key".to_string(),
3458 next: "router".to_string(),
3459 on_miss: Some("end_miss".to_string()),
3460 },
3461 },
3462 Node {
3463 id: "router".to_string(),
3464 kind: NodeKind::Router {
3465 routes: vec![crate::ir::RouterRoute {
3466 when: "input.mode == 'manual'".to_string(),
3467 next: "human".to_string(),
3468 }],
3469 default: "transform".to_string(),
3470 },
3471 },
3472 Node {
3473 id: "human".to_string(),
3474 kind: NodeKind::HumanInTheLoop {
3475 decision_path: "input.approval".to_string(),
3476 response_path: Some("input.review_notes".to_string()),
3477 on_approve: "transform".to_string(),
3478 on_reject: "end_rejected".to_string(),
3479 },
3480 },
3481 Node {
3482 id: "transform".to_string(),
3483 kind: NodeKind::Transform {
3484 expression: "node_outputs.cache_read.value".to_string(),
3485 next: "end".to_string(),
3486 },
3487 },
3488 Node {
3489 id: "end".to_string(),
3490 kind: NodeKind::End,
3491 },
3492 Node {
3493 id: "end_mismatch".to_string(),
3494 kind: NodeKind::End,
3495 },
3496 Node {
3497 id: "end_miss".to_string(),
3498 kind: NodeKind::End,
3499 },
3500 Node {
3501 id: "end_rejected".to_string(),
3502 kind: NodeKind::End,
3503 },
3504 ],
3505 }
3506 }
3507
3508 fn retry_compensate_workflow(primary_tool: &str) -> WorkflowDefinition {
3509 WorkflowDefinition {
3510 version: "v0".to_string(),
3511 name: "retry-compensate".to_string(),
3512 nodes: vec![
3513 Node {
3514 id: "start".to_string(),
3515 kind: NodeKind::Start {
3516 next: "retry_comp".to_string(),
3517 },
3518 },
3519 Node {
3520 id: "retry_comp".to_string(),
3521 kind: NodeKind::RetryCompensate {
3522 tool: primary_tool.to_string(),
3523 input: json!({"job": "run"}),
3524 max_retries: 1,
3525 compensate_tool: "compensate".to_string(),
3526 compensate_input: json!({"job": "rollback"}),
3527 next: "end".to_string(),
3528 on_compensated: Some("end_compensated".to_string()),
3529 },
3530 },
3531 Node {
3532 id: "end".to_string(),
3533 kind: NodeKind::End,
3534 },
3535 Node {
3536 id: "end_compensated".to_string(),
3537 kind: NodeKind::End,
3538 },
3539 ],
3540 }
3541 }
3542
3543 #[tokio::test]
3544 async fn executes_happy_path_linear_flow() {
3545 let llm = MockLlmExecutor {
3546 output: "ok".to_string(),
3547 };
3548 let tools = MockToolExecutor {
3549 output: json!({"status": "done"}),
3550 fail: false,
3551 };
3552 let runtime = WorkflowRuntime::new(
3553 linear_workflow(),
3554 &llm,
3555 Some(&tools),
3556 WorkflowRuntimeOptions::default(),
3557 );
3558
3559 let result = runtime
3560 .execute(json!({"request_id": "r1"}), None)
3561 .await
3562 .expect("linear workflow should succeed");
3563
3564 assert_eq!(result.workflow_name, "linear");
3565 assert_eq!(result.terminal_node_id, "end");
3566 assert_eq!(result.node_executions.len(), 4);
3567 assert_eq!(
3568 result.node_outputs.get("llm"),
3569 Some(&Value::String("ok".to_string()))
3570 );
3571 assert_eq!(
3572 result.node_outputs.get("tool"),
3573 Some(&json!({"status": "done"}))
3574 );
3575 assert_eq!(result.events.len(), 8);
3576 assert!(result.retry_events.is_empty());
3577 assert!(result.trace.is_some());
3578 assert_eq!(result.replay_report, None);
3579 }
3580
3581 #[tokio::test]
3582 async fn linear_flow_records_expected_node_execution_payloads() {
3583 let llm = MockLlmExecutor {
3584 output: "ok".to_string(),
3585 };
3586 let tools = MockToolExecutor {
3587 output: json!({"status": "done"}),
3588 fail: false,
3589 };
3590 let runtime = WorkflowRuntime::new(
3591 linear_workflow(),
3592 &llm,
3593 Some(&tools),
3594 WorkflowRuntimeOptions::default(),
3595 );
3596
3597 let result = runtime
3598 .execute(json!({"request_id": "r1"}), None)
3599 .await
3600 .expect("linear workflow should succeed");
3601
3602 assert!(matches!(
3603 result.node_executions[0].data,
3604 NodeExecutionData::Start { ref next } if next == "llm"
3605 ));
3606 assert!(matches!(
3607 result.node_executions[1].data,
3608 NodeExecutionData::Llm {
3609 ref model,
3610 ref output,
3611 ref next
3612 } if model == "gpt-4" && output == "ok" && next == "tool"
3613 ));
3614 assert!(matches!(
3615 result.node_executions[2].data,
3616 NodeExecutionData::Tool {
3617 ref tool,
3618 ref next,
3619 ..
3620 } if tool == "extract" && next == "end"
3621 ));
3622 }
3623
3624 #[tokio::test]
3625 async fn executes_conditional_branching() {
3626 let workflow = WorkflowDefinition {
3627 version: "v0".to_string(),
3628 name: "conditional".to_string(),
3629 nodes: vec![
3630 Node {
3631 id: "start".to_string(),
3632 kind: NodeKind::Start {
3633 next: "condition".to_string(),
3634 },
3635 },
3636 Node {
3637 id: "condition".to_string(),
3638 kind: NodeKind::Condition {
3639 expression: "input.approved".to_string(),
3640 on_true: "end_true".to_string(),
3641 on_false: "end_false".to_string(),
3642 },
3643 },
3644 Node {
3645 id: "end_true".to_string(),
3646 kind: NodeKind::End,
3647 },
3648 Node {
3649 id: "end_false".to_string(),
3650 kind: NodeKind::End,
3651 },
3652 ],
3653 };
3654
3655 let llm = MockLlmExecutor {
3656 output: "unused".to_string(),
3657 };
3658 let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());
3659
3660 let result = runtime
3661 .execute(json!({"approved": true}), None)
3662 .await
3663 .expect("conditional workflow should succeed");
3664
3665 assert_eq!(result.terminal_node_id, "end_true");
3666 }
3667
3668 #[tokio::test]
3669 async fn executes_conditional_false_branch() {
3670 let workflow = WorkflowDefinition {
3671 version: "v0".to_string(),
3672 name: "conditional-false".to_string(),
3673 nodes: vec![
3674 Node {
3675 id: "start".to_string(),
3676 kind: NodeKind::Start {
3677 next: "condition".to_string(),
3678 },
3679 },
3680 Node {
3681 id: "condition".to_string(),
3682 kind: NodeKind::Condition {
3683 expression: "input.approved".to_string(),
3684 on_true: "end_true".to_string(),
3685 on_false: "end_false".to_string(),
3686 },
3687 },
3688 Node {
3689 id: "end_true".to_string(),
3690 kind: NodeKind::End,
3691 },
3692 Node {
3693 id: "end_false".to_string(),
3694 kind: NodeKind::End,
3695 },
3696 ],
3697 };
3698 let llm = MockLlmExecutor {
3699 output: "unused".to_string(),
3700 };
3701 let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());
3702
3703 let result = runtime
3704 .execute(json!({"approved": false}), None)
3705 .await
3706 .expect("conditional workflow should take false branch");
3707
3708 assert_eq!(result.terminal_node_id, "end_false");
3709 }
3710
3711 #[tokio::test]
3712 async fn fails_when_tool_executor_is_missing() {
3713 let llm = MockLlmExecutor {
3714 output: "ok".to_string(),
3715 };
3716 let runtime = WorkflowRuntime::new(
3717 linear_workflow(),
3718 &llm,
3719 None,
3720 WorkflowRuntimeOptions::default(),
3721 );
3722
3723 let error = runtime
3724 .execute(json!({}), None)
3725 .await
3726 .expect_err("workflow should fail without tool executor");
3727
3728 assert!(matches!(
3729 error,
3730 WorkflowRuntimeError::MissingToolExecutor { node_id } if node_id == "tool"
3731 ));
3732 }
3733
3734 #[tokio::test]
3735 async fn fails_on_tool_execution_error() {
3736 let llm = MockLlmExecutor {
3737 output: "ok".to_string(),
3738 };
3739 let tools = MockToolExecutor {
3740 output: json!({"status": "unused"}),
3741 fail: true,
3742 };
3743 let runtime = WorkflowRuntime::new(
3744 linear_workflow(),
3745 &llm,
3746 Some(&tools),
3747 WorkflowRuntimeOptions::default(),
3748 );
3749
3750 let error = runtime
3751 .execute(json!({}), None)
3752 .await
3753 .expect_err("workflow should fail on tool error");
3754
3755 assert!(matches!(
3756 error,
3757 WorkflowRuntimeError::ToolRetryExhausted { node_id, attempts: 1, .. }
3758 if node_id == "tool"
3759 ));
3760 }
3761
3762 #[tokio::test]
3763 async fn retries_llm_after_transient_failure() {
3764 let llm = SequencedLlmExecutor {
3765 responses: Mutex::new(vec![
3766 Err(LlmExecutionError::Client("temporary".to_string())),
3767 Ok(LlmExecutionOutput {
3768 content: "recovered".to_string(),
3769 }),
3770 ]),
3771 calls: AtomicUsize::new(0),
3772 };
3773 let runtime = WorkflowRuntime::new(
3774 llm_only_workflow(),
3775 &llm,
3776 None,
3777 WorkflowRuntimeOptions {
3778 llm_node_policy: NodeExecutionPolicy {
3779 timeout: None,
3780 max_retries: 1,
3781 },
3782 ..WorkflowRuntimeOptions::default()
3783 },
3784 );
3785
3786 let result = runtime
3787 .execute(json!({"request_id": "r2"}), None)
3788 .await
3789 .expect("llm retry should recover");
3790
3791 assert_eq!(result.terminal_node_id, "end");
3792 assert_eq!(result.node_outputs.get("llm"), Some(&json!("recovered")));
3793 assert_eq!(result.retry_events.len(), 1);
3794 assert_eq!(result.retry_events[0].operation, "llm");
3795 assert_eq!(llm.calls.load(Ordering::Relaxed), 2);
3796 }
3797
3798 #[tokio::test]
3799 async fn times_out_tool_execution_per_policy() {
3800 let llm = MockLlmExecutor {
3801 output: "ok".to_string(),
3802 };
3803 let tool = SlowToolExecutor {
3804 delay: Duration::from_millis(50),
3805 };
3806 let runtime = WorkflowRuntime::new(
3807 linear_workflow(),
3808 &llm,
3809 Some(&tool),
3810 WorkflowRuntimeOptions {
3811 tool_node_policy: NodeExecutionPolicy {
3812 timeout: Some(Duration::from_millis(5)),
3813 max_retries: 0,
3814 },
3815 ..WorkflowRuntimeOptions::default()
3816 },
3817 );
3818
3819 let error = runtime
3820 .execute(json!({}), None)
3821 .await
3822 .expect_err("tool execution should time out");
3823
3824 assert!(matches!(
3825 error,
3826 WorkflowRuntimeError::ToolTimeout {
3827 node_id,
3828 timeout_ms: 5,
3829 attempts: 1,
3830 } if node_id == "tool"
3831 ));
3832 }
3833
3834 #[tokio::test]
3835 async fn cancels_between_retry_attempts() {
3836 let cancel_flag = Arc::new(AtomicBool::new(false));
3837 let llm = CancellingLlmExecutor {
3838 cancel_flag: Arc::clone(&cancel_flag),
3839 calls: AtomicUsize::new(0),
3840 };
3841 let runtime = WorkflowRuntime::new(
3842 llm_only_workflow(),
3843 &llm,
3844 None,
3845 WorkflowRuntimeOptions {
3846 llm_node_policy: NodeExecutionPolicy {
3847 timeout: None,
3848 max_retries: 3,
3849 },
3850 ..WorkflowRuntimeOptions::default()
3851 },
3852 );
3853
3854 let error = runtime
3855 .execute(json!({}), Some(cancel_flag.as_ref()))
3856 .await
3857 .expect_err("workflow should stop when cancellation is observed");
3858
3859 assert!(matches!(error, WorkflowRuntimeError::Cancelled));
3860 assert_eq!(llm.calls.load(Ordering::Relaxed), 1);
3861 }
3862
3863 #[tokio::test]
3864 async fn enforces_step_limit_guard() {
3865 let workflow = WorkflowDefinition {
3866 version: "v0".to_string(),
3867 name: "loop".to_string(),
3868 nodes: vec![
3869 Node {
3870 id: "start".to_string(),
3871 kind: NodeKind::Start {
3872 next: "condition".to_string(),
3873 },
3874 },
3875 Node {
3876 id: "condition".to_string(),
3877 kind: NodeKind::Condition {
3878 expression: "true".to_string(),
3879 on_true: "condition".to_string(),
3880 on_false: "end".to_string(),
3881 },
3882 },
3883 Node {
3884 id: "end".to_string(),
3885 kind: NodeKind::End,
3886 },
3887 ],
3888 };
3889
3890 let llm = MockLlmExecutor {
3891 output: "unused".to_string(),
3892 };
3893 let runtime = WorkflowRuntime::new(
3894 workflow,
3895 &llm,
3896 None,
3897 WorkflowRuntimeOptions {
3898 max_steps: 3,
3899 ..WorkflowRuntimeOptions::default()
3900 },
3901 );
3902
3903 let error = runtime
3904 .execute(json!({}), None)
3905 .await
3906 .expect_err("workflow should fail on step limit");
3907
3908 assert!(matches!(
3909 error,
3910 WorkflowRuntimeError::StepLimitExceeded { max_steps: 3 }
3911 ));
3912 }
3913
3914 #[tokio::test]
3915 async fn validates_recorded_trace_in_replay_mode() {
3916 let llm = MockLlmExecutor {
3917 output: "ok".to_string(),
3918 };
3919 let tools = MockToolExecutor {
3920 output: json!({"status": "done"}),
3921 fail: false,
3922 };
3923 let runtime = WorkflowRuntime::new(
3924 linear_workflow(),
3925 &llm,
3926 Some(&tools),
3927 WorkflowRuntimeOptions {
3928 replay_mode: WorkflowReplayMode::ValidateRecordedTrace,
3929 ..WorkflowRuntimeOptions::default()
3930 },
3931 );
3932
3933 let result = runtime
3934 .execute(json!({"request_id": "r1"}), None)
3935 .await
3936 .expect("replay validation should pass");
3937
3938 assert!(result.trace.is_some());
3939 assert_eq!(
3940 result.replay_report.as_ref().map(|r| r.total_events),
3941 Some(9)
3942 );
3943 }
3944
3945 #[tokio::test]
3946 async fn executes_loop_until_condition_fails() {
3947 let llm = MockLlmExecutor {
3948 output: "unused".to_string(),
3949 };
3950 let counter = IncrementingToolExecutor {
3951 value: AtomicUsize::new(0),
3952 };
3953 let runtime = WorkflowRuntime::new(
3954 loop_workflow(Some(8)),
3955 &llm,
3956 Some(&counter),
3957 WorkflowRuntimeOptions::default(),
3958 );
3959
3960 let result = runtime
3961 .execute(json!({}), None)
3962 .await
3963 .expect("loop workflow should terminate at end");
3964
3965 assert_eq!(result.terminal_node_id, "end");
3966 assert!(result.node_executions.iter().any(|step| matches!(
3967 step.data,
3968 NodeExecutionData::Loop {
3969 evaluated: false,
3970 ..
3971 }
3972 )));
3973 }
3974
3975 #[tokio::test]
3976 async fn fails_when_loop_exceeds_max_iterations() {
3977 let llm = MockLlmExecutor {
3978 output: "unused".to_string(),
3979 };
3980 let counter = MockToolExecutor {
3981 output: json!(0),
3982 fail: false,
3983 };
3984 let runtime = WorkflowRuntime::new(
3985 loop_workflow(Some(2)),
3986 &llm,
3987 Some(&counter),
3988 WorkflowRuntimeOptions {
3989 max_steps: 20,
3990 ..WorkflowRuntimeOptions::default()
3991 },
3992 );
3993
3994 let error = runtime
3995 .execute(json!({}), None)
3996 .await
3997 .expect_err("loop should fail once max iterations are exceeded");
3998
3999 assert!(matches!(
4000 error,
4001 WorkflowRuntimeError::LoopIterationLimitExceeded {
4002 node_id,
4003 max_iterations: 2
4004 } if node_id == "loop"
4005 ));
4006 }
4007
4008 #[tokio::test]
4009 async fn executes_parallel_then_merge_all() {
4010 let llm = MockLlmExecutor {
4011 output: "unused".to_string(),
4012 };
4013 let tool = EchoInputToolExecutor;
4014 let runtime = WorkflowRuntime::new(
4015 parallel_merge_workflow(MergePolicy::All, None),
4016 &llm,
4017 Some(&tool),
4018 WorkflowRuntimeOptions::default(),
4019 );
4020
4021 let result = runtime
4022 .execute(json!({}), None)
4023 .await
4024 .expect("parallel merge workflow should succeed");
4025
4026 assert_eq!(result.terminal_node_id, "end");
4027 assert_eq!(
4028 result.node_outputs.get("tool_a"),
4029 Some(&json!({"value": 1}))
4030 );
4031 assert_eq!(
4032 result.node_outputs.get("tool_b"),
4033 Some(&json!({"value": 2}))
4034 );
4035 assert_eq!(
4036 result.node_outputs.get("merge"),
4037 Some(&json!([{"value": 1}, {"value": 2}]))
4038 );
4039 }
4040
4041 #[tokio::test]
4042 async fn executes_map_reduce_sum() {
4043 let llm = MockLlmExecutor {
4044 output: "unused".to_string(),
4045 };
4046 let tool = EchoInputToolExecutor;
4047 let runtime = WorkflowRuntime::new(
4048 map_reduce_workflow(ReduceOperation::Sum),
4049 &llm,
4050 Some(&tool),
4051 WorkflowRuntimeOptions::default(),
4052 );
4053
4054 let result = runtime
4055 .execute(json!({"values": [1, 2, 3]}), None)
4056 .await
4057 .expect("map reduce workflow should succeed");
4058
4059 assert_eq!(result.node_outputs.get("map"), Some(&json!([1, 2, 3])));
4060 assert_eq!(result.node_outputs.get("reduce"), Some(&json!(6.0)));
4061 }
4062
4063 #[tokio::test]
4064 async fn fails_map_when_items_path_is_not_array() {
4065 let llm = MockLlmExecutor {
4066 output: "unused".to_string(),
4067 };
4068 let tool = EchoInputToolExecutor;
4069 let runtime = WorkflowRuntime::new(
4070 map_reduce_workflow(ReduceOperation::Count),
4071 &llm,
4072 Some(&tool),
4073 WorkflowRuntimeOptions::default(),
4074 );
4075
4076 let error = runtime
4077 .execute(json!({"values": {"not": "array"}}), None)
4078 .await
4079 .expect_err("map node should fail on non-array path");
4080
4081 assert!(matches!(
4082 error,
4083 WorkflowRuntimeError::MapItemsNotArray {
4084 node_id,
4085 items_path
4086 } if node_id == "map" && items_path == "input.values"
4087 ));
4088 }
4089
4090 #[tokio::test]
4091 async fn executes_subgraph_via_registry() {
4092 let llm = MockLlmExecutor {
4093 output: "nested-ok".to_string(),
4094 };
4095 let subgraph = WorkflowDefinition {
4096 version: "v0".to_string(),
4097 name: "child".to_string(),
4098 nodes: vec![
4099 Node {
4100 id: "start".to_string(),
4101 kind: NodeKind::Start {
4102 next: "llm".to_string(),
4103 },
4104 },
4105 Node {
4106 id: "llm".to_string(),
4107 kind: NodeKind::Llm {
4108 model: "gpt-4".to_string(),
4109 prompt: "child".to_string(),
4110 next: Some("end".to_string()),
4111 },
4112 },
4113 Node {
4114 id: "end".to_string(),
4115 kind: NodeKind::End,
4116 },
4117 ],
4118 };
4119 let parent = WorkflowDefinition {
4120 version: "v0".to_string(),
4121 name: "parent".to_string(),
4122 nodes: vec![
4123 Node {
4124 id: "start".to_string(),
4125 kind: NodeKind::Start {
4126 next: "sub".to_string(),
4127 },
4128 },
4129 Node {
4130 id: "sub".to_string(),
4131 kind: NodeKind::Subgraph {
4132 graph: "child_graph".to_string(),
4133 next: Some("end".to_string()),
4134 },
4135 },
4136 Node {
4137 id: "end".to_string(),
4138 kind: NodeKind::End,
4139 },
4140 ],
4141 };
4142
4143 let mut registry = BTreeMap::new();
4144 registry.insert("child_graph".to_string(), subgraph);
4145 let runtime = WorkflowRuntime::new(
4146 parent,
4147 &llm,
4148 None,
4149 WorkflowRuntimeOptions {
4150 subgraph_registry: registry,
4151 ..WorkflowRuntimeOptions::default()
4152 },
4153 );
4154
4155 let result = runtime
4156 .execute(json!({"approved": true}), None)
4157 .await
4158 .expect("subgraph workflow should execute");
4159
4160 assert_eq!(result.terminal_node_id, "end");
4161 assert!(matches!(
4162 result.node_executions[1].data,
4163 NodeExecutionData::Subgraph { .. }
4164 ));
4165 }
4166
4167 #[tokio::test]
4168 async fn executes_batch_and_filter_nodes() {
4169 let llm = MockLlmExecutor {
4170 output: "unused".to_string(),
4171 };
4172 let workflow = WorkflowDefinition {
4173 version: "v0".to_string(),
4174 name: "batch-filter".to_string(),
4175 nodes: vec![
4176 Node {
4177 id: "start".to_string(),
4178 kind: NodeKind::Start {
4179 next: "batch".to_string(),
4180 },
4181 },
4182 Node {
4183 id: "batch".to_string(),
4184 kind: NodeKind::Batch {
4185 items_path: "input.items".to_string(),
4186 next: "filter".to_string(),
4187 },
4188 },
4189 Node {
4190 id: "filter".to_string(),
4191 kind: NodeKind::Filter {
4192 items_path: "node_outputs.batch".to_string(),
4193 expression: "item.keep == true".to_string(),
4194 next: "end".to_string(),
4195 },
4196 },
4197 Node {
4198 id: "end".to_string(),
4199 kind: NodeKind::End,
4200 },
4201 ],
4202 };
4203
4204 let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());
4205 let result = runtime
4206 .execute(
4207 json!({
4208 "items": [
4209 {"id": 1, "keep": true},
4210 {"id": 2, "keep": false},
4211 {"id": 3, "keep": true}
4212 ]
4213 }),
4214 None,
4215 )
4216 .await
4217 .expect("batch/filter workflow should execute");
4218
4219 assert_eq!(
4220 result
4221 .node_outputs
4222 .get("batch")
4223 .and_then(Value::as_array)
4224 .map(Vec::len),
4225 Some(3)
4226 );
4227 assert_eq!(
4228 result
4229 .node_outputs
4230 .get("filter")
4231 .and_then(Value::as_array)
4232 .map(Vec::len),
4233 Some(2)
4234 );
4235 }
4236
4237 #[tokio::test]
4238 async fn executes_debounce_and_throttle_nodes() {
4239 let llm = MockLlmExecutor {
4240 output: "unused".to_string(),
4241 };
4242 let runtime = WorkflowRuntime::new(
4243 debounce_and_throttle_workflow(),
4244 &llm,
4245 None,
4246 WorkflowRuntimeOptions::default(),
4247 );
4248
4249 let result = runtime
4250 .execute(json!({"key": "k1"}), None)
4251 .await
4252 .expect("debounce/throttle workflow should execute");
4253
4254 assert_eq!(result.terminal_node_id, "end_throttled");
4255 assert_eq!(
4256 result.node_outputs.get("debounce_a"),
4257 Some(&json!({"key": "k1", "suppressed": true}))
4258 );
4259 assert_eq!(
4260 result.node_outputs.get("throttle_a"),
4261 Some(&json!({"key": "k1", "throttled": true}))
4262 );
4263 }
4264
4265 #[tokio::test]
4266 async fn executes_retry_compensate_successfully_without_compensation() {
4267 let llm = MockLlmExecutor {
4268 output: "unused".to_string(),
4269 };
4270 let tools = RetryCompensateToolExecutor {
4271 attempts: AtomicUsize::new(0),
4272 };
4273 let runtime = WorkflowRuntime::new(
4274 retry_compensate_workflow("unstable_primary"),
4275 &llm,
4276 Some(&tools),
4277 WorkflowRuntimeOptions::default(),
4278 );
4279
4280 let result = runtime
4281 .execute(json!({}), None)
4282 .await
4283 .expect("retry_compensate should recover by retry");
4284
4285 assert_eq!(result.terminal_node_id, "end");
4286 assert_eq!(tools.attempts.load(Ordering::Relaxed), 2);
4287 assert_eq!(result.retry_events.len(), 1);
4288 }
4289
4290 #[tokio::test]
4291 async fn executes_retry_compensate_with_compensation_route() {
4292 let llm = MockLlmExecutor {
4293 output: "unused".to_string(),
4294 };
4295 let tools = RetryCompensateToolExecutor {
4296 attempts: AtomicUsize::new(0),
4297 };
4298 let runtime = WorkflowRuntime::new(
4299 retry_compensate_workflow("always_fail"),
4300 &llm,
4301 Some(&tools),
4302 WorkflowRuntimeOptions::default(),
4303 );
4304
4305 let result = runtime
4306 .execute(json!({}), None)
4307 .await
4308 .expect("retry_compensate should use compensation");
4309
4310 assert_eq!(result.terminal_node_id, "end_compensated");
4311 assert_eq!(result.retry_events.len(), 1);
4312 assert_eq!(
4313 result
4314 .node_outputs
4315 .get("retry_comp")
4316 .and_then(|value| value.get("status")),
4317 Some(&json!("compensated"))
4318 );
4319 }
4320
4321 #[tokio::test]
4322 async fn executes_event_cache_router_human_transform_nodes() {
4323 let llm = MockLlmExecutor {
4324 output: "unused".to_string(),
4325 };
4326 let runtime = WorkflowRuntime::new(
4327 extended_nodes_workflow(),
4328 &llm,
4329 None,
4330 WorkflowRuntimeOptions::default(),
4331 );
4332
4333 let result = runtime
4334 .execute(
4335 json!({
4336 "event_type": "webhook",
4337 "cache_key": "customer-1",
4338 "payload": {"value": 99},
4339 "mode": "manual",
4340 "approval": "approve",
4341 "review_notes": {"editor": "ops"}
4342 }),
4343 None,
4344 )
4345 .await
4346 .expect("extended nodes workflow should execute");
4347
4348 assert_eq!(result.terminal_node_id, "end");
4349 assert_eq!(
4350 result.node_outputs.get("cache_read"),
4351 Some(&json!({"key": "customer-1", "hit": true, "value": {"value": 99}}))
4352 );
4353 assert_eq!(
4354 result.node_outputs.get("router"),
4355 Some(&json!({"selected": "human"}))
4356 );
4357 assert_eq!(
4358 result.node_outputs.get("transform"),
4359 Some(&json!({"value": 99}))
4360 );
4361 }
4362
4363 #[tokio::test]
4364 async fn routes_event_trigger_mismatch_to_fallback() {
4365 let llm = MockLlmExecutor {
4366 output: "unused".to_string(),
4367 };
4368 let runtime = WorkflowRuntime::new(
4369 extended_nodes_workflow(),
4370 &llm,
4371 None,
4372 WorkflowRuntimeOptions::default(),
4373 );
4374
4375 let result = runtime
4376 .execute(
4377 json!({
4378 "event_type": "cron",
4379 "cache_key": "customer-1",
4380 "payload": {"value": 99},
4381 "mode": "manual",
4382 "approval": "approve"
4383 }),
4384 None,
4385 )
4386 .await
4387 .expect("event mismatch should route to fallback");
4388
4389 assert_eq!(result.terminal_node_id, "end_mismatch");
4390 }
4391
4392 #[tokio::test]
4393 async fn cache_read_routes_to_on_miss_when_value_absent() {
4394 let llm = MockLlmExecutor {
4395 output: "unused".to_string(),
4396 };
4397 let workflow = WorkflowDefinition {
4398 version: "v0".to_string(),
4399 name: "cache-read-miss".to_string(),
4400 nodes: vec![
4401 Node {
4402 id: "start".to_string(),
4403 kind: NodeKind::Start {
4404 next: "cache_read".to_string(),
4405 },
4406 },
4407 Node {
4408 id: "cache_read".to_string(),
4409 kind: NodeKind::CacheRead {
4410 key_path: "input.cache_key".to_string(),
4411 next: "end_hit".to_string(),
4412 on_miss: Some("end_miss".to_string()),
4413 },
4414 },
4415 Node {
4416 id: "end_hit".to_string(),
4417 kind: NodeKind::End,
4418 },
4419 Node {
4420 id: "end_miss".to_string(),
4421 kind: NodeKind::End,
4422 },
4423 ],
4424 };
4425 let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());
4426
4427 let result = runtime
4428 .execute(json!({"cache_key": "new-customer"}), None)
4429 .await
4430 .expect("cache read miss should still execute via on_miss route");
4431
4432 assert_eq!(
4433 result.node_outputs.get("cache_read"),
4434 Some(&json!({"key": "new-customer", "hit": false, "value": null}))
4435 );
4436 assert_eq!(result.terminal_node_id, "end_miss");
4437 }
4438
4439 #[tokio::test]
4440 async fn rejects_condition_when_expression_scope_exceeds_limit() {
4441 let llm = MockLlmExecutor {
4442 output: "unused".to_string(),
4443 };
4444 let workflow = WorkflowDefinition {
4445 version: "v0".to_string(),
4446 name: "scope-limit".to_string(),
4447 nodes: vec![
4448 Node {
4449 id: "start".to_string(),
4450 kind: NodeKind::Start {
4451 next: "condition".to_string(),
4452 },
4453 },
4454 Node {
4455 id: "condition".to_string(),
4456 kind: NodeKind::Condition {
4457 expression: "input.flag == true".to_string(),
4458 on_true: "end".to_string(),
4459 on_false: "end".to_string(),
4460 },
4461 },
4462 Node {
4463 id: "end".to_string(),
4464 kind: NodeKind::End,
4465 },
4466 ],
4467 };
4468
4469 let runtime = WorkflowRuntime::new(
4470 workflow,
4471 &llm,
4472 None,
4473 WorkflowRuntimeOptions {
4474 security_limits: RuntimeSecurityLimits {
4475 max_expression_scope_bytes: 32,
4476 ..RuntimeSecurityLimits::default()
4477 },
4478 ..WorkflowRuntimeOptions::default()
4479 },
4480 );
4481
4482 let error = runtime
4483 .execute(
4484 json!({"flag": true, "payload": "this-is-too-large-for-limit"}),
4485 None,
4486 )
4487 .await
4488 .expect_err("condition should fail when scope budget is exceeded");
4489 assert!(matches!(
4490 error,
4491 WorkflowRuntimeError::ExpressionScopeLimitExceeded { node_id, .. }
4492 if node_id == "condition"
4493 ));
4494 }
4495
4496 #[tokio::test]
4497 async fn rejects_map_when_item_count_exceeds_limit() {
4498 let llm = MockLlmExecutor {
4499 output: "unused".to_string(),
4500 };
4501 let tool = EchoInputToolExecutor;
4502 let runtime = WorkflowRuntime::new(
4503 map_reduce_workflow(ReduceOperation::Count),
4504 &llm,
4505 Some(&tool),
4506 WorkflowRuntimeOptions {
4507 security_limits: RuntimeSecurityLimits {
4508 max_map_items: 2,
4509 ..RuntimeSecurityLimits::default()
4510 },
4511 ..WorkflowRuntimeOptions::default()
4512 },
4513 );
4514
4515 let error = runtime
4516 .execute(json!({"values": [1, 2, 3]}), None)
4517 .await
4518 .expect_err("map should fail when item guard is exceeded");
4519 assert!(matches!(
4520 error,
4521 WorkflowRuntimeError::MapItemLimitExceeded {
4522 node_id,
4523 actual_items: 3,
4524 max_items: 2,
4525 } if node_id == "map"
4526 ));
4527 }
4528
4529 #[tokio::test]
4530 async fn resumes_from_checkpoint() {
4531 let llm = MockLlmExecutor {
4532 output: "unused".to_string(),
4533 };
4534 let tool = MockToolExecutor {
4535 output: json!({"ok": true}),
4536 fail: false,
4537 };
4538 let runtime = WorkflowRuntime::new(
4539 linear_workflow(),
4540 &llm,
4541 Some(&tool),
4542 WorkflowRuntimeOptions::default(),
4543 );
4544
4545 let checkpoint = WorkflowCheckpoint {
4546 run_id: "run-1".to_string(),
4547 workflow_name: "linear".to_string(),
4548 step: 2,
4549 next_node_id: "tool".to_string(),
4550 scope_snapshot: json!({"input": {"request_id": "r-resume"}}),
4551 };
4552
4553 let resumed = runtime
4554 .execute_resume_from_failure(&checkpoint, None)
4555 .await
4556 .expect("resume should continue from checkpoint node");
4557 assert_eq!(resumed.terminal_node_id, "end");
4558 assert_eq!(resumed.node_executions[0].node_id, "tool");
4559 }
4560
4561 #[test]
4562 fn scope_capabilities_enforce_read_write_boundaries() {
4563 let mut scope = RuntimeScope::new(json!({"k": "v"}));
4564
4565 let read_error = scope
4566 .scoped_input(ScopeCapability::LlmWrite)
4567 .expect_err("write capability should not read scope");
4568 assert!(matches!(read_error, ScopeAccessError::ReadDenied { .. }));
4569
4570 let write_error = scope
4571 .record_tool_output("tool", json!({"ok": true}), ScopeCapability::LlmWrite)
4572 .expect_err("llm write capability should not write tool output");
4573 assert!(matches!(write_error, ScopeAccessError::WriteDenied { .. }));
4574 }
4575}