1use std::collections::{BTreeMap, HashMap};
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use async_trait::async_trait;
6use serde_json::{json, Map, Value};
7use simple_agent_type::message::Message;
8use simple_agent_type::request::CompletionRequest;
9use simple_agents_core::{CompletionOptions, CompletionOutcome, SimpleAgentsClient};
10use thiserror::Error;
11use tokio::time::timeout;
12
13use crate::checkpoint::WorkflowCheckpoint;
14use crate::expressions;
15use crate::ir::{MergePolicy, Node, NodeKind, ReduceOperation, WorkflowDefinition};
16use crate::recorder::{TraceRecordError, TraceRecorder};
17use crate::replay::{replay_trace, ReplayError, ReplayReport};
18use crate::scheduler::DagScheduler;
19pub use crate::state::ScopeAccessError;
20use crate::trace::{TraceTerminalStatus, WorkflowTrace, WorkflowTraceMetadata};
21use crate::validation::{validate_and_normalize, ValidationErrors};
22
23mod engine;
24mod scope;
25use scope::{RuntimeScope, ScopeCapability};
26
/// Tunable options controlling how a [`WorkflowRuntime`] executes a workflow.
#[derive(Debug, Clone)]
pub struct WorkflowRuntimeOptions {
    /// Hard cap on the number of node-execution steps before the run aborts
    /// with `StepLimitExceeded`.
    pub max_steps: usize,
    /// When true, the definition is validated and normalized before running;
    /// otherwise it is only normalized.
    pub validate_before_run: bool,
    /// Timeout/retry policy applied to LLM nodes.
    pub llm_node_policy: NodeExecutionPolicy,
    /// Timeout/retry policy applied to tool nodes.
    pub tool_node_policy: NodeExecutionPolicy,
    /// When true, a [`WorkflowTrace`] is recorded during the run.
    pub enable_trace_recording: bool,
    /// Whether the recorded trace is replay-validated after the run.
    pub replay_mode: WorkflowReplayMode,
    /// Default bound on concurrent tasks in the DAG scheduler (parallel/map
    /// nodes may override it per-node).
    pub scheduler_max_in_flight: usize,
    /// Named workflow definitions resolvable by subgraph nodes; unknown
    /// names fail with `SubgraphNotFound`.
    pub subgraph_registry: BTreeMap<String, WorkflowDefinition>,
    /// Resource limits guarding expression scope size and fan-out counts.
    pub security_limits: RuntimeSecurityLimits,
}
49
/// Resource limits that bound expression-evaluation scope size and the
/// fan-out of parallel/map/filter nodes.
#[derive(Debug, Clone)]
pub struct RuntimeSecurityLimits {
    /// Max serialized size (bytes) of a scope handed to expression evaluation.
    pub max_expression_scope_bytes: usize,
    /// Max number of items a map node may process.
    pub max_map_items: usize,
    /// Max number of branches a parallel node may fan out to.
    pub max_parallel_branches: usize,
    /// Max number of items a filter node may process.
    pub max_filter_items: usize,
}
62
/// Controls post-run replay validation of the recorded trace.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkflowReplayMode {
    /// No replay validation is performed.
    Disabled,
    /// Validate the recorded trace via replay after the run; requires
    /// trace recording to be enabled.
    ValidateRecordedTrace,
}
71
/// Per-node-kind execution policy for timeouts and retries.
#[derive(Debug, Clone, Default)]
pub struct NodeExecutionPolicy {
    /// Wall-clock timeout for each attempt; `None` disables the timeout.
    pub timeout: Option<Duration>,
    /// Maximum number of retries after a failed attempt.
    // NOTE(review): assumed to mean retries *in addition to* the first
    // attempt — confirm against the retry loop in the engine module.
    pub max_retries: usize,
}
80
81impl Default for WorkflowRuntimeOptions {
82 fn default() -> Self {
83 Self {
84 max_steps: 256,
85 validate_before_run: true,
86 llm_node_policy: NodeExecutionPolicy::default(),
87 tool_node_policy: NodeExecutionPolicy::default(),
88 enable_trace_recording: true,
89 replay_mode: WorkflowReplayMode::Disabled,
90 scheduler_max_in_flight: 8,
91 subgraph_registry: BTreeMap::new(),
92 security_limits: RuntimeSecurityLimits::default(),
93 }
94 }
95}
96
97impl Default for RuntimeSecurityLimits {
98 fn default() -> Self {
99 Self {
100 max_expression_scope_bytes: 128 * 1024,
101 max_map_items: 4096,
102 max_parallel_branches: 128,
103 max_filter_items: 8192,
104 }
105 }
106}
107
/// Cooperative cancellation probe the runtime polls during execution.
pub trait CancellationSignal: Send + Sync {
    /// Returns `true` once the workflow should stop as soon as possible.
    fn is_cancelled(&self) -> bool;
}
113
/// An [`AtomicBool`] flag serves directly as a cancellation signal.
impl CancellationSignal for AtomicBool {
    fn is_cancelled(&self) -> bool {
        // Relaxed suffices: only the flag's current value matters, no other
        // memory operations are ordered against it.
        self.load(Ordering::Relaxed)
    }
}
119
/// A plain `bool` is a fixed (never-changing) cancellation signal —
/// convenient for tests or unconditionally cancelled runs.
impl CancellationSignal for bool {
    fn is_cancelled(&self) -> bool {
        *self
    }
}
125
/// Input handed to an [`LlmExecutor`] for a single LLM node execution.
#[derive(Debug, Clone, PartialEq)]
pub struct LlmExecutionInput {
    /// Id of the workflow node being executed.
    pub node_id: String,
    /// Model identifier to request.
    pub model: String,
    /// Prompt configured on the node.
    pub prompt: String,
    /// JSON view of the workflow scope visible to this node.
    pub scoped_input: Value,
}
138
/// Result of a successful LLM execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LlmExecutionOutput {
    /// Text content returned by the model.
    pub content: String,
}
145
/// Errors surfaced by an [`LlmExecutor`] implementation.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum LlmExecutionError {
    /// The completion request could not be constructed.
    #[error("invalid completion request: {0}")]
    InvalidRequest(String),
    /// The underlying client call failed.
    #[error("llm client execution failed: {0}")]
    Client(String),
    /// The client returned an outcome variant the runtime cannot consume
    /// (carries the variant's name).
    #[error("unexpected completion outcome: {0}")]
    UnexpectedOutcome(&'static str),
    /// The response carried no textual content.
    #[error("llm response had no content")]
    EmptyResponse,
}
162
/// Abstraction over the LLM backend used by LLM workflow nodes.
#[async_trait]
pub trait LlmExecutor: Send + Sync {
    /// Runs a single completion for the given node input.
    async fn execute(
        &self,
        input: LlmExecutionInput,
    ) -> Result<LlmExecutionOutput, LlmExecutionError>;
}
172
173#[async_trait]
174impl LlmExecutor for SimpleAgentsClient {
175 async fn execute(
176 &self,
177 input: LlmExecutionInput,
178 ) -> Result<LlmExecutionOutput, LlmExecutionError> {
179 let user_prompt = build_prompt_with_scope(&input.prompt, &input.scoped_input);
180 let request = CompletionRequest::builder()
181 .model(input.model)
182 .message(Message::user(user_prompt))
183 .build()
184 .map_err(|error| LlmExecutionError::InvalidRequest(error.to_string()))?;
185
186 let outcome = self
187 .complete(&request, CompletionOptions::default())
188 .await
189 .map_err(|error| LlmExecutionError::Client(error.to_string()))?;
190
191 let response = match outcome {
192 CompletionOutcome::Response(response) => response,
193 CompletionOutcome::Stream(_) => {
194 return Err(LlmExecutionError::UnexpectedOutcome("stream"));
195 }
196 CompletionOutcome::HealedJson(_) => {
197 return Err(LlmExecutionError::UnexpectedOutcome("healed_json"));
198 }
199 CompletionOutcome::CoercedSchema(_) => {
200 return Err(LlmExecutionError::UnexpectedOutcome("coerced_schema"));
201 }
202 };
203
204 let content = response
205 .content()
206 .ok_or(LlmExecutionError::EmptyResponse)?
207 .to_string();
208
209 Ok(LlmExecutionOutput { content })
210 }
211}
212
/// Input handed to a [`ToolExecutor`] for a single tool invocation.
#[derive(Debug, Clone, PartialEq)]
pub struct ToolExecutionInput {
    /// Id of the workflow node being executed.
    pub node_id: String,
    /// Name of the tool to invoke.
    pub tool: String,
    /// Tool input configured on the node.
    pub input: Value,
    /// JSON view of the workflow scope visible to this node.
    pub scoped_input: Value,
}
225
/// Errors surfaced by a [`ToolExecutor`] implementation.
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ToolExecutionError {
    /// No handler is registered for the requested tool name.
    #[error("tool handler not found: {tool}")]
    NotFound {
        tool: String,
    },
    /// The tool ran but failed.
    #[error("tool execution failed: {0}")]
    Failed(String),
}
239
/// Abstraction over the tool backend used by tool workflow nodes.
#[async_trait]
pub trait ToolExecutor: Send + Sync {
    /// Invokes a tool and returns its JSON output.
    async fn execute_tool(&self, input: ToolExecutionInput) -> Result<Value, ToolExecutionError>;
}
246
/// Record of one node's execution during a workflow run.
#[derive(Debug, Clone, PartialEq)]
pub struct NodeExecution {
    /// Zero-based step counter at which the node ran.
    pub step: usize,
    /// Id of the executed node.
    pub node_id: String,
    /// Node-kind-specific execution data.
    pub data: NodeExecutionData,
}
257
/// Node-kind-specific record of what happened in one execution step.
///
/// Every variant except `End` carries `next`, the id of the node control
/// moves to afterwards.
#[derive(Debug, Clone, PartialEq)]
pub enum NodeExecutionData {
    /// Workflow entry point; only forwards control.
    Start {
        next: String,
    },
    /// LLM call: the model used and the text it produced.
    Llm {
        model: String,
        output: String,
        next: String,
    },
    /// Tool call: the tool name and its JSON output.
    Tool {
        tool: String,
        output: Value,
        next: String,
    },
    /// Boolean branch: the expression and the side it selected.
    Condition {
        expression: String,
        evaluated: bool,
        next: String,
    },
    /// Debounce gate keyed by `key`; `suppressed` marks a swallowed trigger.
    Debounce {
        key: String,
        suppressed: bool,
        next: String,
    },
    /// Throttle gate keyed by `key`; `throttled` marks a rate-limited pass.
    Throttle {
        key: String,
        throttled: bool,
        next: String,
    },
    /// Tool call with retry and compensation on final failure.
    RetryCompensate {
        tool: String,
        attempts: usize,
        compensated: bool,
        output: Value,
        next: String,
    },
    /// Human approval step and the response that was recorded.
    HumanInTheLoop {
        approved: bool,
        response: Value,
        next: String,
    },
    /// Cache lookup: whether `key` hit and the value found (if any).
    CacheRead {
        key: String,
        hit: bool,
        value: Value,
        next: String,
    },
    /// Cache store of `value` under `key`.
    CacheWrite {
        key: String,
        value: Value,
        next: String,
    },
    /// Event trigger: whether the observed event matched the expected one.
    EventTrigger {
        event: String,
        matched: bool,
        next: String,
    },
    /// Dynamic routing: the route that was selected.
    Router {
        selected: String,
        next: String,
    },
    /// Expression transform and its resulting value.
    Transform {
        expression: String,
        output: Value,
        next: String,
    },
    /// Loop head: the condition, its result, and the iteration count.
    Loop {
        condition: String,
        evaluated: bool,
        iteration: u32,
        next: String,
    },
    /// Nested workflow run: the graph name, where it terminated, and its output.
    Subgraph {
        graph: String,
        terminal_node_id: String,
        output: Value,
        next: String,
    },
    /// Batch collection: the array resolved at `items_path` and its size.
    Batch {
        items_path: String,
        item_count: usize,
        next: String,
    },
    /// Filter pass: how many items at `items_path` satisfied `expression`.
    Filter {
        items_path: String,
        expression: String,
        kept: usize,
        next: String,
    },
    /// Fan-out to branches and their per-branch outputs.
    Parallel {
        branches: Vec<String>,
        outputs: BTreeMap<String, Value>,
        next: String,
    },
    /// Fan-in of `sources` combined under `policy`.
    Merge {
        policy: MergePolicy,
        sources: Vec<String>,
        output: Value,
        next: String,
    },
    /// Per-item tool map over an array and the collected outputs.
    Map {
        item_count: usize,
        output: Value,
        next: String,
    },
    /// Reduction of a source output via `operation`.
    Reduce {
        operation: ReduceOperation,
        output: Value,
        next: String,
    },
    /// Terminal node; the run stops here.
    End,
}
461
/// Event emitted while a workflow runs (node start/completion/failure).
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowEvent {
    /// Step counter at which the event occurred.
    pub step: usize,
    /// Id of the node the event refers to.
    pub node_id: String,
    /// What happened.
    pub kind: WorkflowEventKind,
}
472
/// The kind of a [`WorkflowEvent`].
#[derive(Debug, Clone, PartialEq)]
pub enum WorkflowEventKind {
    /// The node began executing.
    NodeStarted,
    /// The node finished; carries its execution data.
    NodeCompleted {
        data: NodeExecutionData,
    },
    /// The node failed; carries the rendered error message.
    NodeFailed {
        message: String,
    },
}
489
/// Aggregate result of a completed workflow run.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowRunResult {
    /// Name of the executed workflow.
    pub workflow_name: String,
    /// Id of the node at which execution terminated.
    pub terminal_node_id: String,
    /// Ordered record of every node execution.
    pub node_executions: Vec<NodeExecution>,
    /// Ordered stream of start/complete/fail events.
    pub events: Vec<WorkflowEvent>,
    /// Retry attempts that occurred during the run.
    pub retry_events: Vec<WorkflowRetryEvent>,
    /// Final recorded output of each node, keyed by node id.
    pub node_outputs: BTreeMap<String, Value>,
    /// Recorded trace, present when trace recording was enabled.
    pub trace: Option<WorkflowTrace>,
    /// Replay validation report, present when replay mode requested one.
    pub replay_report: Option<ReplayReport>,
}
510
/// Record of a single failed attempt that was subsequently retried.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkflowRetryEvent {
    /// Step counter at which the retry occurred.
    pub step: usize,
    /// Id of the node being retried.
    pub node_id: String,
    /// Human-readable name of the retried operation.
    pub operation: String,
    /// One-based index of the attempt that failed.
    pub failed_attempt: usize,
    /// Rendered error that caused the retry.
    pub reason: String,
}
525
/// All failure modes of the workflow runtime.
///
/// Variants are grouped below by phase: setup/control-flow, node execution
/// (LLM/tool, with retry and timeout forms), expression evaluation, trace
/// recording/replay, fan-out/fan-in nodes, security limits, and
/// path/value resolution.
#[derive(Debug, Error)]
pub enum WorkflowRuntimeError {
    // --- Setup and control flow ---
    #[error(transparent)]
    Validation(#[from] ValidationErrors),
    #[error("workflow has no start node")]
    MissingStartNode,
    #[error("node not found: {node_id}")]
    NodeNotFound {
        node_id: String,
    },
    #[error("workflow exceeded max step limit ({max_steps})")]
    StepLimitExceeded {
        max_steps: usize,
    },
    #[error("workflow execution cancelled")]
    Cancelled,
    // --- LLM / tool node execution ---
    #[error("llm node '{node_id}' failed: {source}")]
    Llm {
        node_id: String,
        source: LlmExecutionError,
    },
    #[error("tool node '{node_id}' failed: {source}")]
    Tool {
        node_id: String,
        source: ToolExecutionError,
    },
    #[error("llm node '{node_id}' exhausted {attempts} attempt(s): {last_error}")]
    LlmRetryExhausted {
        node_id: String,
        attempts: usize,
        last_error: LlmExecutionError,
    },
    #[error("tool node '{node_id}' exhausted {attempts} attempt(s): {last_error}")]
    ToolRetryExhausted {
        node_id: String,
        attempts: usize,
        last_error: ToolExecutionError,
    },
    #[error(
        "llm node '{node_id}' timed out after {attempts} attempt(s) (timeout: {timeout_ms} ms)"
    )]
    LlmTimeout {
        node_id: String,
        timeout_ms: u128,
        attempts: usize,
    },
    #[error(
        "tool node '{node_id}' timed out after {attempts} attempt(s) (timeout: {timeout_ms} ms)"
    )]
    ToolTimeout {
        node_id: String,
        timeout_ms: u128,
        attempts: usize,
    },
    #[error("tool node '{node_id}' requires a tool executor")]
    MissingToolExecutor {
        node_id: String,
    },
    // --- Graph wiring ---
    #[error("node '{node_id}' is missing required next edge")]
    MissingNextEdge {
        node_id: String,
    },
    #[error("condition node '{node_id}' has invalid expression '{expression}': {reason}")]
    InvalidCondition {
        node_id: String,
        expression: String,
        reason: String,
    },
    #[error("loop node '{node_id}' has invalid condition '{expression}': {reason}")]
    InvalidLoopCondition {
        node_id: String,
        expression: String,
        reason: String,
    },
    #[error("loop node '{node_id}' exceeded max iterations ({max_iterations})")]
    LoopIterationLimitExceeded {
        node_id: String,
        max_iterations: u32,
    },
    #[error("node '{node_id}' did not provide a next transition")]
    MissingNextTransition {
        node_id: String,
    },
    #[error("runtime dispatch invariant failed for node '{node_id}': {reason}")]
    DispatchInvariant {
        node_id: String,
        reason: String,
    },
    #[error("scope access failed on node '{node_id}': {source}")]
    ScopeAccess {
        node_id: String,
        source: ScopeAccessError,
    },
    // --- Trace recording and replay ---
    #[error(transparent)]
    TraceRecording(#[from] TraceRecordError),
    #[error(transparent)]
    Replay(#[from] ReplayError),
    #[error("replay validation requires trace recording to be enabled")]
    ReplayRequiresTraceRecording,
    // --- Fan-out / fan-in nodes ---
    #[error("parallel node '{node_id}' cannot execute branch '{branch_id}': {reason}")]
    ParallelBranchUnsupported {
        node_id: String,
        branch_id: String,
        reason: String,
    },
    #[error("merge node '{node_id}' missing source output from '{source_id}'")]
    MissingMergeSource {
        node_id: String,
        source_id: String,
    },
    #[error("merge node '{node_id}' quorum not met: required {required}, resolved {resolved}")]
    MergeQuorumNotMet {
        node_id: String,
        required: usize,
        resolved: usize,
    },
    #[error("map node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    MapItemsNotArray {
        node_id: String,
        items_path: String,
    },
    #[error("reduce node '{node_id}' source '{source_node}' is not reducible: {reason}")]
    InvalidReduceInput {
        node_id: String,
        source_node: String,
        reason: String,
    },
    #[error("subgraph node '{node_id}' references unknown graph '{graph}'")]
    SubgraphNotFound { node_id: String, graph: String },
    #[error("batch node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    BatchItemsNotArray { node_id: String, items_path: String },
    #[error("filter node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    FilterItemsNotArray { node_id: String, items_path: String },
    #[error("filter node '{node_id}' expression '{expression}' failed: {reason}")]
    InvalidFilterExpression {
        node_id: String,
        expression: String,
        reason: String,
    },
    // --- Security limits (see RuntimeSecurityLimits) ---
    #[error(
        "expression scope too large on node '{node_id}': {actual_bytes} bytes exceeds limit {limit_bytes}"
    )]
    ExpressionScopeLimitExceeded {
        node_id: String,
        actual_bytes: usize,
        limit_bytes: usize,
    },
    #[error(
        "parallel node '{node_id}' has {actual_branches} branches, exceeding limit {max_branches}"
    )]
    ParallelBranchLimitExceeded {
        node_id: String,
        actual_branches: usize,
        max_branches: usize,
    },
    #[error("map node '{node_id}' has {actual_items} items, exceeding limit {max_items}")]
    MapItemLimitExceeded {
        node_id: String,
        actual_items: usize,
        max_items: usize,
    },
    #[error("filter node '{node_id}' has {actual_items} items, exceeding limit {max_items}")]
    FilterItemLimitExceeded {
        node_id: String,
        actual_items: usize,
        max_items: usize,
    },
    // --- Path and value resolution ---
    #[error("node '{node_id}' could not resolve required path '{path}'")]
    MissingPath { node_id: String, path: String },
    #[error("node '{node_id}' path '{path}' did not resolve to a string cache key")]
    CacheKeyNotString { node_id: String, path: String },
    #[error("human node '{node_id}' has unsupported decision value at '{path}': {value}")]
    InvalidHumanDecision {
        node_id: String,
        path: String,
        value: String,
    },
    #[error("event trigger node '{node_id}' path '{path}' did not resolve to an event string")]
    InvalidEventValue { node_id: String, path: String },
    #[error("router node '{node_id}' route expression '{expression}' failed: {reason}")]
    InvalidRouterExpression {
        node_id: String,
        expression: String,
        reason: String,
    },
    #[error("transform node '{node_id}' expression '{expression}' failed: {reason}")]
    InvalidTransformExpression {
        node_id: String,
        expression: String,
        reason: String,
    },
    #[error(
        "retry_compensate node '{node_id}' failed after {attempts} primary attempt(s) and compensation error: {compensation_error}"
    )]
    RetryCompensateFailed {
        node_id: String,
        attempts: usize,
        compensation_error: ToolExecutionError,
    },
}
815
/// Executes a [`WorkflowDefinition`] against pluggable LLM and tool backends.
pub struct WorkflowRuntime<'a> {
    // The workflow to run (validated/normalized lazily at execute time).
    definition: WorkflowDefinition,
    // Backend for LLM nodes (required).
    llm_executor: &'a dyn LlmExecutor,
    // Backend for tool nodes; `None` makes tool nodes fail with
    // `MissingToolExecutor`.
    tool_executor: Option<&'a dyn ToolExecutor>,
    // Runtime tuning knobs and security limits.
    options: WorkflowRuntimeOptions,
}
823
824impl<'a> WorkflowRuntime<'a> {
825 pub fn new(
827 definition: WorkflowDefinition,
828 llm_executor: &'a dyn LlmExecutor,
829 tool_executor: Option<&'a dyn ToolExecutor>,
830 options: WorkflowRuntimeOptions,
831 ) -> Self {
832 Self {
833 definition,
834 llm_executor,
835 tool_executor,
836 options,
837 }
838 }
839
840 pub async fn execute(
842 &self,
843 input: Value,
844 cancellation: Option<&dyn CancellationSignal>,
845 ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
846 let workflow = if self.options.validate_before_run {
847 validate_and_normalize(&self.definition)?
848 } else {
849 self.definition.normalized()
850 };
851
852 let start_id = find_start_node_id(&workflow)?;
853 self.execute_from_node(
854 workflow,
855 RuntimeScope::new(input),
856 start_id,
857 0,
858 cancellation,
859 )
860 .await
861 }
862
863 pub async fn execute_resume_from_failure(
865 &self,
866 checkpoint: &WorkflowCheckpoint,
867 cancellation: Option<&dyn CancellationSignal>,
868 ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
869 let workflow = if self.options.validate_before_run {
870 validate_and_normalize(&self.definition)?
871 } else {
872 self.definition.normalized()
873 };
874
875 let scope_input = checkpoint
876 .scope_snapshot
877 .get("input")
878 .cloned()
879 .unwrap_or_else(|| checkpoint.scope_snapshot.clone());
880
881 self.execute_from_node(
882 workflow,
883 RuntimeScope::new(scope_input),
884 checkpoint.next_node_id.clone(),
885 checkpoint.step,
886 cancellation,
887 )
888 .await
889 }
890
    /// Shared driver for fresh runs and checkpoint resumes: delegates to
    /// [`engine::execute_from_node`], which walks the graph from
    /// `start_node_id` beginning at step `starting_step`.
    async fn execute_from_node(
        &self,
        workflow: WorkflowDefinition,
        scope: RuntimeScope,
        start_node_id: String,
        starting_step: usize,
        cancellation: Option<&dyn CancellationSignal>,
    ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
        engine::execute_from_node(
            self,
            workflow,
            scope,
            start_node_id,
            starting_step,
            cancellation,
        )
        .await
    }
909
    /// Executes a single node by delegating to [`engine::execute_node`];
    /// retry attempts made along the way are appended to `retry_events`.
    async fn execute_node(
        &self,
        node: &Node,
        node_index: &HashMap<&str, &Node>,
        step: usize,
        scope: &mut RuntimeScope,
        cancellation: Option<&dyn CancellationSignal>,
        retry_events: &mut Vec<WorkflowRetryEvent>,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        engine::execute_node(
            self,
            node,
            node_index,
            step,
            scope,
            cancellation,
            retry_events,
        )
        .await
    }
930
931 fn execute_start_node(&self, step: usize, node: &Node, next: &str) -> NodeExecution {
932 NodeExecution {
933 step,
934 node_id: node.id.clone(),
935 data: NodeExecutionData::Start {
936 next: next.to_string(),
937 },
938 }
939 }
940
941 async fn execute_llm_node(
942 &self,
943 step: usize,
944 node: &Node,
945 spec: LlmNodeSpec<'_>,
946 scope: &mut RuntimeScope,
947 cancellation: Option<&dyn CancellationSignal>,
948 retry_events: &mut Vec<WorkflowRetryEvent>,
949 ) -> Result<NodeExecution, WorkflowRuntimeError> {
950 let next_node = spec
951 .next
952 .clone()
953 .ok_or_else(|| WorkflowRuntimeError::MissingNextEdge {
954 node_id: node.id.clone(),
955 })?;
956
957 let (output, llm_retries) = self
958 .execute_llm_with_policy(step, node, spec.model, spec.prompt, scope, cancellation)
959 .await?;
960 retry_events.extend(llm_retries);
961
962 scope
963 .record_llm_output(&node.id, output.content.clone(), ScopeCapability::LlmWrite)
964 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
965 node_id: node.id.clone(),
966 source,
967 })?;
968
969 Ok(NodeExecution {
970 step,
971 node_id: node.id.clone(),
972 data: NodeExecutionData::Llm {
973 model: spec.model.to_string(),
974 output: output.content,
975 next: next_node,
976 },
977 })
978 }
979
980 async fn execute_tool_node(
981 &self,
982 step: usize,
983 node: &Node,
984 spec: ToolNodeSpec<'_>,
985 scope: &mut RuntimeScope,
986 cancellation: Option<&dyn CancellationSignal>,
987 retry_events: &mut Vec<WorkflowRetryEvent>,
988 ) -> Result<NodeExecution, WorkflowRuntimeError> {
989 let next_node = spec
990 .next
991 .clone()
992 .ok_or_else(|| WorkflowRuntimeError::MissingNextEdge {
993 node_id: node.id.clone(),
994 })?;
995
996 let executor =
997 self.tool_executor
998 .ok_or_else(|| WorkflowRuntimeError::MissingToolExecutor {
999 node_id: node.id.clone(),
1000 })?;
1001
1002 let scoped_input = scope
1003 .scoped_input(ScopeCapability::ToolRead)
1004 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1005 node_id: node.id.clone(),
1006 source,
1007 })?;
1008
1009 let (tool_output, tool_retries) = self
1010 .execute_tool_with_policy_for_scope(ToolPolicyRequest {
1011 step,
1012 node,
1013 tool: spec.tool,
1014 input: spec.input,
1015 executor,
1016 scoped_input,
1017 cancellation,
1018 })
1019 .await?;
1020 retry_events.extend(tool_retries);
1021
1022 scope
1023 .record_tool_output(&node.id, tool_output.clone(), ScopeCapability::ToolWrite)
1024 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1025 node_id: node.id.clone(),
1026 source,
1027 })?;
1028
1029 Ok(NodeExecution {
1030 step,
1031 node_id: node.id.clone(),
1032 data: NodeExecutionData::Tool {
1033 tool: spec.tool.to_string(),
1034 output: tool_output,
1035 next: next_node,
1036 },
1037 })
1038 }
1039
1040 fn execute_condition_node(
1041 &self,
1042 step: usize,
1043 node: &Node,
1044 spec: ConditionNodeSpec<'_>,
1045 scope: &mut RuntimeScope,
1046 cancellation: Option<&dyn CancellationSignal>,
1047 ) -> Result<NodeExecution, WorkflowRuntimeError> {
1048 check_cancelled(cancellation)?;
1049 let scoped_input =
1050 scope
1051 .scoped_input(ScopeCapability::ConditionRead)
1052 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1053 node_id: node.id.clone(),
1054 source,
1055 })?;
1056 enforce_expression_scope_budget(
1057 &node.id,
1058 &scoped_input,
1059 self.options.security_limits.max_expression_scope_bytes,
1060 )?;
1061 let evaluated =
1062 expressions::evaluate_bool(spec.expression, &scoped_input).map_err(|reason| {
1063 WorkflowRuntimeError::InvalidCondition {
1064 node_id: node.id.clone(),
1065 expression: spec.expression.to_string(),
1066 reason: reason.to_string(),
1067 }
1068 })?;
1069 let next = if evaluated {
1070 spec.on_true.to_string()
1071 } else {
1072 spec.on_false.to_string()
1073 };
1074
1075 scope
1076 .record_condition_output(&node.id, evaluated, ScopeCapability::ConditionWrite)
1077 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1078 node_id: node.id.clone(),
1079 source,
1080 })?;
1081
1082 Ok(NodeExecution {
1083 step,
1084 node_id: node.id.clone(),
1085 data: NodeExecutionData::Condition {
1086 expression: spec.expression.to_string(),
1087 evaluated,
1088 next,
1089 },
1090 })
1091 }
1092
    /// Executes a parallel node: fans out every branch through the bounded
    /// DAG scheduler against a shared snapshot of the scope, then records
    /// each branch output plus a combined object under this node's id.
    async fn execute_parallel_node(
        &self,
        step: usize,
        node: &Node,
        spec: ParallelNodeSpec<'_>,
        scope: &mut RuntimeScope,
        cancellation: Option<&dyn CancellationSignal>,
        retry_events: &mut Vec<WorkflowRetryEvent>,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        check_cancelled(cancellation)?;
        // Security limit: refuse oversized fan-out before scheduling anything.
        if spec.branches.len() > self.options.security_limits.max_parallel_branches {
            return Err(WorkflowRuntimeError::ParallelBranchLimitExceeded {
                node_id: node.id.clone(),
                actual_branches: spec.branches.len(),
                max_branches: self.options.security_limits.max_parallel_branches,
            });
        }
        // All branches see the same pre-fan-out scope snapshot.
        let base_scope = scope
            .scoped_input(ScopeCapability::MapRead)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;
        // Per-node max_in_flight overrides the runtime-wide default.
        let scheduler = DagScheduler::new(
            spec.max_in_flight
                .unwrap_or(self.options.scheduler_max_in_flight),
        );
        let parallel_node_id = node.id.clone();

        let branch_outputs: Vec<(String, Value, Vec<WorkflowRetryEvent>)> = scheduler
            .run_bounded(spec.branches.iter().cloned(), |branch_id| {
                // Clone per-branch so each spawned future owns its inputs.
                let parallel_node_id = parallel_node_id.clone();
                let base_scope = base_scope.clone();
                async move {
                    let branch_node = spec.node_index.get(branch_id.as_str()).ok_or_else(|| {
                        WorkflowRuntimeError::NodeNotFound {
                            node_id: branch_id.clone(),
                        }
                    })?;
                    self.execute_parallel_branch(
                        step,
                        &parallel_node_id,
                        branch_node,
                        base_scope,
                        cancellation,
                    )
                    .await
                }
            })
            .await?;

        // Record each branch output individually, then the combined object.
        let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
        for (branch_id, output, branch_retry_events) in branch_outputs {
            retry_events.extend(branch_retry_events);
            scope
                .record_node_output(&branch_id, output.clone(), ScopeCapability::MapWrite)
                .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                    node_id: node.id.clone(),
                    source,
                })?;
            outputs.insert(branch_id, output);
        }

        scope
            .record_node_output(
                &node.id,
                Value::Object(
                    outputs
                        .iter()
                        .map(|(key, value)| (key.clone(), value.clone()))
                        .collect(),
                ),
                ScopeCapability::MapWrite,
            )
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        Ok(NodeExecution {
            step,
            node_id: node.id.clone(),
            data: NodeExecutionData::Parallel {
                branches: spec.branches.to_vec(),
                outputs,
                next: spec.next.to_string(),
            },
        })
    }
1182
1183 fn execute_merge_node(
1184 &self,
1185 step: usize,
1186 node: &Node,
1187 spec: MergeNodeSpec<'_>,
1188 scope: &mut RuntimeScope,
1189 ) -> Result<NodeExecution, WorkflowRuntimeError> {
1190 let mut resolved = Vec::with_capacity(spec.sources.len());
1191 for source in spec.sources {
1192 let Some(value) = scope.node_output(source).cloned() else {
1193 return Err(WorkflowRuntimeError::MissingMergeSource {
1194 node_id: node.id.clone(),
1195 source_id: source.clone(),
1196 });
1197 };
1198 resolved.push((source.clone(), value));
1199 }
1200
1201 let output = match spec.policy {
1202 MergePolicy::First => resolved
1203 .first()
1204 .map(|(_, value)| value.clone())
1205 .unwrap_or(Value::Null),
1206 MergePolicy::All => Value::Array(
1207 resolved
1208 .iter()
1209 .map(|(_, value)| value.clone())
1210 .collect::<Vec<_>>(),
1211 ),
1212 MergePolicy::Quorum => {
1213 let required = spec.quorum.unwrap_or_default();
1214 let resolved_count = resolved.len();
1215 if resolved_count < required {
1216 return Err(WorkflowRuntimeError::MergeQuorumNotMet {
1217 node_id: node.id.clone(),
1218 required,
1219 resolved: resolved_count,
1220 });
1221 }
1222 Value::Array(
1223 resolved
1224 .iter()
1225 .take(required)
1226 .map(|(_, value)| value.clone())
1227 .collect::<Vec<_>>(),
1228 )
1229 }
1230 };
1231
1232 scope
1233 .record_node_output(&node.id, output.clone(), ScopeCapability::ReduceWrite)
1234 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1235 node_id: node.id.clone(),
1236 source,
1237 })?;
1238
1239 Ok(NodeExecution {
1240 step,
1241 node_id: node.id.clone(),
1242 data: NodeExecutionData::Merge {
1243 policy: spec.policy.clone(),
1244 sources: spec.sources.to_vec(),
1245 output,
1246 next: spec.next.to_string(),
1247 },
1248 })
1249 }
1250
    /// Executes a map node: resolves the array at `spec.items_path`, runs
    /// `spec.tool` once per item through the bounded scheduler, and records
    /// the collected outputs (in item order) as this node's output.
    async fn execute_map_node(
        &self,
        step: usize,
        node: &Node,
        spec: MapNodeSpec<'_>,
        scope: &mut RuntimeScope,
        cancellation: Option<&dyn CancellationSignal>,
        retry_events: &mut Vec<WorkflowRetryEvent>,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        let executor =
            self.tool_executor
                .ok_or_else(|| WorkflowRuntimeError::MissingToolExecutor {
                    node_id: node.id.clone(),
                })?;

        let scoped_input = scope
            .scoped_input(ScopeCapability::MapRead)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;
        // Missing path and non-array value both map to MapItemsNotArray.
        let items = resolve_path(&scoped_input, spec.items_path)
            .and_then(Value::as_array)
            .ok_or_else(|| WorkflowRuntimeError::MapItemsNotArray {
                node_id: node.id.clone(),
                items_path: spec.items_path.to_string(),
            })?
            .clone();
        // Security limit: refuse oversized fan-out before scheduling.
        if items.len() > self.options.security_limits.max_map_items {
            return Err(WorkflowRuntimeError::MapItemLimitExceeded {
                node_id: node.id.clone(),
                actual_items: items.len(),
                max_items: self.options.security_limits.max_map_items,
            });
        }

        // Per-node max_in_flight overrides the runtime-wide default.
        let scheduler = DagScheduler::new(
            spec.max_in_flight
                .unwrap_or(self.options.scheduler_max_in_flight),
        );
        let map_node = node.clone();
        let mapped: Vec<(Value, Vec<WorkflowRetryEvent>)> = scheduler
            .run_bounded(items.into_iter().enumerate(), |(index, item)| {
                // Clone per item so each spawned future owns its inputs.
                let scoped_input = scoped_input.clone();
                let map_node = map_node.clone();
                async move {
                    // Each item sees the base scope extended with the item
                    // and its index.
                    let item_scope = map_item_scoped_input(&scoped_input, &item, index);
                    let (output, retries) = self
                        .execute_tool_with_policy_for_scope(ToolPolicyRequest {
                            step,
                            node: &map_node,
                            tool: spec.tool,
                            input: &item,
                            executor,
                            scoped_input: item_scope,
                            cancellation,
                        })
                        .await?;
                    Ok::<(Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError>((output, retries))
                }
            })
            .await?;

        let mut outputs = Vec::with_capacity(mapped.len());
        for (output, local_retries) in mapped {
            outputs.push(output);
            retry_events.extend(local_retries);
        }

        let output = Value::Array(outputs);
        scope
            .record_node_output(&node.id, output.clone(), ScopeCapability::MapWrite)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        Ok(NodeExecution {
            step,
            node_id: node.id.clone(),
            data: NodeExecutionData::Map {
                item_count: output.as_array().map_or(0, Vec::len),
                output,
                next: spec.next.to_string(),
            },
        })
    }
1338
1339 fn execute_reduce_node(
1340 &self,
1341 step: usize,
1342 node: &Node,
1343 source: &str,
1344 operation: &ReduceOperation,
1345 next: &str,
1346 scope: &mut RuntimeScope,
1347 ) -> Result<NodeExecution, WorkflowRuntimeError> {
1348 let source_value = scope.node_output(source).cloned().ok_or_else(|| {
1349 WorkflowRuntimeError::MissingMergeSource {
1350 node_id: node.id.clone(),
1351 source_id: source.to_string(),
1352 }
1353 })?;
1354
1355 let reduced = reduce_value(&node.id, source, operation, source_value)?;
1356 scope
1357 .record_node_output(&node.id, reduced.clone(), ScopeCapability::ReduceWrite)
1358 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1359 node_id: node.id.clone(),
1360 source,
1361 })?;
1362
1363 Ok(NodeExecution {
1364 step,
1365 node_id: node.id.clone(),
1366 data: NodeExecutionData::Reduce {
1367 operation: operation.clone(),
1368 output: reduced,
1369 next: next.to_string(),
1370 },
1371 })
1372 }
1373
1374 fn execute_batch_node(
1375 &self,
1376 step: usize,
1377 node: &Node,
1378 items_path: &str,
1379 next: &str,
1380 scope: &mut RuntimeScope,
1381 ) -> Result<NodeExecution, WorkflowRuntimeError> {
1382 let scoped = scope
1383 .scoped_input(ScopeCapability::MapRead)
1384 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1385 node_id: node.id.clone(),
1386 source,
1387 })?;
1388 let items = resolve_path(&scoped, items_path).ok_or_else(|| {
1389 WorkflowRuntimeError::BatchItemsNotArray {
1390 node_id: node.id.clone(),
1391 items_path: items_path.to_string(),
1392 }
1393 })?;
1394 let array = items
1395 .as_array()
1396 .ok_or_else(|| WorkflowRuntimeError::BatchItemsNotArray {
1397 node_id: node.id.clone(),
1398 items_path: items_path.to_string(),
1399 })?;
1400
1401 let output = Value::Array(array.clone());
1402 scope
1403 .record_node_output(&node.id, output.clone(), ScopeCapability::MapWrite)
1404 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1405 node_id: node.id.clone(),
1406 source,
1407 })?;
1408
1409 Ok(NodeExecution {
1410 step,
1411 node_id: node.id.clone(),
1412 data: NodeExecutionData::Batch {
1413 items_path: items_path.to_string(),
1414 item_count: array.len(),
1415 next: next.to_string(),
1416 },
1417 })
1418 }
1419
1420 fn execute_filter_node(
1421 &self,
1422 step: usize,
1423 node: &Node,
1424 items_path: &str,
1425 expression: &str,
1426 next: &str,
1427 scope: &mut RuntimeScope,
1428 ) -> Result<NodeExecution, WorkflowRuntimeError> {
1429 let scoped = scope
1430 .scoped_input(ScopeCapability::MapRead)
1431 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1432 node_id: node.id.clone(),
1433 source,
1434 })?;
1435 let items_value = resolve_path(&scoped, items_path).ok_or_else(|| {
1436 WorkflowRuntimeError::FilterItemsNotArray {
1437 node_id: node.id.clone(),
1438 items_path: items_path.to_string(),
1439 }
1440 })?;
1441 let array =
1442 items_value
1443 .as_array()
1444 .ok_or_else(|| WorkflowRuntimeError::FilterItemsNotArray {
1445 node_id: node.id.clone(),
1446 items_path: items_path.to_string(),
1447 })?;
1448 if array.len() > self.options.security_limits.max_filter_items {
1449 return Err(WorkflowRuntimeError::FilterItemLimitExceeded {
1450 node_id: node.id.clone(),
1451 actual_items: array.len(),
1452 max_items: self.options.security_limits.max_filter_items,
1453 });
1454 }
1455
1456 let mut kept = Vec::new();
1457 for (index, item) in array.iter().enumerate() {
1458 let mut eval_scope = scoped.clone();
1459 if let Some(object) = eval_scope.as_object_mut() {
1460 object.insert("item".to_string(), item.clone());
1461 object.insert("item_index".to_string(), Value::from(index as u64));
1462 }
1463 enforce_expression_scope_budget(
1464 &node.id,
1465 &eval_scope,
1466 self.options.security_limits.max_expression_scope_bytes,
1467 )?;
1468 let include =
1469 expressions::evaluate_bool(expression, &eval_scope).map_err(|reason| {
1470 WorkflowRuntimeError::InvalidFilterExpression {
1471 node_id: node.id.clone(),
1472 expression: expression.to_string(),
1473 reason: reason.to_string(),
1474 }
1475 })?;
1476 if include {
1477 kept.push(item.clone());
1478 }
1479 }
1480 let output = Value::Array(kept.clone());
1481 scope
1482 .record_node_output(&node.id, output, ScopeCapability::MapWrite)
1483 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1484 node_id: node.id.clone(),
1485 source,
1486 })?;
1487
1488 Ok(NodeExecution {
1489 step,
1490 node_id: node.id.clone(),
1491 data: NodeExecutionData::Filter {
1492 items_path: items_path.to_string(),
1493 expression: expression.to_string(),
1494 kept: kept.len(),
1495 next: next.to_string(),
1496 },
1497 })
1498 }
1499
1500 async fn execute_llm_with_policy(
1501 &self,
1502 step: usize,
1503 node: &Node,
1504 model: &str,
1505 prompt: &str,
1506 scope: &RuntimeScope,
1507 cancellation: Option<&dyn CancellationSignal>,
1508 ) -> Result<(LlmExecutionOutput, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
1509 let scoped_input = scope
1510 .scoped_input(ScopeCapability::LlmRead)
1511 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1512 node_id: node.id.clone(),
1513 source,
1514 })?;
1515
1516 self.execute_llm_with_policy_for_scope(
1517 step,
1518 node,
1519 model,
1520 prompt,
1521 scoped_input,
1522 cancellation,
1523 )
1524 .await
1525 }
1526
1527 async fn execute_parallel_branch(
1528 &self,
1529 step: usize,
1530 parallel_node_id: &str,
1531 branch_node: &Node,
1532 scoped_input: Value,
1533 cancellation: Option<&dyn CancellationSignal>,
1534 ) -> Result<(String, Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
1535 match &branch_node.kind {
1536 NodeKind::Llm {
1537 model,
1538 prompt,
1539 next: _,
1540 } => {
1541 let (output, retries) = self
1542 .execute_llm_with_policy_for_scope(
1543 step,
1544 branch_node,
1545 model,
1546 prompt,
1547 scoped_input,
1548 cancellation,
1549 )
1550 .await?;
1551 Ok((
1552 branch_node.id.clone(),
1553 Value::String(output.content),
1554 retries,
1555 ))
1556 }
1557 NodeKind::Tool { tool, input, .. } => {
1558 let executor = self.tool_executor.ok_or_else(|| {
1559 WorkflowRuntimeError::MissingToolExecutor {
1560 node_id: branch_node.id.clone(),
1561 }
1562 })?;
1563 let (output, retries) = self
1564 .execute_tool_with_policy_for_scope(ToolPolicyRequest {
1565 step,
1566 node: branch_node,
1567 tool,
1568 input,
1569 executor,
1570 scoped_input,
1571 cancellation,
1572 })
1573 .await?;
1574 Ok((branch_node.id.clone(), output, retries))
1575 }
1576 _ => Err(WorkflowRuntimeError::ParallelBranchUnsupported {
1577 node_id: parallel_node_id.to_string(),
1578 branch_id: branch_node.id.clone(),
1579 reason: "only llm/tool branches are supported".to_string(),
1580 }),
1581 }
1582 }
1583
    /// Runs the LLM executor for an already-narrowed scope under the LLM
    /// node policy: up to `max_retries + 1` attempts, each optionally
    /// bounded by the policy timeout, with cancellation checked before and
    /// after every attempt.
    ///
    /// On success, returns the output together with every retry event
    /// recorded for failed or timed-out attempts along the way.
    ///
    /// # Errors
    /// - `Cancelled` when the cancellation signal has fired.
    /// - `LlmTimeout` when the final attempt exceeds the policy timeout.
    /// - `LlmRetryExhausted` when the final attempt returns an error.
    async fn execute_llm_with_policy_for_scope(
        &self,
        step: usize,
        node: &Node,
        model: &str,
        prompt: &str,
        scoped_input: Value,
        cancellation: Option<&dyn CancellationSignal>,
    ) -> Result<(LlmExecutionOutput, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
        // max_retries == 0 still yields exactly one attempt.
        let max_attempts = self.options.llm_node_policy.max_retries.saturating_add(1);
        let mut retry_events = Vec::new();

        for attempt in 1..=max_attempts {
            check_cancelled(cancellation)?;

            let execution = self.llm_executor.execute(LlmExecutionInput {
                node_id: node.id.clone(),
                model: model.to_string(),
                prompt: prompt.to_string(),
                scoped_input: scoped_input.clone(),
            });

            // Apply the policy timeout only when one is configured; a timeout
            // on a non-final attempt is recorded as a retry event and retried.
            let outcome = if let Some(timeout_duration) = self.options.llm_node_policy.timeout {
                match timeout(timeout_duration, execution).await {
                    Ok(result) => result,
                    Err(_) => {
                        if attempt == max_attempts {
                            return Err(WorkflowRuntimeError::LlmTimeout {
                                node_id: node.id.clone(),
                                timeout_ms: timeout_duration.as_millis(),
                                attempts: attempt,
                            });
                        }
                        retry_events.push(WorkflowRetryEvent {
                            step,
                            node_id: node.id.clone(),
                            operation: "llm".to_string(),
                            failed_attempt: attempt,
                            reason: format!(
                                "attempt {} timed out after {} ms",
                                attempt,
                                timeout_duration.as_millis()
                            ),
                        });
                        check_cancelled(cancellation)?;
                        continue;
                    }
                }
            } else {
                execution.await
            };

            match outcome {
                Ok(output) => return Ok((output, retry_events)),
                Err(last_error) => {
                    if attempt == max_attempts {
                        return Err(WorkflowRuntimeError::LlmRetryExhausted {
                            node_id: node.id.clone(),
                            attempts: attempt,
                            last_error,
                        });
                    }
                    retry_events.push(WorkflowRetryEvent {
                        step,
                        node_id: node.id.clone(),
                        operation: "llm".to_string(),
                        failed_attempt: attempt,
                        reason: last_error.to_string(),
                    });
                    check_cancelled(cancellation)?;
                }
            }
        }

        // Every iteration either returns or continues, and the final
        // iteration always returns, so the loop cannot fall through.
        unreachable!("llm attempts loop always returns")
    }
1660
    /// Runs a tool call for an already-narrowed scope under the tool node
    /// policy: up to `max_retries + 1` attempts, each optionally bounded by
    /// the policy timeout, with cancellation checked before and after every
    /// attempt. Mirrors `execute_llm_with_policy_for_scope` for tools.
    ///
    /// # Errors
    /// - `Cancelled` when the cancellation signal has fired.
    /// - `ToolTimeout` when the final attempt exceeds the policy timeout.
    /// - `ToolRetryExhausted` when the final attempt returns an error.
    async fn execute_tool_with_policy_for_scope(
        &self,
        request: ToolPolicyRequest<'_>,
    ) -> Result<(Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
        let ToolPolicyRequest {
            step,
            node,
            tool,
            input,
            executor,
            scoped_input,
            cancellation,
        } = request;
        // max_retries == 0 still yields exactly one attempt.
        let max_attempts = self.options.tool_node_policy.max_retries.saturating_add(1);
        let mut retry_events = Vec::new();

        for attempt in 1..=max_attempts {
            check_cancelled(cancellation)?;

            let execution = executor.execute_tool(ToolExecutionInput {
                node_id: node.id.clone(),
                tool: tool.to_string(),
                input: input.clone(),
                scoped_input: scoped_input.clone(),
            });

            // Apply the policy timeout only when one is configured; a timeout
            // on a non-final attempt is recorded as a retry event and retried.
            let outcome = if let Some(timeout_duration) = self.options.tool_node_policy.timeout {
                match timeout(timeout_duration, execution).await {
                    Ok(result) => result,
                    Err(_) => {
                        if attempt == max_attempts {
                            return Err(WorkflowRuntimeError::ToolTimeout {
                                node_id: node.id.clone(),
                                timeout_ms: timeout_duration.as_millis(),
                                attempts: attempt,
                            });
                        }
                        retry_events.push(WorkflowRetryEvent {
                            step,
                            node_id: node.id.clone(),
                            operation: "tool".to_string(),
                            failed_attempt: attempt,
                            reason: format!(
                                "attempt {} timed out after {} ms",
                                attempt,
                                timeout_duration.as_millis()
                            ),
                        });
                        check_cancelled(cancellation)?;
                        continue;
                    }
                }
            } else {
                execution.await
            };

            match outcome {
                Ok(output) => return Ok((output, retry_events)),
                Err(last_error) => {
                    if attempt == max_attempts {
                        return Err(WorkflowRuntimeError::ToolRetryExhausted {
                            node_id: node.id.clone(),
                            attempts: attempt,
                            last_error,
                        });
                    }
                    retry_events.push(WorkflowRetryEvent {
                        step,
                        node_id: node.id.clone(),
                        operation: "tool".to_string(),
                        failed_attempt: attempt,
                        reason: last_error.to_string(),
                    });
                    check_cancelled(cancellation)?;
                }
            }
        }

        // Every iteration either returns or continues, and the final
        // iteration always returns, so the loop cannot fall through.
        unreachable!("tool attempts loop always returns")
    }
1741}
1742
/// Borrowed bundle of everything needed to run one tool call under the
/// tool node policy (consumed by `execute_tool_with_policy_for_scope`).
struct ToolPolicyRequest<'a> {
    step: usize,
    node: &'a Node,
    tool: &'a str,
    input: &'a Value,
    executor: &'a dyn ToolExecutor,
    // Owned because each retry attempt clones it into the execution input.
    scoped_input: Value,
    cancellation: Option<&'a dyn CancellationSignal>,
}
1752
/// Borrowed view of an `Llm` node's fields, passed between dispatch helpers.
struct LlmNodeSpec<'a> {
    model: &'a str,
    prompt: &'a str,
    next: &'a Option<String>,
}
1758
/// Borrowed view of a `Tool` node's fields, passed between dispatch helpers.
struct ToolNodeSpec<'a> {
    tool: &'a str,
    input: &'a Value,
    next: &'a Option<String>,
}
1764
/// Borrowed view of a `Condition` node's fields: the boolean expression and
/// its two branch targets.
struct ConditionNodeSpec<'a> {
    expression: &'a str,
    on_true: &'a str,
    on_false: &'a str,
}
1770
/// Borrowed view of a `Parallel` node's fields plus the node-id lookup table
/// needed to resolve each branch id to its definition.
struct ParallelNodeSpec<'a> {
    node_index: &'a HashMap<&'a str, &'a Node>,
    branches: &'a [String],
    next: &'a str,
    max_in_flight: Option<usize>,
}
1777
/// Borrowed view of a `Merge` node's fields: the source node ids, the merge
/// policy, and an optional quorum threshold.
struct MergeNodeSpec<'a> {
    sources: &'a [String],
    policy: &'a MergePolicy,
    quorum: Option<usize>,
    next: &'a str,
}
1784
/// Borrowed view of a `Map` node's fields: the tool to apply per item, the
/// path to the items array, and an optional concurrency cap.
struct MapNodeSpec<'a> {
    tool: &'a str,
    items_path: &'a str,
    next: &'a str,
    max_in_flight: Option<usize>,
}
1791
1792fn check_cancelled(
1793 cancellation: Option<&dyn CancellationSignal>,
1794) -> Result<(), WorkflowRuntimeError> {
1795 if cancellation.is_some_and(CancellationSignal::is_cancelled) {
1796 Err(WorkflowRuntimeError::Cancelled)
1797 } else {
1798 Ok(())
1799 }
1800}
1801
/// Returns the current value of the logical trace clock and advances it by
/// one, saturating at `u64::MAX` instead of wrapping.
fn next_trace_timestamp(clock: &mut u64) -> u64 {
    let stamped = *clock;
    *clock = stamped.saturating_add(1);
    stamped
}
1807
1808fn build_prompt_with_scope(prompt: &str, scoped_input: &Value) -> String {
1809 format!("{}\n\nScoped context:\n{}", prompt, scoped_input)
1810}
1811
1812fn build_node_index(workflow: &WorkflowDefinition) -> HashMap<&str, &Node> {
1813 let mut index = HashMap::with_capacity(workflow.nodes.len());
1814 for node in &workflow.nodes {
1815 index.insert(node.id.as_str(), node);
1816 }
1817 index
1818}
1819
1820fn find_start_node_id(workflow: &WorkflowDefinition) -> Result<String, WorkflowRuntimeError> {
1821 workflow
1822 .nodes
1823 .iter()
1824 .find_map(|node| match node.kind {
1825 NodeKind::Start { .. } => Some(node.id.clone()),
1826 _ => None,
1827 })
1828 .ok_or(WorkflowRuntimeError::MissingStartNode)
1829}
1830
/// Returns the id of the node to execute after this execution record, or
/// `None` when the workflow reached an `End` node.
fn next_node_id(data: &NodeExecutionData) -> Option<String> {
    // Every non-terminal variant carries a `next` field; the single
    // or-pattern keeps the match exhaustive so adding a variant forces a
    // compile error here instead of a silently dropped transition.
    match data {
        NodeExecutionData::Start { next }
        | NodeExecutionData::Llm { next, .. }
        | NodeExecutionData::Tool { next, .. }
        | NodeExecutionData::Condition { next, .. }
        | NodeExecutionData::Debounce { next, .. }
        | NodeExecutionData::Throttle { next, .. }
        | NodeExecutionData::RetryCompensate { next, .. }
        | NodeExecutionData::HumanInTheLoop { next, .. }
        | NodeExecutionData::CacheRead { next, .. }
        | NodeExecutionData::CacheWrite { next, .. }
        | NodeExecutionData::EventTrigger { next, .. }
        | NodeExecutionData::Router { next, .. }
        | NodeExecutionData::Transform { next, .. }
        | NodeExecutionData::Loop { next, .. }
        | NodeExecutionData::Subgraph { next, .. }
        | NodeExecutionData::Batch { next, .. }
        | NodeExecutionData::Filter { next, .. }
        | NodeExecutionData::Parallel { next, .. }
        | NodeExecutionData::Merge { next, .. }
        | NodeExecutionData::Map { next, .. }
        | NodeExecutionData::Reduce { next, .. } => Some(next.clone()),
        NodeExecutionData::End => None,
    }
}
1857
1858fn resolve_path<'a>(scope: &'a Value, path: &str) -> Option<&'a Value> {
1859 let mut current = scope;
1860 for segment in path.split('.') {
1861 if segment.is_empty() {
1862 continue;
1863 }
1864 current = current.get(segment)?;
1865 }
1866 Some(current)
1867}
1868
1869fn resolve_string_path(scope: &Value, path: &str) -> Option<String> {
1870 resolve_path(scope, path)
1871 .and_then(Value::as_str)
1872 .map(str::to_string)
1873}
1874
1875fn evaluate_human_decision(value: &Value) -> Option<bool> {
1876 match value {
1877 Value::Bool(flag) => Some(*flag),
1878 Value::String(text) => {
1879 let normalized = text.trim().to_ascii_lowercase();
1880 match normalized.as_str() {
1881 "approve" | "approved" | "yes" | "true" => Some(true),
1882 "reject" | "rejected" | "no" | "false" => Some(false),
1883 _ => None,
1884 }
1885 }
1886 _ => None,
1887 }
1888}
1889
1890fn evaluate_transform_expression(expression: &str, scope: &Value) -> Result<Value, String> {
1891 let trimmed = expression.trim();
1892 if trimmed.is_empty() {
1893 return Err("expression is empty".to_string());
1894 }
1895
1896 if let Ok(value) = serde_json::from_str::<Value>(trimmed) {
1897 return Ok(value);
1898 }
1899
1900 let path = trimmed.strip_prefix("$.").unwrap_or(trimmed);
1901 resolve_path(scope, path)
1902 .cloned()
1903 .ok_or_else(|| format!("path '{path}' not found in scoped input"))
1904}
1905
1906fn map_item_scoped_input(base_scope: &Value, item: &Value, index: usize) -> Value {
1907 let mut object = match base_scope {
1908 Value::Object(map) => map.clone(),
1909 _ => Map::new(),
1910 };
1911 object.insert("map_item".to_string(), item.clone());
1912 object.insert("map_index".to_string(), Value::from(index as u64));
1913 Value::Object(object)
1914}
1915
1916fn enforce_expression_scope_budget(
1917 node_id: &str,
1918 scoped_input: &Value,
1919 max_expression_scope_bytes: usize,
1920) -> Result<(), WorkflowRuntimeError> {
1921 let size = serde_json::to_vec(scoped_input)
1922 .map(|bytes| bytes.len())
1923 .unwrap_or(max_expression_scope_bytes.saturating_add(1));
1924 if size > max_expression_scope_bytes {
1925 return Err(WorkflowRuntimeError::ExpressionScopeLimitExceeded {
1926 node_id: node_id.to_string(),
1927 actual_bytes: size,
1928 limit_bytes: max_expression_scope_bytes,
1929 });
1930 }
1931 Ok(())
1932}
1933
1934fn reduce_value(
1935 node_id: &str,
1936 source: &str,
1937 operation: &ReduceOperation,
1938 source_value: Value,
1939) -> Result<Value, WorkflowRuntimeError> {
1940 let items =
1941 source_value
1942 .as_array()
1943 .ok_or_else(|| WorkflowRuntimeError::InvalidReduceInput {
1944 node_id: node_id.to_string(),
1945 source_node: source.to_string(),
1946 reason: "expected source output to be an array".to_string(),
1947 })?;
1948
1949 match operation {
1950 ReduceOperation::Count => Ok(Value::from(items.len() as u64)),
1951 ReduceOperation::Sum => {
1952 let mut sum = 0.0f64;
1953 for value in items {
1954 let number =
1955 value
1956 .as_f64()
1957 .ok_or_else(|| WorkflowRuntimeError::InvalidReduceInput {
1958 node_id: node_id.to_string(),
1959 source_node: source.to_string(),
1960 reason: "sum operation requires numeric array values".to_string(),
1961 })?;
1962 sum += number;
1963 }
1964 let number = serde_json::Number::from_f64(sum).ok_or_else(|| {
1965 WorkflowRuntimeError::InvalidReduceInput {
1966 node_id: node_id.to_string(),
1967 source_node: source.to_string(),
1968 reason: "sum produced non-finite value".to_string(),
1969 }
1970 })?;
1971 Ok(Value::Number(number))
1972 }
1973 }
1974}
1975
1976#[cfg(test)]
1977mod tests {
1978 use std::sync::atomic::{AtomicUsize, Ordering};
1979 use std::sync::{Arc, Mutex};
1980 use std::time::Duration;
1981
1982 use async_trait::async_trait;
1983 use serde_json::json;
1984 use tokio::time::sleep;
1985
1986 use super::*;
1987 use crate::ir::{MergePolicy, Node, NodeKind, ReduceOperation, WorkflowDefinition};
1988
    /// LLM test double that always succeeds with a fixed response string.
    struct MockLlmExecutor {
        output: String,
    }

    #[async_trait]
    impl LlmExecutor for MockLlmExecutor {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            Ok(LlmExecutionOutput {
                content: self.output.clone(),
            })
        }
    }
2004
    /// Tool test double that either returns a fixed value or, when `fail`
    /// is set, always errors.
    struct MockToolExecutor {
        output: Value,
        fail: bool,
    }

    #[async_trait]
    impl ToolExecutor for MockToolExecutor {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            if self.fail {
                return Err(ToolExecutionError::Failed(format!(
                    "tool '{}' failed intentionally",
                    input.tool
                )));
            }
            Ok(self.output.clone())
        }
    }
2025
    /// LLM test double that replays a scripted sequence of results, one per
    /// call, while counting invocations.
    struct SequencedLlmExecutor {
        responses: Mutex<Vec<Result<LlmExecutionOutput, LlmExecutionError>>>,
        calls: AtomicUsize,
    }

    #[async_trait]
    impl LlmExecutor for SequencedLlmExecutor {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            self.calls.fetch_add(1, Ordering::Relaxed);
            // Panics if called more times than responses were scripted —
            // acceptable for a test double.
            self.responses
                .lock()
                .expect("sequenced llm lock poisoned")
                .remove(0)
        }
    }
2044
    /// Tool test double that sleeps for `delay` before succeeding; used to
    /// exercise the tool policy timeout path.
    struct SlowToolExecutor {
        delay: Duration,
    }

    #[async_trait]
    impl ToolExecutor for SlowToolExecutor {
        async fn execute_tool(
            &self,
            _input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            sleep(self.delay).await;
            Ok(json!({"status": "slow-ok"}))
        }
    }
2059
    /// Tool test double that returns 1, 2, 3, … across successive calls;
    /// used by loop/map tests to observe per-call progress.
    struct IncrementingToolExecutor {
        value: AtomicUsize,
    }

    #[async_trait]
    impl ToolExecutor for IncrementingToolExecutor {
        async fn execute_tool(
            &self,
            _input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            let next = self.value.fetch_add(1, Ordering::Relaxed) + 1;
            Ok(json!(next))
        }
    }
2074
    /// Tool test double that echoes the node's configured `input` back as
    /// its output.
    struct EchoInputToolExecutor;

    #[async_trait]
    impl ToolExecutor for EchoInputToolExecutor {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            Ok(input.input)
        }
    }
2086
    /// Tool test double for retry/compensation scenarios: `unstable_primary`
    /// fails on the first call then succeeds, `always_fail` never succeeds,
    /// and `compensate` acknowledges the rollback.
    struct RetryCompensateToolExecutor {
        attempts: AtomicUsize,
    }

    #[async_trait]
    impl ToolExecutor for RetryCompensateToolExecutor {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            match input.tool.as_str() {
                "unstable_primary" => {
                    // First call fails, subsequent calls report their attempt
                    // number so tests can assert the retry happened.
                    let current = self.attempts.fetch_add(1, Ordering::Relaxed) + 1;
                    if current <= 1 {
                        Err(ToolExecutionError::Failed("primary failed".to_string()))
                    } else {
                        Ok(json!({"primary_attempt": current}))
                    }
                }
                "always_fail" => Err(ToolExecutionError::Failed("always fail".to_string())),
                "compensate" => Ok(json!({"compensated": true})),
                _ => Err(ToolExecutionError::NotFound {
                    tool: input.tool.clone(),
                }),
            }
        }
    }
2114
    /// LLM test double that raises the shared cancellation flag on every
    /// call and then fails, so retries can observe the cancellation.
    struct CancellingLlmExecutor {
        cancel_flag: Arc<AtomicBool>,
        calls: AtomicUsize,
    }

    #[async_trait]
    impl LlmExecutor for CancellingLlmExecutor {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            self.calls.fetch_add(1, Ordering::Relaxed);
            self.cancel_flag.store(true, Ordering::Relaxed);
            Err(LlmExecutionError::Client("transient failure".to_string()))
        }
    }
2131
    /// Fixture: start -> llm -> tool -> end.
    fn linear_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "linear".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "llm".to_string(),
                    },
                },
                Node {
                    id: "llm".to_string(),
                    kind: NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "Summarize".to_string(),
                        next: Some("tool".to_string()),
                    },
                },
                Node {
                    id: "tool".to_string(),
                    kind: NodeKind::Tool {
                        tool: "extract".to_string(),
                        input: json!({"k": "v"}),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2166
    /// Fixture: start -> llm -> end (no tool node).
    fn llm_only_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "llm-only".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "llm".to_string(),
                    },
                },
                Node {
                    id: "llm".to_string(),
                    kind: NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "Summarize".to_string(),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2193
    /// Fixture: start -> loop(body: counter tool) -> end. The loop repeats
    /// its body until `last_tool_output != 3` turns false, optionally capped
    /// by `max_iterations`.
    fn loop_workflow(max_iterations: Option<u32>) -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "loop-workflow".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "loop".to_string(),
                    },
                },
                Node {
                    id: "loop".to_string(),
                    kind: NodeKind::Loop {
                        condition: "last_tool_output != 3".to_string(),
                        body: "counter".to_string(),
                        next: "end".to_string(),
                        max_iterations,
                    },
                },
                Node {
                    id: "counter".to_string(),
                    kind: NodeKind::Tool {
                        tool: "counter".to_string(),
                        input: json!({}),
                        // The body loops back into the loop node itself.
                        next: Some("loop".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2229
    /// Fixture: start -> parallel(tool_a, tool_b) -> merge -> end, with the
    /// merge policy and quorum supplied by the caller.
    fn parallel_merge_workflow(policy: MergePolicy, quorum: Option<usize>) -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "parallel-merge".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "parallel".to_string(),
                    },
                },
                Node {
                    id: "parallel".to_string(),
                    kind: NodeKind::Parallel {
                        branches: vec!["tool_a".to_string(), "tool_b".to_string()],
                        next: "merge".to_string(),
                        max_in_flight: Some(2),
                    },
                },
                Node {
                    id: "tool_a".to_string(),
                    kind: NodeKind::Tool {
                        tool: "extract".to_string(),
                        input: json!({"value": 1}),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "tool_b".to_string(),
                    kind: NodeKind::Tool {
                        tool: "extract".to_string(),
                        input: json!({"value": 2}),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "merge".to_string(),
                    kind: NodeKind::Merge {
                        sources: vec!["tool_a".to_string(), "tool_b".to_string()],
                        policy,
                        quorum,
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2281
    /// Fixture: start -> map(counter over input.values) -> reduce -> end,
    /// with the reduce operation supplied by the caller.
    fn map_reduce_workflow(operation: ReduceOperation) -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "map-reduce".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "map".to_string(),
                    },
                },
                Node {
                    id: "map".to_string(),
                    kind: NodeKind::Map {
                        tool: "counter".to_string(),
                        items_path: "input.values".to_string(),
                        next: "reduce".to_string(),
                        max_in_flight: Some(3),
                    },
                },
                Node {
                    id: "reduce".to_string(),
                    kind: NodeKind::Reduce {
                        source: "map".to_string(),
                        operation,
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2317
    /// Fixture exercising debounce and throttle suppression: both nodes
    /// point back at themselves on the pass path, so execution is expected
    /// to leave via the `on_suppressed` / `on_throttled` edges.
    fn debounce_and_throttle_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "debounce-throttle".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "debounce_a".to_string(),
                    },
                },
                Node {
                    id: "debounce_a".to_string(),
                    kind: NodeKind::Debounce {
                        key_path: "input.key".to_string(),
                        window_steps: 3,
                        // Self-loop: repeated hits within the window should be
                        // suppressed and routed to `throttle_a`.
                        next: "debounce_a".to_string(),
                        on_suppressed: Some("throttle_a".to_string()),
                    },
                },
                Node {
                    id: "throttle_a".to_string(),
                    kind: NodeKind::Throttle {
                        key_path: "input.key".to_string(),
                        window_steps: 3,
                        next: "throttle_a".to_string(),
                        on_throttled: Some("end_throttled".to_string()),
                    },
                },
                Node {
                    id: "end_throttled".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2354
    /// Fixture covering the extended node kinds in one graph:
    /// start -> event trigger -> cache write -> cache read -> router
    /// -> (human | transform) -> end, with dedicated End nodes for the
    /// mismatch / cache-miss / rejected side exits.
    fn extended_nodes_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "extended-nodes".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "event".to_string(),
                    },
                },
                Node {
                    id: "event".to_string(),
                    kind: NodeKind::EventTrigger {
                        event: "webhook".to_string(),
                        event_path: "input.event_type".to_string(),
                        next: "cache_write".to_string(),
                        on_mismatch: Some("end_mismatch".to_string()),
                    },
                },
                Node {
                    id: "cache_write".to_string(),
                    kind: NodeKind::CacheWrite {
                        key_path: "input.cache_key".to_string(),
                        value_path: "input.payload".to_string(),
                        next: "cache_read".to_string(),
                    },
                },
                Node {
                    id: "cache_read".to_string(),
                    kind: NodeKind::CacheRead {
                        key_path: "input.cache_key".to_string(),
                        next: "router".to_string(),
                        on_miss: Some("end_miss".to_string()),
                    },
                },
                Node {
                    id: "router".to_string(),
                    kind: NodeKind::Router {
                        routes: vec![crate::ir::RouterRoute {
                            when: "input.mode == 'manual'".to_string(),
                            next: "human".to_string(),
                        }],
                        default: "transform".to_string(),
                    },
                },
                Node {
                    id: "human".to_string(),
                    kind: NodeKind::HumanInTheLoop {
                        decision_path: "input.approval".to_string(),
                        response_path: Some("input.review_notes".to_string()),
                        on_approve: "transform".to_string(),
                        on_reject: "end_rejected".to_string(),
                    },
                },
                Node {
                    id: "transform".to_string(),
                    kind: NodeKind::Transform {
                        expression: "node_outputs.cache_read.value".to_string(),
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_mismatch".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_miss".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_rejected".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2436
    /// Fixture: start -> retry_comp -> end, where the RetryCompensate node
    /// runs `primary_tool` with one retry and falls back to the `compensate`
    /// tool (routing to `end_compensated`) when the primary keeps failing.
    fn retry_compensate_workflow(primary_tool: &str) -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "retry-compensate".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "retry_comp".to_string(),
                    },
                },
                Node {
                    id: "retry_comp".to_string(),
                    kind: NodeKind::RetryCompensate {
                        tool: primary_tool.to_string(),
                        input: json!({"job": "run"}),
                        max_retries: 1,
                        compensate_tool: "compensate".to_string(),
                        compensate_input: json!({"job": "rollback"}),
                        next: "end".to_string(),
                        on_compensated: Some("end_compensated".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_compensated".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2471
    // Happy path through the linear start -> llm -> tool -> end workflow:
    // verifies terminal node, per-node outputs, event/retry counts, and that
    // a trace is recorded with no replay report.
    #[tokio::test]
    async fn executes_happy_path_linear_flow() {
        let llm = MockLlmExecutor {
            output: "ok".to_string(),
        };
        let tools = MockToolExecutor {
            output: json!({"status": "done"}),
            fail: false,
        };
        let runtime = WorkflowRuntime::new(
            linear_workflow(),
            &llm,
            Some(&tools),
            WorkflowRuntimeOptions::default(),
        );

        let result = runtime
            .execute(json!({"request_id": "r1"}), None)
            .await
            .expect("linear workflow should succeed");

        assert_eq!(result.workflow_name, "linear");
        assert_eq!(result.terminal_node_id, "end");
        assert_eq!(result.node_executions.len(), 4);
        assert_eq!(
            result.node_outputs.get("llm"),
            Some(&Value::String("ok".to_string()))
        );
        assert_eq!(
            result.node_outputs.get("tool"),
            Some(&json!({"status": "done"}))
        );
        assert_eq!(result.events.len(), 8);
        assert!(result.retry_events.is_empty());
        assert!(result.trace.is_some());
        assert_eq!(result.replay_report, None);
    }
2509
    // Checks the shape of the recorded NodeExecutionData entries for the
    // linear workflow: Start, Llm, and Tool payloads in order.
    #[tokio::test]
    async fn linear_flow_records_expected_node_execution_payloads() {
        let llm = MockLlmExecutor {
            output: "ok".to_string(),
        };
        let tools = MockToolExecutor {
            output: json!({"status": "done"}),
            fail: false,
        };
        let runtime = WorkflowRuntime::new(
            linear_workflow(),
            &llm,
            Some(&tools),
            WorkflowRuntimeOptions::default(),
        );

        let result = runtime
            .execute(json!({"request_id": "r1"}), None)
            .await
            .expect("linear workflow should succeed");

        assert!(matches!(
            result.node_executions[0].data,
            NodeExecutionData::Start { ref next } if next == "llm"
        ));
        assert!(matches!(
            result.node_executions[1].data,
            NodeExecutionData::Llm {
                ref model,
                ref output,
                ref next
            } if model == "gpt-4" && output == "ok" && next == "tool"
        ));
        assert!(matches!(
            result.node_executions[2].data,
            NodeExecutionData::Tool {
                ref tool,
                ref next,
                ..
            } if tool == "extract" && next == "end"
        ));
    }
2552
    // A Condition node with `input.approved == true` should take the
    // on_true edge and finish at `end_true`.
    #[tokio::test]
    async fn executes_conditional_branching() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "conditional".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "condition".to_string(),
                    },
                },
                Node {
                    id: "condition".to_string(),
                    kind: NodeKind::Condition {
                        expression: "input.approved".to_string(),
                        on_true: "end_true".to_string(),
                        on_false: "end_false".to_string(),
                    },
                },
                Node {
                    id: "end_true".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_false".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());

        let result = runtime
            .execute(json!({"approved": true}), None)
            .await
            .expect("conditional workflow should succeed");

        assert_eq!(result.terminal_node_id, "end_true");
    }
2596
    // Mirror of the branching test: `input.approved == false` should take
    // the on_false edge and finish at `end_false`.
    #[tokio::test]
    async fn executes_conditional_false_branch() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "conditional-false".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "condition".to_string(),
                    },
                },
                Node {
                    id: "condition".to_string(),
                    kind: NodeKind::Condition {
                        expression: "input.approved".to_string(),
                        on_true: "end_true".to_string(),
                        on_false: "end_false".to_string(),
                    },
                },
                Node {
                    id: "end_true".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_false".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());

        let result = runtime
            .execute(json!({"approved": false}), None)
            .await
            .expect("conditional workflow should take false branch");

        assert_eq!(result.terminal_node_id, "end_false");
    }
2639
    // Running a workflow that contains a Tool node without supplying a tool
    // executor must fail with MissingToolExecutor for that node.
    #[tokio::test]
    async fn fails_when_tool_executor_is_missing() {
        let llm = MockLlmExecutor {
            output: "ok".to_string(),
        };
        let runtime = WorkflowRuntime::new(
            linear_workflow(),
            &llm,
            None,
            WorkflowRuntimeOptions::default(),
        );

        let error = runtime
            .execute(json!({}), None)
            .await
            .expect_err("workflow should fail without tool executor");

        assert!(matches!(
            error,
            WorkflowRuntimeError::MissingToolExecutor { node_id } if node_id == "tool"
        ));
    }
2662
    // With the default policy (no retries), a failing tool surfaces as
    // ToolRetryExhausted after a single attempt.
    #[tokio::test]
    async fn fails_on_tool_execution_error() {
        let llm = MockLlmExecutor {
            output: "ok".to_string(),
        };
        let tools = MockToolExecutor {
            output: json!({"status": "unused"}),
            fail: true,
        };
        let runtime = WorkflowRuntime::new(
            linear_workflow(),
            &llm,
            Some(&tools),
            WorkflowRuntimeOptions::default(),
        );

        let error = runtime
            .execute(json!({}), None)
            .await
            .expect_err("workflow should fail on tool error");

        assert!(matches!(
            error,
            WorkflowRuntimeError::ToolRetryExhausted { node_id, attempts: 1, .. }
            if node_id == "tool"
        ));
    }
2690
2691 #[tokio::test]
2692 async fn retries_llm_after_transient_failure() {
2693 let llm = SequencedLlmExecutor {
2694 responses: Mutex::new(vec![
2695 Err(LlmExecutionError::Client("temporary".to_string())),
2696 Ok(LlmExecutionOutput {
2697 content: "recovered".to_string(),
2698 }),
2699 ]),
2700 calls: AtomicUsize::new(0),
2701 };
2702 let runtime = WorkflowRuntime::new(
2703 llm_only_workflow(),
2704 &llm,
2705 None,
2706 WorkflowRuntimeOptions {
2707 llm_node_policy: NodeExecutionPolicy {
2708 timeout: None,
2709 max_retries: 1,
2710 },
2711 ..WorkflowRuntimeOptions::default()
2712 },
2713 );
2714
2715 let result = runtime
2716 .execute(json!({"request_id": "r2"}), None)
2717 .await
2718 .expect("llm retry should recover");
2719
2720 assert_eq!(result.terminal_node_id, "end");
2721 assert_eq!(result.node_outputs.get("llm"), Some(&json!("recovered")));
2722 assert_eq!(result.retry_events.len(), 1);
2723 assert_eq!(result.retry_events[0].operation, "llm");
2724 assert_eq!(llm.calls.load(Ordering::Relaxed), 2);
2725 }
2726
2727 #[tokio::test]
2728 async fn times_out_tool_execution_per_policy() {
2729 let llm = MockLlmExecutor {
2730 output: "ok".to_string(),
2731 };
2732 let tool = SlowToolExecutor {
2733 delay: Duration::from_millis(50),
2734 };
2735 let runtime = WorkflowRuntime::new(
2736 linear_workflow(),
2737 &llm,
2738 Some(&tool),
2739 WorkflowRuntimeOptions {
2740 tool_node_policy: NodeExecutionPolicy {
2741 timeout: Some(Duration::from_millis(5)),
2742 max_retries: 0,
2743 },
2744 ..WorkflowRuntimeOptions::default()
2745 },
2746 );
2747
2748 let error = runtime
2749 .execute(json!({}), None)
2750 .await
2751 .expect_err("tool execution should time out");
2752
2753 assert!(matches!(
2754 error,
2755 WorkflowRuntimeError::ToolTimeout {
2756 node_id,
2757 timeout_ms: 5,
2758 attempts: 1,
2759 } if node_id == "tool"
2760 ));
2761 }
2762
2763 #[tokio::test]
2764 async fn cancels_between_retry_attempts() {
2765 let cancel_flag = Arc::new(AtomicBool::new(false));
2766 let llm = CancellingLlmExecutor {
2767 cancel_flag: Arc::clone(&cancel_flag),
2768 calls: AtomicUsize::new(0),
2769 };
2770 let runtime = WorkflowRuntime::new(
2771 llm_only_workflow(),
2772 &llm,
2773 None,
2774 WorkflowRuntimeOptions {
2775 llm_node_policy: NodeExecutionPolicy {
2776 timeout: None,
2777 max_retries: 3,
2778 },
2779 ..WorkflowRuntimeOptions::default()
2780 },
2781 );
2782
2783 let error = runtime
2784 .execute(json!({}), Some(cancel_flag.as_ref()))
2785 .await
2786 .expect_err("workflow should stop when cancellation is observed");
2787
2788 assert!(matches!(error, WorkflowRuntimeError::Cancelled));
2789 assert_eq!(llm.calls.load(Ordering::Relaxed), 1);
2790 }
2791
    // A condition that always re-enters itself would loop forever; the
    // `max_steps` guard must abort the run with `StepLimitExceeded`.
    #[tokio::test]
    async fn enforces_step_limit_guard() {
        // Topology: start -> condition -> condition (self-loop on true).
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "loop".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "condition".to_string(),
                    },
                },
                Node {
                    id: "condition".to_string(),
                    kind: NodeKind::Condition {
                        // Always true, so on_true ("condition") repeats forever.
                        expression: "true".to_string(),
                        on_true: "condition".to_string(),
                        on_false: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        // Required by the constructor but never invoked on this path.
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        let runtime = WorkflowRuntime::new(
            workflow,
            &llm,
            None,
            WorkflowRuntimeOptions {
                // Tight budget so the guard trips quickly.
                max_steps: 3,
                ..WorkflowRuntimeOptions::default()
            },
        );

        let error = runtime
            .execute(json!({}), None)
            .await
            .expect_err("workflow should fail on step limit");

        assert!(matches!(
            error,
            WorkflowRuntimeError::StepLimitExceeded { max_steps: 3 }
        ));
    }
2842
2843 #[tokio::test]
2844 async fn validates_recorded_trace_in_replay_mode() {
2845 let llm = MockLlmExecutor {
2846 output: "ok".to_string(),
2847 };
2848 let tools = MockToolExecutor {
2849 output: json!({"status": "done"}),
2850 fail: false,
2851 };
2852 let runtime = WorkflowRuntime::new(
2853 linear_workflow(),
2854 &llm,
2855 Some(&tools),
2856 WorkflowRuntimeOptions {
2857 replay_mode: WorkflowReplayMode::ValidateRecordedTrace,
2858 ..WorkflowRuntimeOptions::default()
2859 },
2860 );
2861
2862 let result = runtime
2863 .execute(json!({"request_id": "r1"}), None)
2864 .await
2865 .expect("replay validation should pass");
2866
2867 assert!(result.trace.is_some());
2868 assert_eq!(
2869 result.replay_report.as_ref().map(|r| r.total_events),
2870 Some(9)
2871 );
2872 }
2873
2874 #[tokio::test]
2875 async fn executes_loop_until_condition_fails() {
2876 let llm = MockLlmExecutor {
2877 output: "unused".to_string(),
2878 };
2879 let counter = IncrementingToolExecutor {
2880 value: AtomicUsize::new(0),
2881 };
2882 let runtime = WorkflowRuntime::new(
2883 loop_workflow(Some(8)),
2884 &llm,
2885 Some(&counter),
2886 WorkflowRuntimeOptions::default(),
2887 );
2888
2889 let result = runtime
2890 .execute(json!({}), None)
2891 .await
2892 .expect("loop workflow should terminate at end");
2893
2894 assert_eq!(result.terminal_node_id, "end");
2895 assert!(result.node_executions.iter().any(|step| matches!(
2896 step.data,
2897 NodeExecutionData::Loop {
2898 evaluated: false,
2899 ..
2900 }
2901 )));
2902 }
2903
2904 #[tokio::test]
2905 async fn fails_when_loop_exceeds_max_iterations() {
2906 let llm = MockLlmExecutor {
2907 output: "unused".to_string(),
2908 };
2909 let counter = MockToolExecutor {
2910 output: json!(0),
2911 fail: false,
2912 };
2913 let runtime = WorkflowRuntime::new(
2914 loop_workflow(Some(2)),
2915 &llm,
2916 Some(&counter),
2917 WorkflowRuntimeOptions {
2918 max_steps: 20,
2919 ..WorkflowRuntimeOptions::default()
2920 },
2921 );
2922
2923 let error = runtime
2924 .execute(json!({}), None)
2925 .await
2926 .expect_err("loop should fail once max iterations are exceeded");
2927
2928 assert!(matches!(
2929 error,
2930 WorkflowRuntimeError::LoopIterationLimitExceeded {
2931 node_id,
2932 max_iterations: 2
2933 } if node_id == "loop"
2934 ));
2935 }
2936
2937 #[tokio::test]
2938 async fn executes_parallel_then_merge_all() {
2939 let llm = MockLlmExecutor {
2940 output: "unused".to_string(),
2941 };
2942 let tool = EchoInputToolExecutor;
2943 let runtime = WorkflowRuntime::new(
2944 parallel_merge_workflow(MergePolicy::All, None),
2945 &llm,
2946 Some(&tool),
2947 WorkflowRuntimeOptions::default(),
2948 );
2949
2950 let result = runtime
2951 .execute(json!({}), None)
2952 .await
2953 .expect("parallel merge workflow should succeed");
2954
2955 assert_eq!(result.terminal_node_id, "end");
2956 assert_eq!(
2957 result.node_outputs.get("tool_a"),
2958 Some(&json!({"value": 1}))
2959 );
2960 assert_eq!(
2961 result.node_outputs.get("tool_b"),
2962 Some(&json!({"value": 2}))
2963 );
2964 assert_eq!(
2965 result.node_outputs.get("merge"),
2966 Some(&json!([{"value": 1}, {"value": 2}]))
2967 );
2968 }
2969
2970 #[tokio::test]
2971 async fn executes_map_reduce_sum() {
2972 let llm = MockLlmExecutor {
2973 output: "unused".to_string(),
2974 };
2975 let tool = EchoInputToolExecutor;
2976 let runtime = WorkflowRuntime::new(
2977 map_reduce_workflow(ReduceOperation::Sum),
2978 &llm,
2979 Some(&tool),
2980 WorkflowRuntimeOptions::default(),
2981 );
2982
2983 let result = runtime
2984 .execute(json!({"values": [1, 2, 3]}), None)
2985 .await
2986 .expect("map reduce workflow should succeed");
2987
2988 assert_eq!(result.node_outputs.get("map"), Some(&json!([1, 2, 3])));
2989 assert_eq!(result.node_outputs.get("reduce"), Some(&json!(6.0)));
2990 }
2991
2992 #[tokio::test]
2993 async fn fails_map_when_items_path_is_not_array() {
2994 let llm = MockLlmExecutor {
2995 output: "unused".to_string(),
2996 };
2997 let tool = EchoInputToolExecutor;
2998 let runtime = WorkflowRuntime::new(
2999 map_reduce_workflow(ReduceOperation::Count),
3000 &llm,
3001 Some(&tool),
3002 WorkflowRuntimeOptions::default(),
3003 );
3004
3005 let error = runtime
3006 .execute(json!({"values": {"not": "array"}}), None)
3007 .await
3008 .expect_err("map node should fail on non-array path");
3009
3010 assert!(matches!(
3011 error,
3012 WorkflowRuntimeError::MapItemsNotArray {
3013 node_id,
3014 items_path
3015 } if node_id == "map" && items_path == "input.values"
3016 ));
3017 }
3018
    // A parent workflow's Subgraph node resolves "child_graph" through the
    // runtime's subgraph registry and runs the nested definition to completion.
    #[tokio::test]
    async fn executes_subgraph_via_registry() {
        let llm = MockLlmExecutor {
            output: "nested-ok".to_string(),
        };
        // Child definition: start -> llm -> end.
        let subgraph = WorkflowDefinition {
            version: "v0".to_string(),
            name: "child".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "llm".to_string(),
                    },
                },
                Node {
                    id: "llm".to_string(),
                    kind: NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "child".to_string(),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };
        // Parent definition: start -> sub (references "child_graph") -> end.
        let parent = WorkflowDefinition {
            version: "v0".to_string(),
            name: "parent".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "sub".to_string(),
                    },
                },
                Node {
                    id: "sub".to_string(),
                    kind: NodeKind::Subgraph {
                        graph: "child_graph".to_string(),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        // Register the child under the name the parent's Subgraph node uses.
        let mut registry = BTreeMap::new();
        registry.insert("child_graph".to_string(), subgraph);
        let runtime = WorkflowRuntime::new(
            parent,
            &llm,
            None,
            WorkflowRuntimeOptions {
                subgraph_registry: registry,
                ..WorkflowRuntimeOptions::default()
            },
        );

        let result = runtime
            .execute(json!({"approved": true}), None)
            .await
            .expect("subgraph workflow should execute");

        assert_eq!(result.terminal_node_id, "end");
        // The second recorded execution step is the subgraph invocation.
        assert!(matches!(
            result.node_executions[1].data,
            NodeExecutionData::Subgraph { .. }
        ));
    }
3095
    // Batch passes all 3 input items through, then Filter keeps only the items
    // whose `keep` flag is true (2 of 3).
    #[tokio::test]
    async fn executes_batch_and_filter_nodes() {
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        // Topology: start -> batch -> filter -> end.
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "batch-filter".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "batch".to_string(),
                    },
                },
                Node {
                    id: "batch".to_string(),
                    kind: NodeKind::Batch {
                        items_path: "input.items".to_string(),
                        next: "filter".to_string(),
                    },
                },
                Node {
                    id: "filter".to_string(),
                    kind: NodeKind::Filter {
                        // Filter reads the batch node's output, not raw input.
                        items_path: "node_outputs.batch".to_string(),
                        expression: "item.keep == true".to_string(),
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());
        let result = runtime
            .execute(
                json!({
                    "items": [
                        {"id": 1, "keep": true},
                        {"id": 2, "keep": false},
                        {"id": 3, "keep": true}
                    ]
                }),
                None,
            )
            .await
            .expect("batch/filter workflow should execute");

        // Batch emits all 3 items; filter retains the 2 with keep == true.
        assert_eq!(
            result
                .node_outputs
                .get("batch")
                .and_then(Value::as_array)
                .map(Vec::len),
            Some(3)
        );
        assert_eq!(
            result
                .node_outputs
                .get("filter")
                .and_then(Value::as_array)
                .map(Vec::len),
            Some(2)
        );
    }
3165
3166 #[tokio::test]
3167 async fn executes_debounce_and_throttle_nodes() {
3168 let llm = MockLlmExecutor {
3169 output: "unused".to_string(),
3170 };
3171 let runtime = WorkflowRuntime::new(
3172 debounce_and_throttle_workflow(),
3173 &llm,
3174 None,
3175 WorkflowRuntimeOptions::default(),
3176 );
3177
3178 let result = runtime
3179 .execute(json!({"key": "k1"}), None)
3180 .await
3181 .expect("debounce/throttle workflow should execute");
3182
3183 assert_eq!(result.terminal_node_id, "end_throttled");
3184 assert_eq!(
3185 result.node_outputs.get("debounce_a"),
3186 Some(&json!({"key": "k1", "suppressed": true}))
3187 );
3188 assert_eq!(
3189 result.node_outputs.get("throttle_a"),
3190 Some(&json!({"key": "k1", "throttled": true}))
3191 );
3192 }
3193
3194 #[tokio::test]
3195 async fn executes_retry_compensate_successfully_without_compensation() {
3196 let llm = MockLlmExecutor {
3197 output: "unused".to_string(),
3198 };
3199 let tools = RetryCompensateToolExecutor {
3200 attempts: AtomicUsize::new(0),
3201 };
3202 let runtime = WorkflowRuntime::new(
3203 retry_compensate_workflow("unstable_primary"),
3204 &llm,
3205 Some(&tools),
3206 WorkflowRuntimeOptions::default(),
3207 );
3208
3209 let result = runtime
3210 .execute(json!({}), None)
3211 .await
3212 .expect("retry_compensate should recover by retry");
3213
3214 assert_eq!(result.terminal_node_id, "end");
3215 assert_eq!(tools.attempts.load(Ordering::Relaxed), 2);
3216 assert_eq!(result.retry_events.len(), 1);
3217 }
3218
3219 #[tokio::test]
3220 async fn executes_retry_compensate_with_compensation_route() {
3221 let llm = MockLlmExecutor {
3222 output: "unused".to_string(),
3223 };
3224 let tools = RetryCompensateToolExecutor {
3225 attempts: AtomicUsize::new(0),
3226 };
3227 let runtime = WorkflowRuntime::new(
3228 retry_compensate_workflow("always_fail"),
3229 &llm,
3230 Some(&tools),
3231 WorkflowRuntimeOptions::default(),
3232 );
3233
3234 let result = runtime
3235 .execute(json!({}), None)
3236 .await
3237 .expect("retry_compensate should use compensation");
3238
3239 assert_eq!(result.terminal_node_id, "end_compensated");
3240 assert_eq!(result.retry_events.len(), 1);
3241 assert_eq!(
3242 result
3243 .node_outputs
3244 .get("retry_comp")
3245 .and_then(|value| value.get("status")),
3246 Some(&json!("compensated"))
3247 );
3248 }
3249
3250 #[tokio::test]
3251 async fn executes_event_cache_router_human_transform_nodes() {
3252 let llm = MockLlmExecutor {
3253 output: "unused".to_string(),
3254 };
3255 let runtime = WorkflowRuntime::new(
3256 extended_nodes_workflow(),
3257 &llm,
3258 None,
3259 WorkflowRuntimeOptions::default(),
3260 );
3261
3262 let result = runtime
3263 .execute(
3264 json!({
3265 "event_type": "webhook",
3266 "cache_key": "customer-1",
3267 "payload": {"value": 99},
3268 "mode": "manual",
3269 "approval": "approve",
3270 "review_notes": {"editor": "ops"}
3271 }),
3272 None,
3273 )
3274 .await
3275 .expect("extended nodes workflow should execute");
3276
3277 assert_eq!(result.terminal_node_id, "end");
3278 assert_eq!(
3279 result.node_outputs.get("cache_read"),
3280 Some(&json!({"key": "customer-1", "hit": true, "value": {"value": 99}}))
3281 );
3282 assert_eq!(
3283 result.node_outputs.get("router"),
3284 Some(&json!({"selected": "human"}))
3285 );
3286 assert_eq!(
3287 result.node_outputs.get("transform"),
3288 Some(&json!({"value": 99}))
3289 );
3290 }
3291
3292 #[tokio::test]
3293 async fn routes_event_trigger_mismatch_to_fallback() {
3294 let llm = MockLlmExecutor {
3295 output: "unused".to_string(),
3296 };
3297 let runtime = WorkflowRuntime::new(
3298 extended_nodes_workflow(),
3299 &llm,
3300 None,
3301 WorkflowRuntimeOptions::default(),
3302 );
3303
3304 let result = runtime
3305 .execute(
3306 json!({
3307 "event_type": "cron",
3308 "cache_key": "customer-1",
3309 "payload": {"value": 99},
3310 "mode": "manual",
3311 "approval": "approve"
3312 }),
3313 None,
3314 )
3315 .await
3316 .expect("event mismatch should route to fallback");
3317
3318 assert_eq!(result.terminal_node_id, "end_mismatch");
3319 }
3320
    // A CacheRead for a key that was never written must report a miss
    // (hit: false, value: null) and follow the `on_miss` route.
    #[tokio::test]
    async fn cache_read_routes_to_on_miss_when_value_absent() {
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        // Topology: start -> cache_read -> (end_hit | end_miss).
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "cache-read-miss".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "cache_read".to_string(),
                    },
                },
                Node {
                    id: "cache_read".to_string(),
                    kind: NodeKind::CacheRead {
                        key_path: "input.cache_key".to_string(),
                        next: "end_hit".to_string(),
                        on_miss: Some("end_miss".to_string()),
                    },
                },
                Node {
                    id: "end_hit".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_miss".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };
        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());

        // No CacheWrite preceded this read, so "new-customer" cannot be cached.
        let result = runtime
            .execute(json!({"cache_key": "new-customer"}), None)
            .await
            .expect("cache read miss should still execute via on_miss route");

        assert_eq!(
            result.node_outputs.get("cache_read"),
            Some(&json!({"key": "new-customer", "hit": false, "value": null}))
        );
        assert_eq!(result.terminal_node_id, "end_miss");
    }
3367
    // Writing a literal null payload and reading it back must count as a HIT
    // with value null — a cached null is distinct from an absent key.
    #[tokio::test]
    async fn cache_read_treats_cached_null_as_hit() {
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        // Topology: start -> cache_write -> cache_read -> (end_hit | end_miss).
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "cache-read-null-hit".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "cache_write".to_string(),
                    },
                },
                Node {
                    id: "cache_write".to_string(),
                    kind: NodeKind::CacheWrite {
                        key_path: "input.cache_key".to_string(),
                        value_path: "input.payload".to_string(),
                        next: "cache_read".to_string(),
                    },
                },
                Node {
                    id: "cache_read".to_string(),
                    kind: NodeKind::CacheRead {
                        key_path: "input.cache_key".to_string(),
                        next: "end_hit".to_string(),
                        on_miss: Some("end_miss".to_string()),
                    },
                },
                Node {
                    id: "end_hit".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_miss".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };
        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());

        // `payload` is explicitly null; cache_write stores it under the key.
        let result = runtime
            .execute(json!({"cache_key": "customer-null", "payload": null}), None)
            .await
            .expect("cache read hit with null value should execute");

        assert_eq!(
            result.node_outputs.get("cache_read"),
            Some(&json!({"key": "customer-null", "hit": true, "value": null}))
        );
        assert_eq!(result.terminal_node_id, "end_hit");
    }
3422
    // With `max_expression_scope_bytes` set very low (32), evaluating the
    // condition over an oversized input scope must fail with
    // `ExpressionScopeLimitExceeded` naming the condition node.
    #[tokio::test]
    async fn rejects_condition_when_expression_scope_exceeds_limit() {
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        // Topology: start -> condition -> end (both branches share "end").
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "scope-limit".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "condition".to_string(),
                    },
                },
                Node {
                    id: "condition".to_string(),
                    kind: NodeKind::Condition {
                        expression: "input.flag == true".to_string(),
                        on_true: "end".to_string(),
                        on_false: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let runtime = WorkflowRuntime::new(
            workflow,
            &llm,
            None,
            WorkflowRuntimeOptions {
                security_limits: RuntimeSecurityLimits {
                    // Deliberately tiny budget so the input below exceeds it.
                    max_expression_scope_bytes: 32,
                    ..RuntimeSecurityLimits::default()
                },
                ..WorkflowRuntimeOptions::default()
            },
        );

        // The padded payload pushes the serialized scope past 32 bytes.
        let error = runtime
            .execute(
                json!({"flag": true, "payload": "this-is-too-large-for-limit"}),
                None,
            )
            .await
            .expect_err("condition should fail when scope budget is exceeded");
        assert!(matches!(
            error,
            WorkflowRuntimeError::ExpressionScopeLimitExceeded { node_id, .. }
                if node_id == "condition"
        ));
    }
3479
3480 #[tokio::test]
3481 async fn rejects_map_when_item_count_exceeds_limit() {
3482 let llm = MockLlmExecutor {
3483 output: "unused".to_string(),
3484 };
3485 let tool = EchoInputToolExecutor;
3486 let runtime = WorkflowRuntime::new(
3487 map_reduce_workflow(ReduceOperation::Count),
3488 &llm,
3489 Some(&tool),
3490 WorkflowRuntimeOptions {
3491 security_limits: RuntimeSecurityLimits {
3492 max_map_items: 2,
3493 ..RuntimeSecurityLimits::default()
3494 },
3495 ..WorkflowRuntimeOptions::default()
3496 },
3497 );
3498
3499 let error = runtime
3500 .execute(json!({"values": [1, 2, 3]}), None)
3501 .await
3502 .expect_err("map should fail when item guard is exceeded");
3503 assert!(matches!(
3504 error,
3505 WorkflowRuntimeError::MapItemLimitExceeded {
3506 node_id,
3507 actual_items: 3,
3508 max_items: 2,
3509 } if node_id == "map"
3510 ));
3511 }
3512
3513 #[tokio::test]
3514 async fn resumes_from_checkpoint() {
3515 let llm = MockLlmExecutor {
3516 output: "unused".to_string(),
3517 };
3518 let tool = MockToolExecutor {
3519 output: json!({"ok": true}),
3520 fail: false,
3521 };
3522 let runtime = WorkflowRuntime::new(
3523 linear_workflow(),
3524 &llm,
3525 Some(&tool),
3526 WorkflowRuntimeOptions::default(),
3527 );
3528
3529 let checkpoint = WorkflowCheckpoint {
3530 run_id: "run-1".to_string(),
3531 workflow_name: "linear".to_string(),
3532 step: 2,
3533 next_node_id: "tool".to_string(),
3534 scope_snapshot: json!({"input": {"request_id": "r-resume"}}),
3535 };
3536
3537 let resumed = runtime
3538 .execute_resume_from_failure(&checkpoint, None)
3539 .await
3540 .expect("resume should continue from checkpoint node");
3541 assert_eq!(resumed.terminal_node_id, "end");
3542 assert_eq!(resumed.node_executions[0].node_id, "tool");
3543 }
3544
3545 #[test]
3546 fn scope_capabilities_enforce_read_write_boundaries() {
3547 let mut scope = RuntimeScope::new(json!({"k": "v"}));
3548
3549 let read_error = scope
3550 .scoped_input(ScopeCapability::LlmWrite)
3551 .expect_err("write capability should not read scope");
3552 assert!(matches!(read_error, ScopeAccessError::ReadDenied { .. }));
3553
3554 let write_error = scope
3555 .record_tool_output("tool", json!({"ok": true}), ScopeCapability::LlmWrite)
3556 .expect_err("llm write capability should not write tool output");
3557 assert!(matches!(write_error, ScopeAccessError::WriteDenied { .. }));
3558 }
3559}