1use std::collections::{BTreeMap, HashMap};
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use async_trait::async_trait;
6use serde_json::{json, Map, Value};
7use simple_agent_type::message::Message;
8use simple_agent_type::request::CompletionRequest;
9use simple_agents_core::{CompletionOptions, CompletionOutcome, SimpleAgentsClient};
10use thiserror::Error;
11use tokio::time::timeout;
12
13use crate::checkpoint::WorkflowCheckpoint;
14use crate::expressions;
15use crate::ir::{MergePolicy, Node, NodeKind, ReduceOperation, WorkflowDefinition};
16use crate::recorder::{TraceRecordError, TraceRecorder};
17use crate::replay::{replay_trace, ReplayError, ReplayReport};
18use crate::scheduler::DagScheduler;
19pub use crate::state::ScopeAccessError;
20use crate::trace::{TraceTerminalStatus, WorkflowTrace, WorkflowTraceMetadata};
21use crate::validation::{validate_and_normalize, ValidationErrors};
22
23mod engine;
24mod scope;
25use scope::{RuntimeScope, ScopeCapability};
26
/// Configuration for a [`WorkflowRuntime`] run.
#[derive(Debug, Clone)]
pub struct WorkflowRuntimeOptions {
    /// Hard cap on the number of node steps before the run aborts with
    /// `StepLimitExceeded`.
    pub max_steps: usize,
    /// When true, `validate_and_normalize` runs before execution; otherwise
    /// the definition is only normalized.
    pub validate_before_run: bool,
    /// Timeout/retry policy applied to LLM nodes.
    pub llm_node_policy: NodeExecutionPolicy,
    /// Timeout/retry policy applied to tool nodes.
    pub tool_node_policy: NodeExecutionPolicy,
    /// When true, a `WorkflowTrace` is recorded during execution.
    pub enable_trace_recording: bool,
    /// Whether the recorded trace is replay-validated after the run.
    pub replay_mode: WorkflowReplayMode,
    /// Default max in-flight work for the DAG scheduler (parallel branches
    /// and map items), used when a node does not set its own limit.
    pub scheduler_max_in_flight: usize,
    /// Named subgraph definitions resolvable by subgraph nodes.
    pub subgraph_registry: BTreeMap<String, WorkflowDefinition>,
    /// Resource ceilings enforced during execution.
    pub security_limits: RuntimeSecurityLimits,
}
49
/// Resource ceilings enforced while a workflow executes.
#[derive(Debug, Clone)]
pub struct RuntimeSecurityLimits {
    /// Max serialized size of the scope handed to expression evaluation.
    pub max_expression_scope_bytes: usize,
    /// Max number of items a map node may process.
    pub max_map_items: usize,
    /// Max number of branches a parallel node may fan out to.
    pub max_parallel_branches: usize,
    /// Max number of items a filter node may examine.
    pub max_filter_items: usize,
}
62
/// Controls whether a recorded trace is replay-validated after a run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkflowReplayMode {
    /// No replay validation.
    Disabled,
    /// Replay the recorded trace and produce a [`ReplayReport`]; requires
    /// trace recording (see `ReplayRequiresTraceRecording`).
    ValidateRecordedTrace,
}
71
/// Per-node-kind execution policy, applied to LLM and tool nodes.
#[derive(Debug, Clone, Default)]
pub struct NodeExecutionPolicy {
    /// Optional per-attempt timeout; `None` disables the timeout.
    pub timeout: Option<Duration>,
    /// Retry budget. NOTE(review): presumably retries *beyond* the first
    /// attempt (0 = single attempt) — confirm in the policy helpers.
    pub max_retries: usize,
}
80
81impl Default for WorkflowRuntimeOptions {
82 fn default() -> Self {
83 Self {
84 max_steps: 256,
85 validate_before_run: true,
86 llm_node_policy: NodeExecutionPolicy::default(),
87 tool_node_policy: NodeExecutionPolicy::default(),
88 enable_trace_recording: true,
89 replay_mode: WorkflowReplayMode::Disabled,
90 scheduler_max_in_flight: 8,
91 subgraph_registry: BTreeMap::new(),
92 security_limits: RuntimeSecurityLimits::default(),
93 }
94 }
95}
96
97impl Default for RuntimeSecurityLimits {
98 fn default() -> Self {
99 Self {
100 max_expression_scope_bytes: 128 * 1024,
101 max_map_items: 4096,
102 max_parallel_branches: 128,
103 max_filter_items: 8192,
104 }
105 }
106}
107
/// Cooperative cancellation probe polled during workflow execution.
pub trait CancellationSignal: Send + Sync {
    /// Returns true once the run should stop.
    fn is_cancelled(&self) -> bool;
}
113
/// Lets callers flip a shared `AtomicBool` from another thread to cancel.
impl CancellationSignal for AtomicBool {
    fn is_cancelled(&self) -> bool {
        // Relaxed suffices: this is a standalone flag with no other memory
        // that must be ordered relative to it.
        self.load(Ordering::Relaxed)
    }
}
119
/// Constant signal (`false` = never cancelled); handy in tests.
impl CancellationSignal for bool {
    fn is_cancelled(&self) -> bool {
        *self
    }
}
125
/// Input handed to an [`LlmExecutor`] for one LLM node execution.
#[derive(Debug, Clone, PartialEq)]
pub struct LlmExecutionInput {
    /// Id of the workflow node being executed.
    pub node_id: String,
    /// Model identifier passed to the completion request.
    pub model: String,
    /// The node's prompt text.
    pub prompt: String,
    /// Scoped view of workflow state made visible to the prompt.
    pub scoped_input: Value,
}
138
/// Output of a successful LLM node execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LlmExecutionOutput {
    /// Text content returned by the model.
    pub content: String,
}
145
/// Failure modes surfaced by an [`LlmExecutor`].
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum LlmExecutionError {
    /// The completion request could not be constructed.
    #[error("invalid completion request: {0}")]
    InvalidRequest(String),
    /// The underlying client call failed.
    #[error("llm client execution failed: {0}")]
    Client(String),
    /// The client produced an outcome variant other than a plain response.
    #[error("unexpected completion outcome: {0}")]
    UnexpectedOutcome(&'static str),
    /// The response carried no content.
    #[error("llm response had no content")]
    EmptyResponse,
}
162
/// Abstraction over the LLM backend used to run LLM nodes.
#[async_trait]
pub trait LlmExecutor: Send + Sync {
    /// Executes one LLM call, mapping backend failures into
    /// [`LlmExecutionError`].
    async fn execute(
        &self,
        input: LlmExecutionInput,
    ) -> Result<LlmExecutionOutput, LlmExecutionError>;
}
172
173#[async_trait]
174impl LlmExecutor for SimpleAgentsClient {
175 async fn execute(
176 &self,
177 input: LlmExecutionInput,
178 ) -> Result<LlmExecutionOutput, LlmExecutionError> {
179 let user_prompt = build_prompt_with_scope(&input.prompt, &input.scoped_input);
180 let request = CompletionRequest::builder()
181 .model(input.model)
182 .message(Message::user(user_prompt))
183 .build()
184 .map_err(|error| LlmExecutionError::InvalidRequest(error.to_string()))?;
185
186 let outcome = self
187 .complete(&request, CompletionOptions::default())
188 .await
189 .map_err(|error| LlmExecutionError::Client(error.to_string()))?;
190
191 let response = match outcome {
192 CompletionOutcome::Response(response) => response,
193 CompletionOutcome::Stream(_) => {
194 return Err(LlmExecutionError::UnexpectedOutcome("stream"));
195 }
196 CompletionOutcome::HealedJson(_) => {
197 return Err(LlmExecutionError::UnexpectedOutcome("healed_json"));
198 }
199 CompletionOutcome::CoercedSchema(_) => {
200 return Err(LlmExecutionError::UnexpectedOutcome("coerced_schema"));
201 }
202 };
203
204 let content = response
205 .content()
206 .ok_or(LlmExecutionError::EmptyResponse)?
207 .to_string();
208
209 Ok(LlmExecutionOutput { content })
210 }
211}
212
/// Input handed to a [`ToolExecutor`] for one tool invocation.
#[derive(Debug, Clone, PartialEq)]
pub struct ToolExecutionInput {
    /// Id of the workflow node being executed.
    pub node_id: String,
    /// Name of the tool to invoke.
    pub tool: String,
    /// Node-declared input payload for the tool.
    pub input: Value,
    /// Scoped view of workflow state available to the tool.
    pub scoped_input: Value,
}
225
/// Failure modes surfaced by a [`ToolExecutor`].
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ToolExecutionError {
    /// No handler is registered under the requested tool name.
    #[error("tool handler not found: {tool}")]
    NotFound {
        tool: String,
    },
    /// The handler ran but reported a failure.
    #[error("tool execution failed: {0}")]
    Failed(String),
}
239
/// Abstraction over tool handlers used by tool, map, and retry-compensate
/// nodes.
#[async_trait]
pub trait ToolExecutor: Send + Sync {
    /// Executes one tool invocation and returns its JSON output.
    async fn execute_tool(&self, input: ToolExecutionInput) -> Result<Value, ToolExecutionError>;
}
246
/// Record of one executed node within a run.
#[derive(Debug, Clone, PartialEq)]
pub struct NodeExecution {
    /// Position in the run's step sequence (starts at 0 for a fresh run).
    pub step: usize,
    /// Id of the executed node.
    pub node_id: String,
    /// Kind-specific execution details.
    pub data: NodeExecutionData,
}
257
/// Per-node-kind execution record. Every non-terminal variant carries
/// `next`: the id of the node the run transitions to afterwards.
#[derive(Debug, Clone, PartialEq)]
pub enum NodeExecutionData {
    Start {
        next: String,
    },
    Llm {
        model: String,
        output: String,
        next: String,
    },
    Tool {
        tool: String,
        output: Value,
        next: String,
    },
    Condition {
        expression: String,
        /// Result of evaluating `expression`; decides the taken edge.
        evaluated: bool,
        next: String,
    },
    /// Debounce gate; `suppressed` is true when the event was swallowed.
    Debounce {
        key: String,
        suppressed: bool,
        next: String,
    },
    /// Throttle gate; `throttled` is true when rate-limited.
    Throttle {
        key: String,
        throttled: bool,
        next: String,
    },
    /// Tool call with retry + compensation; `compensated` marks whether the
    /// compensation path ran.
    RetryCompensate {
        tool: String,
        attempts: usize,
        compensated: bool,
        output: Value,
        next: String,
    },
    HumanInTheLoop {
        approved: bool,
        response: Value,
        next: String,
    },
    /// Cache probe; `value` is the cached value when `hit` is true.
    CacheRead {
        key: String,
        hit: bool,
        value: Value,
        next: String,
    },
    CacheWrite {
        key: String,
        value: Value,
        next: String,
    },
    EventTrigger {
        event: String,
        matched: bool,
        next: String,
    },
    Router {
        selected: String,
        next: String,
    },
    Transform {
        expression: String,
        output: Value,
        next: String,
    },
    Loop {
        condition: String,
        evaluated: bool,
        iteration: u32,
        next: String,
    },
    Subgraph {
        graph: String,
        terminal_node_id: String,
        output: Value,
        next: String,
    },
    Batch {
        items_path: String,
        item_count: usize,
        next: String,
    },
    Filter {
        items_path: String,
        expression: String,
        /// Number of items the expression kept.
        kept: usize,
        next: String,
    },
    /// Fan-out execution; `outputs` maps branch id to branch output.
    Parallel {
        branches: Vec<String>,
        outputs: BTreeMap<String, Value>,
        next: String,
    },
    /// Combination of prior node outputs according to `policy`.
    Merge {
        policy: MergePolicy,
        sources: Vec<String>,
        output: Value,
        next: String,
    },
    Map {
        item_count: usize,
        output: Value,
        next: String,
    },
    Reduce {
        operation: ReduceOperation,
        output: Value,
        next: String,
    },
    /// Terminal node; no outgoing edge.
    End,
}
461
/// Lifecycle event emitted while a workflow runs.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowEvent {
    /// Step at which the event occurred.
    pub step: usize,
    /// Node the event concerns.
    pub node_id: String,
    /// What happened at that node.
    pub kind: WorkflowEventKind,
}
472
/// Per-node lifecycle stages reported through [`WorkflowEvent`]s.
#[derive(Debug, Clone, PartialEq)]
pub enum WorkflowEventKind {
    /// The node began executing.
    NodeStarted,
    /// The node finished; carries its execution record.
    NodeCompleted {
        data: NodeExecutionData,
    },
    /// The node failed; carries the rendered error message.
    NodeFailed {
        message: String,
    },
}
489
/// Aggregate outcome of a completed workflow run.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowRunResult {
    /// Name of the executed workflow.
    pub workflow_name: String,
    /// Id of the node the run terminated on.
    pub terminal_node_id: String,
    /// Every node execution, in step order.
    pub node_executions: Vec<NodeExecution>,
    /// Lifecycle events emitted during the run.
    pub events: Vec<WorkflowEvent>,
    /// Failed attempts that were retried along the way.
    pub retry_events: Vec<WorkflowRetryEvent>,
    /// Final recorded output per node id.
    pub node_outputs: BTreeMap<String, Value>,
    /// Recorded trace, present when trace recording was enabled.
    pub trace: Option<WorkflowTrace>,
    /// Replay validation report, present when replay mode produced one.
    pub replay_report: Option<ReplayReport>,
}
510
/// One failed attempt of a retried operation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkflowRetryEvent {
    /// Step during which the retry occurred.
    pub step: usize,
    /// Node whose operation was retried.
    pub node_id: String,
    /// Label of the retried operation.
    pub operation: String,
    /// Attempt number that failed.
    pub failed_attempt: usize,
    /// Rendered error from the failed attempt.
    pub reason: String,
}
525
/// Every failure the workflow runtime can surface. Variants carry the id of
/// the offending node where one exists; `#[error(transparent)]` variants
/// forward validation, trace-recording, and replay errors unchanged.
/// NOTE(review): consider `#[non_exhaustive]` if this enum is public API
/// that will keep growing — confirm downstream matching expectations first.
#[derive(Debug, Error)]
pub enum WorkflowRuntimeError {
    #[error(transparent)]
    Validation(#[from] ValidationErrors),
    #[error("workflow has no start node")]
    MissingStartNode,
    #[error("node not found: {node_id}")]
    NodeNotFound {
        node_id: String,
    },
    #[error("workflow exceeded max step limit ({max_steps})")]
    StepLimitExceeded {
        max_steps: usize,
    },
    #[error("workflow execution cancelled")]
    Cancelled,
    // --- LLM / tool execution failures (single attempt, retries, timeout) ---
    #[error("llm node '{node_id}' failed: {source}")]
    Llm {
        node_id: String,
        source: LlmExecutionError,
    },
    #[error("tool node '{node_id}' failed: {source}")]
    Tool {
        node_id: String,
        source: ToolExecutionError,
    },
    #[error("llm node '{node_id}' exhausted {attempts} attempt(s): {last_error}")]
    LlmRetryExhausted {
        node_id: String,
        attempts: usize,
        last_error: LlmExecutionError,
    },
    #[error("tool node '{node_id}' exhausted {attempts} attempt(s): {last_error}")]
    ToolRetryExhausted {
        node_id: String,
        attempts: usize,
        last_error: ToolExecutionError,
    },
    #[error(
        "llm node '{node_id}' timed out after {attempts} attempt(s) (timeout: {timeout_ms} ms)"
    )]
    LlmTimeout {
        node_id: String,
        timeout_ms: u128,
        attempts: usize,
    },
    #[error(
        "tool node '{node_id}' timed out after {attempts} attempt(s) (timeout: {timeout_ms} ms)"
    )]
    ToolTimeout {
        node_id: String,
        timeout_ms: u128,
        attempts: usize,
    },
    #[error("tool node '{node_id}' requires a tool executor")]
    MissingToolExecutor {
        node_id: String,
    },
    // --- Graph shape / transition failures ---
    #[error("node '{node_id}' is missing required next edge")]
    MissingNextEdge {
        node_id: String,
    },
    #[error("condition node '{node_id}' has invalid expression '{expression}': {reason}")]
    InvalidCondition {
        node_id: String,
        expression: String,
        reason: String,
    },
    #[error("loop node '{node_id}' has invalid condition '{expression}': {reason}")]
    InvalidLoopCondition {
        node_id: String,
        expression: String,
        reason: String,
    },
    #[error("loop node '{node_id}' exceeded max iterations ({max_iterations})")]
    LoopIterationLimitExceeded {
        node_id: String,
        max_iterations: u32,
    },
    #[error("node '{node_id}' did not provide a next transition")]
    MissingNextTransition {
        node_id: String,
    },
    #[error("scope access failed on node '{node_id}': {source}")]
    ScopeAccess {
        node_id: String,
        source: ScopeAccessError,
    },
    #[error(transparent)]
    TraceRecording(#[from] TraceRecordError),
    #[error(transparent)]
    Replay(#[from] ReplayError),
    #[error("replay validation requires trace recording to be enabled")]
    ReplayRequiresTraceRecording,
    // --- Fan-out / fan-in node failures ---
    #[error("parallel node '{node_id}' cannot execute branch '{branch_id}': {reason}")]
    ParallelBranchUnsupported {
        node_id: String,
        branch_id: String,
        reason: String,
    },
    #[error("merge node '{node_id}' missing source output from '{source_id}'")]
    MissingMergeSource {
        node_id: String,
        source_id: String,
    },
    #[error("merge node '{node_id}' quorum not met: required {required}, resolved {resolved}")]
    MergeQuorumNotMet {
        node_id: String,
        required: usize,
        resolved: usize,
    },
    #[error("map node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    MapItemsNotArray {
        node_id: String,
        items_path: String,
    },
    #[error("reduce node '{node_id}' source '{source_node}' is not reducible: {reason}")]
    InvalidReduceInput {
        node_id: String,
        source_node: String,
        reason: String,
    },
    #[error("subgraph node '{node_id}' references unknown graph '{graph}'")]
    SubgraphNotFound { node_id: String, graph: String },
    #[error("batch node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    BatchItemsNotArray { node_id: String, items_path: String },
    #[error("filter node '{node_id}' items_path '{items_path}' did not resolve to an array")]
    FilterItemsNotArray { node_id: String, items_path: String },
    #[error("filter node '{node_id}' expression '{expression}' failed: {reason}")]
    InvalidFilterExpression {
        node_id: String,
        expression: String,
        reason: String,
    },
    // --- Security-limit violations (see RuntimeSecurityLimits) ---
    #[error(
        "expression scope too large on node '{node_id}': {actual_bytes} bytes exceeds limit {limit_bytes}"
    )]
    ExpressionScopeLimitExceeded {
        node_id: String,
        actual_bytes: usize,
        limit_bytes: usize,
    },
    #[error(
        "parallel node '{node_id}' has {actual_branches} branches, exceeding limit {max_branches}"
    )]
    ParallelBranchLimitExceeded {
        node_id: String,
        actual_branches: usize,
        max_branches: usize,
    },
    #[error("map node '{node_id}' has {actual_items} items, exceeding limit {max_items}")]
    MapItemLimitExceeded {
        node_id: String,
        actual_items: usize,
        max_items: usize,
    },
    #[error("filter node '{node_id}' has {actual_items} items, exceeding limit {max_items}")]
    FilterItemLimitExceeded {
        node_id: String,
        actual_items: usize,
        max_items: usize,
    },
    // --- Path / value resolution failures ---
    #[error("node '{node_id}' could not resolve required path '{path}'")]
    MissingPath { node_id: String, path: String },
    #[error("node '{node_id}' path '{path}' did not resolve to a string cache key")]
    CacheKeyNotString { node_id: String, path: String },
    #[error("human node '{node_id}' has unsupported decision value at '{path}': {value}")]
    InvalidHumanDecision {
        node_id: String,
        path: String,
        value: String,
    },
    #[error("event trigger node '{node_id}' path '{path}' did not resolve to an event string")]
    InvalidEventValue { node_id: String, path: String },
    #[error("router node '{node_id}' route expression '{expression}' failed: {reason}")]
    InvalidRouterExpression {
        node_id: String,
        expression: String,
        reason: String,
    },
    #[error("transform node '{node_id}' expression '{expression}' failed: {reason}")]
    InvalidTransformExpression {
        node_id: String,
        expression: String,
        reason: String,
    },
    #[error(
        "retry_compensate node '{node_id}' failed after {attempts} primary attempt(s) and compensation error: {compensation_error}"
    )]
    RetryCompensateFailed {
        node_id: String,
        attempts: usize,
        compensation_error: ToolExecutionError,
    },
}
807
/// Executes a [`WorkflowDefinition`] against pluggable LLM and tool
/// executors. Borrows its executors, hence the `'a` lifetime.
pub struct WorkflowRuntime<'a> {
    // Definition executed (validated/normalized per options at run time).
    definition: WorkflowDefinition,
    // Backend for LLM nodes (mandatory).
    llm_executor: &'a dyn LlmExecutor,
    // Backend for tool-ish nodes; absence fails those nodes at runtime.
    tool_executor: Option<&'a dyn ToolExecutor>,
    options: WorkflowRuntimeOptions,
}
815
816impl<'a> WorkflowRuntime<'a> {
817 pub fn new(
819 definition: WorkflowDefinition,
820 llm_executor: &'a dyn LlmExecutor,
821 tool_executor: Option<&'a dyn ToolExecutor>,
822 options: WorkflowRuntimeOptions,
823 ) -> Self {
824 Self {
825 definition,
826 llm_executor,
827 tool_executor,
828 options,
829 }
830 }
831
832 pub async fn execute(
834 &self,
835 input: Value,
836 cancellation: Option<&dyn CancellationSignal>,
837 ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
838 let workflow = if self.options.validate_before_run {
839 validate_and_normalize(&self.definition)?
840 } else {
841 self.definition.normalized()
842 };
843
844 let start_id = find_start_node_id(&workflow)?;
845 self.execute_from_node(
846 workflow,
847 RuntimeScope::new(input),
848 start_id,
849 0,
850 cancellation,
851 )
852 .await
853 }
854
855 pub async fn execute_resume_from_failure(
857 &self,
858 checkpoint: &WorkflowCheckpoint,
859 cancellation: Option<&dyn CancellationSignal>,
860 ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
861 let workflow = if self.options.validate_before_run {
862 validate_and_normalize(&self.definition)?
863 } else {
864 self.definition.normalized()
865 };
866
867 let scope_input = checkpoint
868 .scope_snapshot
869 .get("input")
870 .cloned()
871 .unwrap_or_else(|| checkpoint.scope_snapshot.clone());
872
873 self.execute_from_node(
874 workflow,
875 RuntimeScope::new(scope_input),
876 checkpoint.next_node_id.clone(),
877 checkpoint.step,
878 cancellation,
879 )
880 .await
881 }
882
883 async fn execute_from_node(
884 &self,
885 workflow: WorkflowDefinition,
886 scope: RuntimeScope,
887 start_node_id: String,
888 starting_step: usize,
889 cancellation: Option<&dyn CancellationSignal>,
890 ) -> Result<WorkflowRunResult, WorkflowRuntimeError> {
891 engine::execute_from_node(
892 self,
893 workflow,
894 scope,
895 start_node_id,
896 starting_step,
897 cancellation,
898 )
899 .await
900 }
901
902 async fn execute_node(
903 &self,
904 node: &Node,
905 node_index: &HashMap<&str, &Node>,
906 step: usize,
907 scope: &mut RuntimeScope,
908 cancellation: Option<&dyn CancellationSignal>,
909 retry_events: &mut Vec<WorkflowRetryEvent>,
910 ) -> Result<NodeExecution, WorkflowRuntimeError> {
911 engine::execute_node(
912 self,
913 node,
914 node_index,
915 step,
916 scope,
917 cancellation,
918 retry_events,
919 )
920 .await
921 }
922
923 fn execute_start_node(&self, step: usize, node: &Node, next: &str) -> NodeExecution {
924 NodeExecution {
925 step,
926 node_id: node.id.clone(),
927 data: NodeExecutionData::Start {
928 next: next.to_string(),
929 },
930 }
931 }
932
933 async fn execute_llm_node(
934 &self,
935 step: usize,
936 node: &Node,
937 spec: LlmNodeSpec<'_>,
938 scope: &mut RuntimeScope,
939 cancellation: Option<&dyn CancellationSignal>,
940 retry_events: &mut Vec<WorkflowRetryEvent>,
941 ) -> Result<NodeExecution, WorkflowRuntimeError> {
942 let next_node = spec
943 .next
944 .clone()
945 .ok_or_else(|| WorkflowRuntimeError::MissingNextEdge {
946 node_id: node.id.clone(),
947 })?;
948
949 let (output, llm_retries) = self
950 .execute_llm_with_policy(step, node, spec.model, spec.prompt, scope, cancellation)
951 .await?;
952 retry_events.extend(llm_retries);
953
954 scope
955 .record_llm_output(&node.id, output.content.clone(), ScopeCapability::LlmWrite)
956 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
957 node_id: node.id.clone(),
958 source,
959 })?;
960
961 Ok(NodeExecution {
962 step,
963 node_id: node.id.clone(),
964 data: NodeExecutionData::Llm {
965 model: spec.model.to_string(),
966 output: output.content,
967 next: next_node,
968 },
969 })
970 }
971
    /// Executes a tool node: resolves the outgoing edge, requires a tool
    /// executor, runs the tool under the configured retry/timeout policy
    /// against a tool-scoped view of the state, and records the output.
    async fn execute_tool_node(
        &self,
        step: usize,
        node: &Node,
        spec: ToolNodeSpec<'_>,
        scope: &mut RuntimeScope,
        cancellation: Option<&dyn CancellationSignal>,
        retry_events: &mut Vec<WorkflowRetryEvent>,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        // Fail fast when the node has no outgoing edge.
        let next_node = spec
            .next
            .clone()
            .ok_or_else(|| WorkflowRuntimeError::MissingNextEdge {
                node_id: node.id.clone(),
            })?;

        // Tool nodes cannot run without an executor being wired in.
        let executor =
            self.tool_executor
                .ok_or_else(|| WorkflowRuntimeError::MissingToolExecutor {
                    node_id: node.id.clone(),
                })?;

        let scoped_input = scope
            .scoped_input(ScopeCapability::ToolRead)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        let (tool_output, tool_retries) = self
            .execute_tool_with_policy_for_scope(ToolPolicyRequest {
                step,
                node,
                tool: spec.tool,
                input: spec.input,
                executor,
                scoped_input,
                cancellation,
            })
            .await?;
        retry_events.extend(tool_retries);

        // Persist the output so downstream nodes can read it.
        scope
            .record_tool_output(&node.id, tool_output.clone(), ScopeCapability::ToolWrite)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        Ok(NodeExecution {
            step,
            node_id: node.id.clone(),
            data: NodeExecutionData::Tool {
                tool: spec.tool.to_string(),
                output: tool_output,
                next: next_node,
            },
        })
    }
1031
1032 fn execute_condition_node(
1033 &self,
1034 step: usize,
1035 node: &Node,
1036 spec: ConditionNodeSpec<'_>,
1037 scope: &mut RuntimeScope,
1038 cancellation: Option<&dyn CancellationSignal>,
1039 ) -> Result<NodeExecution, WorkflowRuntimeError> {
1040 check_cancelled(cancellation)?;
1041 let scoped_input =
1042 scope
1043 .scoped_input(ScopeCapability::ConditionRead)
1044 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1045 node_id: node.id.clone(),
1046 source,
1047 })?;
1048 enforce_expression_scope_budget(
1049 &node.id,
1050 &scoped_input,
1051 self.options.security_limits.max_expression_scope_bytes,
1052 )?;
1053 let evaluated =
1054 expressions::evaluate_bool(spec.expression, &scoped_input).map_err(|reason| {
1055 WorkflowRuntimeError::InvalidCondition {
1056 node_id: node.id.clone(),
1057 expression: spec.expression.to_string(),
1058 reason: reason.to_string(),
1059 }
1060 })?;
1061 let next = if evaluated {
1062 spec.on_true.to_string()
1063 } else {
1064 spec.on_false.to_string()
1065 };
1066
1067 scope
1068 .record_condition_output(&node.id, evaluated, ScopeCapability::ConditionWrite)
1069 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1070 node_id: node.id.clone(),
1071 source,
1072 })?;
1073
1074 Ok(NodeExecution {
1075 step,
1076 node_id: node.id.clone(),
1077 data: NodeExecutionData::Condition {
1078 expression: spec.expression.to_string(),
1079 evaluated,
1080 next,
1081 },
1082 })
1083 }
1084
    /// Fans out to every branch node concurrently (bounded by the DAG
    /// scheduler), records each branch's output plus a combined
    /// `{branch_id: output}` object under this node's id.
    async fn execute_parallel_node(
        &self,
        step: usize,
        node: &Node,
        spec: ParallelNodeSpec<'_>,
        scope: &mut RuntimeScope,
        cancellation: Option<&dyn CancellationSignal>,
        retry_events: &mut Vec<WorkflowRetryEvent>,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        check_cancelled(cancellation)?;
        // Enforce the fan-out ceiling before scheduling anything.
        if spec.branches.len() > self.options.security_limits.max_parallel_branches {
            return Err(WorkflowRuntimeError::ParallelBranchLimitExceeded {
                node_id: node.id.clone(),
                actual_branches: spec.branches.len(),
                max_branches: self.options.security_limits.max_parallel_branches,
            });
        }
        // Every branch evaluates against the same pre-fan-out scope snapshot.
        let base_scope = scope
            .scoped_input(ScopeCapability::MapRead)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;
        let scheduler = DagScheduler::new(
            spec.max_in_flight
                .unwrap_or(self.options.scheduler_max_in_flight),
        );
        let parallel_node_id = node.id.clone();

        let branch_outputs: Vec<(String, Value, Vec<WorkflowRetryEvent>)> = scheduler
            .run_bounded(spec.branches.iter().cloned(), |branch_id| {
                // Clones move into the async block; one per branch task.
                let parallel_node_id = parallel_node_id.clone();
                let base_scope = base_scope.clone();
                async move {
                    let branch_node = spec.node_index.get(branch_id.as_str()).ok_or_else(|| {
                        WorkflowRuntimeError::NodeNotFound {
                            node_id: branch_id.clone(),
                        }
                    })?;
                    self.execute_parallel_branch(
                        step,
                        &parallel_node_id,
                        branch_node,
                        base_scope,
                        cancellation,
                    )
                    .await
                }
            })
            .await?;

        // Record per-branch outputs back into the shared scope — serially,
        // after the concurrent phase has completed.
        let mut outputs: BTreeMap<String, Value> = BTreeMap::new();
        for (branch_id, output, branch_retry_events) in branch_outputs {
            retry_events.extend(branch_retry_events);
            scope
                .record_node_output(&branch_id, output.clone(), ScopeCapability::MapWrite)
                .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                    node_id: node.id.clone(),
                    source,
                })?;
            outputs.insert(branch_id, output);
        }

        // Also record the aggregate object under the parallel node's own id.
        scope
            .record_node_output(
                &node.id,
                Value::Object(
                    outputs
                        .iter()
                        .map(|(key, value)| (key.clone(), value.clone()))
                        .collect(),
                ),
                ScopeCapability::MapWrite,
            )
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        Ok(NodeExecution {
            step,
            node_id: node.id.clone(),
            data: NodeExecutionData::Parallel {
                branches: spec.branches.to_vec(),
                outputs,
                next: spec.next.to_string(),
            },
        })
    }
1174
    /// Merges the recorded outputs of `spec.sources` according to the merge
    /// policy (`First`, `All`, or `Quorum`) and records the result under
    /// this node's id.
    fn execute_merge_node(
        &self,
        step: usize,
        node: &Node,
        spec: MergeNodeSpec<'_>,
        scope: &mut RuntimeScope,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        // Every declared source must already have an output in scope.
        let mut resolved = Vec::with_capacity(spec.sources.len());
        for source in spec.sources {
            let Some(value) = scope.node_output(source).cloned() else {
                return Err(WorkflowRuntimeError::MissingMergeSource {
                    node_id: node.id.clone(),
                    source_id: source.clone(),
                });
            };
            resolved.push((source.clone(), value));
        }

        let output = match spec.policy {
            // First: the first source's value (Null when sources is empty).
            MergePolicy::First => resolved
                .first()
                .map(|(_, value)| value.clone())
                .unwrap_or(Value::Null),
            // All: every source value, in declaration order.
            MergePolicy::All => Value::Array(
                resolved
                    .iter()
                    .map(|(_, value)| value.clone())
                    .collect::<Vec<_>>(),
            ),
            // Quorum: require at least `quorum` resolved sources, keep only
            // the first `quorum` values.
            // NOTE(review): a missing quorum defaults to 0, which always
            // passes and yields an empty array — confirm validation
            // guarantees `quorum` is set for this policy.
            MergePolicy::Quorum => {
                let required = spec.quorum.unwrap_or_default();
                let resolved_count = resolved.len();
                if resolved_count < required {
                    return Err(WorkflowRuntimeError::MergeQuorumNotMet {
                        node_id: node.id.clone(),
                        required,
                        resolved: resolved_count,
                    });
                }
                Value::Array(
                    resolved
                        .iter()
                        .take(required)
                        .map(|(_, value)| value.clone())
                        .collect::<Vec<_>>(),
                )
            }
        };

        scope
            .record_node_output(&node.id, output.clone(), ScopeCapability::ReduceWrite)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        Ok(NodeExecution {
            step,
            node_id: node.id.clone(),
            data: NodeExecutionData::Merge {
                policy: spec.policy.clone(),
                sources: spec.sources.to_vec(),
                output,
                next: spec.next.to_string(),
            },
        })
    }
1242
    /// Maps each element of the array at `items_path` through the node's
    /// tool concurrently (bounded by the DAG scheduler), then records the
    /// collected outputs as an array under this node's id.
    async fn execute_map_node(
        &self,
        step: usize,
        node: &Node,
        spec: MapNodeSpec<'_>,
        scope: &mut RuntimeScope,
        cancellation: Option<&dyn CancellationSignal>,
        retry_events: &mut Vec<WorkflowRetryEvent>,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        // The tool runs once per item, so an executor is mandatory.
        let executor =
            self.tool_executor
                .ok_or_else(|| WorkflowRuntimeError::MissingToolExecutor {
                    node_id: node.id.clone(),
                })?;

        let scoped_input = scope
            .scoped_input(ScopeCapability::MapRead)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;
        let items = resolve_path(&scoped_input, spec.items_path)
            .and_then(Value::as_array)
            .ok_or_else(|| WorkflowRuntimeError::MapItemsNotArray {
                node_id: node.id.clone(),
                items_path: spec.items_path.to_string(),
            })?
            .clone();
        // Enforce the per-node item ceiling before scheduling any work.
        if items.len() > self.options.security_limits.max_map_items {
            return Err(WorkflowRuntimeError::MapItemLimitExceeded {
                node_id: node.id.clone(),
                actual_items: items.len(),
                max_items: self.options.security_limits.max_map_items,
            });
        }

        let scheduler = DagScheduler::new(
            spec.max_in_flight
                .unwrap_or(self.options.scheduler_max_in_flight),
        );
        let map_node = node.clone();
        // Each item gets its own scope view built by `map_item_scoped_input`.
        // NOTE(review): results are assumed to come back in input order —
        // confirm `run_bounded` preserves ordering.
        let mapped: Vec<(Value, Vec<WorkflowRetryEvent>)> = scheduler
            .run_bounded(items.into_iter().enumerate(), |(index, item)| {
                let scoped_input = scoped_input.clone();
                let map_node = map_node.clone();
                async move {
                    let item_scope = map_item_scoped_input(&scoped_input, &item, index);
                    let (output, retries) = self
                        .execute_tool_with_policy_for_scope(ToolPolicyRequest {
                            step,
                            node: &map_node,
                            tool: spec.tool,
                            input: &item,
                            executor,
                            scoped_input: item_scope,
                            cancellation,
                        })
                        .await?;
                    Ok::<(Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError>((output, retries))
                }
            })
            .await?;

        let mut outputs = Vec::with_capacity(mapped.len());
        for (output, local_retries) in mapped {
            outputs.push(output);
            retry_events.extend(local_retries);
        }

        let output = Value::Array(outputs);
        scope
            .record_node_output(&node.id, output.clone(), ScopeCapability::MapWrite)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        Ok(NodeExecution {
            step,
            node_id: node.id.clone(),
            data: NodeExecutionData::Map {
                item_count: output.as_array().map_or(0, Vec::len),
                output,
                next: spec.next.to_string(),
            },
        })
    }
1330
1331 fn execute_reduce_node(
1332 &self,
1333 step: usize,
1334 node: &Node,
1335 source: &str,
1336 operation: &ReduceOperation,
1337 next: &str,
1338 scope: &mut RuntimeScope,
1339 ) -> Result<NodeExecution, WorkflowRuntimeError> {
1340 let source_value = scope.node_output(source).cloned().ok_or_else(|| {
1341 WorkflowRuntimeError::MissingMergeSource {
1342 node_id: node.id.clone(),
1343 source_id: source.to_string(),
1344 }
1345 })?;
1346
1347 let reduced = reduce_value(&node.id, source, operation, source_value)?;
1348 scope
1349 .record_node_output(&node.id, reduced.clone(), ScopeCapability::ReduceWrite)
1350 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1351 node_id: node.id.clone(),
1352 source,
1353 })?;
1354
1355 Ok(NodeExecution {
1356 step,
1357 node_id: node.id.clone(),
1358 data: NodeExecutionData::Reduce {
1359 operation: operation.clone(),
1360 output: reduced,
1361 next: next.to_string(),
1362 },
1363 })
1364 }
1365
1366 fn execute_batch_node(
1367 &self,
1368 step: usize,
1369 node: &Node,
1370 items_path: &str,
1371 next: &str,
1372 scope: &mut RuntimeScope,
1373 ) -> Result<NodeExecution, WorkflowRuntimeError> {
1374 let scoped = scope
1375 .scoped_input(ScopeCapability::MapRead)
1376 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1377 node_id: node.id.clone(),
1378 source,
1379 })?;
1380 let items = resolve_path(&scoped, items_path).ok_or_else(|| {
1381 WorkflowRuntimeError::BatchItemsNotArray {
1382 node_id: node.id.clone(),
1383 items_path: items_path.to_string(),
1384 }
1385 })?;
1386 let array = items
1387 .as_array()
1388 .ok_or_else(|| WorkflowRuntimeError::BatchItemsNotArray {
1389 node_id: node.id.clone(),
1390 items_path: items_path.to_string(),
1391 })?;
1392
1393 let output = Value::Array(array.clone());
1394 scope
1395 .record_node_output(&node.id, output.clone(), ScopeCapability::MapWrite)
1396 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1397 node_id: node.id.clone(),
1398 source,
1399 })?;
1400
1401 Ok(NodeExecution {
1402 step,
1403 node_id: node.id.clone(),
1404 data: NodeExecutionData::Batch {
1405 items_path: items_path.to_string(),
1406 item_count: array.len(),
1407 next: next.to_string(),
1408 },
1409 })
1410 }
1411
    /// Keeps the elements of the array at `items_path` for which
    /// `expression` evaluates to true (with `item`/`item_index` injected
    /// into the evaluation scope) and records the kept items as the output.
    fn execute_filter_node(
        &self,
        step: usize,
        node: &Node,
        items_path: &str,
        expression: &str,
        next: &str,
        scope: &mut RuntimeScope,
    ) -> Result<NodeExecution, WorkflowRuntimeError> {
        let scoped = scope
            .scoped_input(ScopeCapability::MapRead)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;
        // Missing path and non-array value both surface as FilterItemsNotArray.
        let items_value = resolve_path(&scoped, items_path).ok_or_else(|| {
            WorkflowRuntimeError::FilterItemsNotArray {
                node_id: node.id.clone(),
                items_path: items_path.to_string(),
            }
        })?;
        let array =
            items_value
                .as_array()
                .ok_or_else(|| WorkflowRuntimeError::FilterItemsNotArray {
                    node_id: node.id.clone(),
                    items_path: items_path.to_string(),
                })?;
        if array.len() > self.options.security_limits.max_filter_items {
            return Err(WorkflowRuntimeError::FilterItemLimitExceeded {
                node_id: node.id.clone(),
                actual_items: array.len(),
                max_items: self.options.security_limits.max_filter_items,
            });
        }

        let mut kept = Vec::new();
        for (index, item) in array.iter().enumerate() {
            // Evaluate against a copy of the scope with the current item and
            // its index injected.
            // NOTE(review): when the scope is not a JSON object the item is
            // silently not injected — confirm that is intentional.
            let mut eval_scope = scoped.clone();
            if let Some(object) = eval_scope.as_object_mut() {
                object.insert("item".to_string(), item.clone());
                object.insert("item_index".to_string(), Value::from(index as u64));
            }
            // Budget check runs per item because the injected item grows the
            // evaluation scope.
            enforce_expression_scope_budget(
                &node.id,
                &eval_scope,
                self.options.security_limits.max_expression_scope_bytes,
            )?;
            let include =
                expressions::evaluate_bool(expression, &eval_scope).map_err(|reason| {
                    WorkflowRuntimeError::InvalidFilterExpression {
                        node_id: node.id.clone(),
                        expression: expression.to_string(),
                        reason: reason.to_string(),
                    }
                })?;
            if include {
                kept.push(item.clone());
            }
        }
        let output = Value::Array(kept.clone());
        scope
            .record_node_output(&node.id, output, ScopeCapability::MapWrite)
            .map_err(|source| WorkflowRuntimeError::ScopeAccess {
                node_id: node.id.clone(),
                source,
            })?;

        Ok(NodeExecution {
            step,
            node_id: node.id.clone(),
            data: NodeExecutionData::Filter {
                items_path: items_path.to_string(),
                expression: expression.to_string(),
                kept: kept.len(),
                next: next.to_string(),
            },
        })
    }
1491
1492 async fn execute_llm_with_policy(
1493 &self,
1494 step: usize,
1495 node: &Node,
1496 model: &str,
1497 prompt: &str,
1498 scope: &RuntimeScope,
1499 cancellation: Option<&dyn CancellationSignal>,
1500 ) -> Result<(LlmExecutionOutput, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
1501 let scoped_input = scope
1502 .scoped_input(ScopeCapability::LlmRead)
1503 .map_err(|source| WorkflowRuntimeError::ScopeAccess {
1504 node_id: node.id.clone(),
1505 source,
1506 })?;
1507
1508 self.execute_llm_with_policy_for_scope(
1509 step,
1510 node,
1511 model,
1512 prompt,
1513 scoped_input,
1514 cancellation,
1515 )
1516 .await
1517 }
1518
    /// Runs a single branch of a `Parallel` node and returns
    /// `(branch_id, output, retry_events)`.
    ///
    /// Only `Llm` and `Tool` branch kinds are supported; the branch's own
    /// `next` edge is ignored here because the parallel node controls
    /// routing. Any other kind yields `ParallelBranchUnsupported`.
    async fn execute_parallel_branch(
        &self,
        step: usize,
        parallel_node_id: &str,
        branch_node: &Node,
        scoped_input: Value,
        cancellation: Option<&dyn CancellationSignal>,
    ) -> Result<(String, Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
        match &branch_node.kind {
            NodeKind::Llm {
                model,
                prompt,
                next: _,
            } => {
                let (output, retries) = self
                    .execute_llm_with_policy_for_scope(
                        step,
                        branch_node,
                        model,
                        prompt,
                        scoped_input,
                        cancellation,
                    )
                    .await?;
                // LLM branch output is surfaced as a plain JSON string.
                Ok((
                    branch_node.id.clone(),
                    Value::String(output.content),
                    retries,
                ))
            }
            NodeKind::Tool { tool, input, .. } => {
                // A tool branch cannot run without a registered executor.
                let executor = self.tool_executor.ok_or_else(|| {
                    WorkflowRuntimeError::MissingToolExecutor {
                        node_id: branch_node.id.clone(),
                    }
                })?;
                let (output, retries) = self
                    .execute_tool_with_policy_for_scope(ToolPolicyRequest {
                        step,
                        node: branch_node,
                        tool,
                        input,
                        executor,
                        scoped_input,
                        cancellation,
                    })
                    .await?;
                Ok((branch_node.id.clone(), output, retries))
            }
            _ => Err(WorkflowRuntimeError::ParallelBranchUnsupported {
                node_id: parallel_node_id.to_string(),
                branch_id: branch_node.id.clone(),
                reason: "only llm/tool branches are supported".to_string(),
            }),
        }
    }
1575
    /// Invokes the LLM executor with retry and optional per-attempt timeout,
    /// as configured by `options.llm_node_policy`.
    ///
    /// Makes `max_retries + 1` attempts. Each non-final failure (timeout or
    /// executor error) is recorded as a `WorkflowRetryEvent`; the final
    /// failure becomes `LlmTimeout` or `LlmRetryExhausted`. Cancellation is
    /// checked before every attempt and again after each recorded failure.
    async fn execute_llm_with_policy_for_scope(
        &self,
        step: usize,
        node: &Node,
        model: &str,
        prompt: &str,
        scoped_input: Value,
        cancellation: Option<&dyn CancellationSignal>,
    ) -> Result<(LlmExecutionOutput, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
        let max_attempts = self.options.llm_node_policy.max_retries.saturating_add(1);
        let mut retry_events = Vec::new();

        for attempt in 1..=max_attempts {
            check_cancelled(cancellation)?;

            // The scoped input is cloned per attempt so retries see the
            // same snapshot.
            let execution = self.llm_executor.execute(LlmExecutionInput {
                node_id: node.id.clone(),
                model: model.to_string(),
                prompt: prompt.to_string(),
                scoped_input: scoped_input.clone(),
            });

            let outcome = if let Some(timeout_duration) = self.options.llm_node_policy.timeout {
                match timeout(timeout_duration, execution).await {
                    Ok(result) => result,
                    Err(_) => {
                        // Timed out: fail hard on the last attempt,
                        // otherwise record a retry event and loop.
                        if attempt == max_attempts {
                            return Err(WorkflowRuntimeError::LlmTimeout {
                                node_id: node.id.clone(),
                                timeout_ms: timeout_duration.as_millis(),
                                attempts: attempt,
                            });
                        }
                        retry_events.push(WorkflowRetryEvent {
                            step,
                            node_id: node.id.clone(),
                            operation: "llm".to_string(),
                            failed_attempt: attempt,
                            reason: format!(
                                "attempt {} timed out after {} ms",
                                attempt,
                                timeout_duration.as_millis()
                            ),
                        });
                        check_cancelled(cancellation)?;
                        continue;
                    }
                }
            } else {
                execution.await
            };

            match outcome {
                Ok(output) => return Ok((output, retry_events)),
                Err(last_error) => {
                    if attempt == max_attempts {
                        return Err(WorkflowRuntimeError::LlmRetryExhausted {
                            node_id: node.id.clone(),
                            attempts: attempt,
                            last_error,
                        });
                    }
                    retry_events.push(WorkflowRetryEvent {
                        step,
                        node_id: node.id.clone(),
                        operation: "llm".to_string(),
                        failed_attempt: attempt,
                        reason: last_error.to_string(),
                    });
                    check_cancelled(cancellation)?;
                }
            }
        }

        // Every iteration either returns or continues; the final iteration
        // always returns, so the loop cannot fall through.
        unreachable!("llm attempts loop always returns")
    }
1652
    /// Invokes a tool executor with retry and optional per-attempt timeout,
    /// as configured by `options.tool_node_policy`.
    ///
    /// Mirrors `execute_llm_with_policy_for_scope`: `max_retries + 1`
    /// attempts, non-final failures recorded as retry events, final failure
    /// reported as `ToolTimeout` or `ToolRetryExhausted`. Cancellation is
    /// checked before every attempt and after each recorded failure.
    async fn execute_tool_with_policy_for_scope(
        &self,
        request: ToolPolicyRequest<'_>,
    ) -> Result<(Value, Vec<WorkflowRetryEvent>), WorkflowRuntimeError> {
        let ToolPolicyRequest {
            step,
            node,
            tool,
            input,
            executor,
            scoped_input,
            cancellation,
        } = request;
        let max_attempts = self.options.tool_node_policy.max_retries.saturating_add(1);
        let mut retry_events = Vec::new();

        for attempt in 1..=max_attempts {
            check_cancelled(cancellation)?;

            // Inputs are cloned per attempt so retries see identical data.
            let execution = executor.execute_tool(ToolExecutionInput {
                node_id: node.id.clone(),
                tool: tool.to_string(),
                input: input.clone(),
                scoped_input: scoped_input.clone(),
            });

            let outcome = if let Some(timeout_duration) = self.options.tool_node_policy.timeout {
                match timeout(timeout_duration, execution).await {
                    Ok(result) => result,
                    Err(_) => {
                        // Timed out: fail hard on the last attempt,
                        // otherwise record a retry event and loop.
                        if attempt == max_attempts {
                            return Err(WorkflowRuntimeError::ToolTimeout {
                                node_id: node.id.clone(),
                                timeout_ms: timeout_duration.as_millis(),
                                attempts: attempt,
                            });
                        }
                        retry_events.push(WorkflowRetryEvent {
                            step,
                            node_id: node.id.clone(),
                            operation: "tool".to_string(),
                            failed_attempt: attempt,
                            reason: format!(
                                "attempt {} timed out after {} ms",
                                attempt,
                                timeout_duration.as_millis()
                            ),
                        });
                        check_cancelled(cancellation)?;
                        continue;
                    }
                }
            } else {
                execution.await
            };

            match outcome {
                Ok(output) => return Ok((output, retry_events)),
                Err(last_error) => {
                    if attempt == max_attempts {
                        return Err(WorkflowRuntimeError::ToolRetryExhausted {
                            node_id: node.id.clone(),
                            attempts: attempt,
                            last_error,
                        });
                    }
                    retry_events.push(WorkflowRetryEvent {
                        step,
                        node_id: node.id.clone(),
                        operation: "tool".to_string(),
                        failed_attempt: attempt,
                        reason: last_error.to_string(),
                    });
                    check_cancelled(cancellation)?;
                }
            }
        }

        // Every iteration either returns or continues; the final iteration
        // always returns, so the loop cannot fall through.
        unreachable!("tool attempts loop always returns")
    }
1733}
1734
/// Bundled arguments for one policy-governed tool invocation, passed to
/// `execute_tool_with_policy_for_scope` to keep its signature small.
struct ToolPolicyRequest<'a> {
    // Step index stamped onto any retry events emitted for this call.
    step: usize,
    node: &'a Node,
    tool: &'a str,
    input: &'a Value,
    executor: &'a dyn ToolExecutor,
    // Snapshot of the scope visible to the tool; cloned for each attempt.
    scoped_input: Value,
    cancellation: Option<&'a dyn CancellationSignal>,
}
1744
/// Borrowed view of an `Llm` node's fields: model, prompt, and the
/// optional successor node id.
struct LlmNodeSpec<'a> {
    model: &'a str,
    prompt: &'a str,
    next: &'a Option<String>,
}
1750
/// Borrowed view of a `Tool` node's fields: tool name, static input
/// payload, and the optional successor node id.
struct ToolNodeSpec<'a> {
    tool: &'a str,
    input: &'a Value,
    next: &'a Option<String>,
}
1756
/// Borrowed view of a `Condition` node: the boolean expression plus the
/// node ids taken on the true and false outcomes.
struct ConditionNodeSpec<'a> {
    expression: &'a str,
    on_true: &'a str,
    on_false: &'a str,
}
1762
/// Borrowed view of a `Parallel` node: branch node ids, the join target,
/// an optional concurrency cap, and the id→node index used to resolve
/// branch ids.
struct ParallelNodeSpec<'a> {
    node_index: &'a HashMap<&'a str, &'a Node>,
    branches: &'a [String],
    next: &'a str,
    max_in_flight: Option<usize>,
}
1769
/// Borrowed view of a `Merge` node: source node ids whose outputs are
/// combined, the merge policy, an optional quorum count, and the successor.
struct MergeNodeSpec<'a> {
    sources: &'a [String],
    policy: &'a MergePolicy,
    quorum: Option<usize>,
    next: &'a str,
}
1776
/// Borrowed view of a `Map` node: the tool applied per item, the dot-path
/// of the items array, the successor, and an optional concurrency cap.
struct MapNodeSpec<'a> {
    tool: &'a str,
    items_path: &'a str,
    next: &'a str,
    max_in_flight: Option<usize>,
}
1783
1784fn check_cancelled(
1785 cancellation: Option<&dyn CancellationSignal>,
1786) -> Result<(), WorkflowRuntimeError> {
1787 if cancellation.is_some_and(CancellationSignal::is_cancelled) {
1788 Err(WorkflowRuntimeError::Cancelled)
1789 } else {
1790 Ok(())
1791 }
1792}
1793
/// Issues the current value of the logical trace clock and advances it.
///
/// The clock saturates at `u64::MAX` instead of wrapping, so issued
/// timestamps never move backwards.
fn next_trace_timestamp(clock: &mut u64) -> u64 {
    let issued = *clock;
    *clock = issued.saturating_add(1);
    issued
}
1799
1800fn build_prompt_with_scope(prompt: &str, scoped_input: &Value) -> String {
1801 format!("{}\n\nScoped context:\n{}", prompt, scoped_input)
1802}
1803
1804fn build_node_index(workflow: &WorkflowDefinition) -> HashMap<&str, &Node> {
1805 let mut index = HashMap::with_capacity(workflow.nodes.len());
1806 for node in &workflow.nodes {
1807 index.insert(node.id.as_str(), node);
1808 }
1809 index
1810}
1811
1812fn find_start_node_id(workflow: &WorkflowDefinition) -> Result<String, WorkflowRuntimeError> {
1813 workflow
1814 .nodes
1815 .iter()
1816 .find_map(|node| match node.kind {
1817 NodeKind::Start { .. } => Some(node.id.clone()),
1818 _ => None,
1819 })
1820 .ok_or(WorkflowRuntimeError::MissingStartNode)
1821}
1822
/// Extracts the successor node id from an execution record, or `None` when
/// the workflow reached an `End` node.
///
/// Every variant except `End` carries a `next` field; keeping the match
/// exhaustive (no `_` arm) forces a compile error here when a new variant
/// is added.
fn next_node_id(data: &NodeExecutionData) -> Option<String> {
    match data {
        NodeExecutionData::Start { next }
        | NodeExecutionData::Llm { next, .. }
        | NodeExecutionData::Tool { next, .. }
        | NodeExecutionData::Condition { next, .. }
        | NodeExecutionData::Debounce { next, .. }
        | NodeExecutionData::Throttle { next, .. }
        | NodeExecutionData::RetryCompensate { next, .. }
        | NodeExecutionData::HumanInTheLoop { next, .. }
        | NodeExecutionData::CacheRead { next, .. }
        | NodeExecutionData::CacheWrite { next, .. }
        | NodeExecutionData::EventTrigger { next, .. }
        | NodeExecutionData::Router { next, .. }
        | NodeExecutionData::Transform { next, .. }
        | NodeExecutionData::Loop { next, .. }
        | NodeExecutionData::Subgraph { next, .. }
        | NodeExecutionData::Batch { next, .. }
        | NodeExecutionData::Filter { next, .. }
        | NodeExecutionData::Parallel { next, .. }
        | NodeExecutionData::Merge { next, .. }
        | NodeExecutionData::Map { next, .. }
        | NodeExecutionData::Reduce { next, .. } => Some(next.clone()),
        NodeExecutionData::End => None,
    }
}
1849
1850fn resolve_path<'a>(scope: &'a Value, path: &str) -> Option<&'a Value> {
1851 let mut current = scope;
1852 for segment in path.split('.') {
1853 if segment.is_empty() {
1854 continue;
1855 }
1856 current = current.get(segment)?;
1857 }
1858 Some(current)
1859}
1860
1861fn resolve_string_path(scope: &Value, path: &str) -> Option<String> {
1862 resolve_path(scope, path)
1863 .and_then(Value::as_str)
1864 .map(str::to_string)
1865}
1866
1867fn evaluate_human_decision(value: &Value) -> Option<bool> {
1868 match value {
1869 Value::Bool(flag) => Some(*flag),
1870 Value::String(text) => {
1871 let normalized = text.trim().to_ascii_lowercase();
1872 match normalized.as_str() {
1873 "approve" | "approved" | "yes" | "true" => Some(true),
1874 "reject" | "rejected" | "no" | "false" => Some(false),
1875 _ => None,
1876 }
1877 }
1878 _ => None,
1879 }
1880}
1881
1882fn evaluate_transform_expression(expression: &str, scope: &Value) -> Result<Value, String> {
1883 let trimmed = expression.trim();
1884 if trimmed.is_empty() {
1885 return Err("expression is empty".to_string());
1886 }
1887
1888 if let Ok(value) = serde_json::from_str::<Value>(trimmed) {
1889 return Ok(value);
1890 }
1891
1892 let path = trimmed.strip_prefix("$.").unwrap_or(trimmed);
1893 resolve_path(scope, path)
1894 .cloned()
1895 .ok_or_else(|| format!("path '{path}' not found in scoped input"))
1896}
1897
1898fn map_item_scoped_input(base_scope: &Value, item: &Value, index: usize) -> Value {
1899 let mut object = match base_scope {
1900 Value::Object(map) => map.clone(),
1901 _ => Map::new(),
1902 };
1903 object.insert("map_item".to_string(), item.clone());
1904 object.insert("map_index".to_string(), Value::from(index as u64));
1905 Value::Object(object)
1906}
1907
1908fn enforce_expression_scope_budget(
1909 node_id: &str,
1910 scoped_input: &Value,
1911 max_expression_scope_bytes: usize,
1912) -> Result<(), WorkflowRuntimeError> {
1913 let size = serde_json::to_vec(scoped_input)
1914 .map(|bytes| bytes.len())
1915 .unwrap_or(max_expression_scope_bytes.saturating_add(1));
1916 if size > max_expression_scope_bytes {
1917 return Err(WorkflowRuntimeError::ExpressionScopeLimitExceeded {
1918 node_id: node_id.to_string(),
1919 actual_bytes: size,
1920 limit_bytes: max_expression_scope_bytes,
1921 });
1922 }
1923 Ok(())
1924}
1925
1926fn reduce_value(
1927 node_id: &str,
1928 source: &str,
1929 operation: &ReduceOperation,
1930 source_value: Value,
1931) -> Result<Value, WorkflowRuntimeError> {
1932 let items =
1933 source_value
1934 .as_array()
1935 .ok_or_else(|| WorkflowRuntimeError::InvalidReduceInput {
1936 node_id: node_id.to_string(),
1937 source_node: source.to_string(),
1938 reason: "expected source output to be an array".to_string(),
1939 })?;
1940
1941 match operation {
1942 ReduceOperation::Count => Ok(Value::from(items.len() as u64)),
1943 ReduceOperation::Sum => {
1944 let mut sum = 0.0f64;
1945 for value in items {
1946 let number =
1947 value
1948 .as_f64()
1949 .ok_or_else(|| WorkflowRuntimeError::InvalidReduceInput {
1950 node_id: node_id.to_string(),
1951 source_node: source.to_string(),
1952 reason: "sum operation requires numeric array values".to_string(),
1953 })?;
1954 sum += number;
1955 }
1956 let number = serde_json::Number::from_f64(sum).ok_or_else(|| {
1957 WorkflowRuntimeError::InvalidReduceInput {
1958 node_id: node_id.to_string(),
1959 source_node: source.to_string(),
1960 reason: "sum produced non-finite value".to_string(),
1961 }
1962 })?;
1963 Ok(Value::Number(number))
1964 }
1965 }
1966}
1967
1968#[cfg(test)]
1969mod tests {
1970 use std::sync::atomic::{AtomicUsize, Ordering};
1971 use std::sync::{Arc, Mutex};
1972 use std::time::Duration;
1973
1974 use async_trait::async_trait;
1975 use serde_json::json;
1976 use tokio::time::sleep;
1977
1978 use super::*;
1979 use crate::ir::{MergePolicy, Node, NodeKind, ReduceOperation, WorkflowDefinition};
1980
    /// Test double that returns the same canned completion for every call.
    struct MockLlmExecutor {
        output: String,
    }

    #[async_trait]
    impl LlmExecutor for MockLlmExecutor {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            Ok(LlmExecutionOutput {
                content: self.output.clone(),
            })
        }
    }
1996
    /// Test double that returns a fixed output, or always fails with the
    /// tool name in the message when `fail` is set.
    struct MockToolExecutor {
        output: Value,
        fail: bool,
    }

    #[async_trait]
    impl ToolExecutor for MockToolExecutor {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            if self.fail {
                return Err(ToolExecutionError::Failed(format!(
                    "tool '{}' failed intentionally",
                    input.tool
                )));
            }
            Ok(self.output.clone())
        }
    }
2017
    /// Test double that replays a scripted sequence of responses, one per
    /// call, while counting invocations.
    struct SequencedLlmExecutor {
        responses: Mutex<Vec<Result<LlmExecutionOutput, LlmExecutionError>>>,
        calls: AtomicUsize,
    }

    #[async_trait]
    impl LlmExecutor for SequencedLlmExecutor {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            self.calls.fetch_add(1, Ordering::Relaxed);
            // `remove(0)` panics once the script is exhausted — in a test,
            // that is the desired signal for an unexpected extra call.
            self.responses
                .lock()
                .expect("sequenced llm lock poisoned")
                .remove(0)
        }
    }
2036
    /// Test double that sleeps for `delay` before succeeding; used to
    /// exercise the per-attempt timeout policy.
    struct SlowToolExecutor {
        delay: Duration,
    }

    #[async_trait]
    impl ToolExecutor for SlowToolExecutor {
        async fn execute_tool(
            &self,
            _input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            sleep(self.delay).await;
            Ok(json!({"status": "slow-ok"}))
        }
    }
2051
    /// Test double that returns 1, 2, 3, ... on successive calls; used to
    /// drive loop-termination conditions.
    struct IncrementingToolExecutor {
        value: AtomicUsize,
    }

    #[async_trait]
    impl ToolExecutor for IncrementingToolExecutor {
        async fn execute_tool(
            &self,
            _input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            // fetch_add returns the previous value, so add 1 to report the
            // post-increment count.
            let next = self.value.fetch_add(1, Ordering::Relaxed) + 1;
            Ok(json!(next))
        }
    }
2066
    /// Test double that echoes back the node's configured `input` payload.
    struct EchoInputToolExecutor;

    #[async_trait]
    impl ToolExecutor for EchoInputToolExecutor {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            Ok(input.input)
        }
    }
2078
    /// Test double for retry/compensate scenarios, dispatching on tool name:
    /// `unstable_primary` fails on the first call then succeeds,
    /// `always_fail` never succeeds, and `compensate` always succeeds.
    struct RetryCompensateToolExecutor {
        attempts: AtomicUsize,
    }

    #[async_trait]
    impl ToolExecutor for RetryCompensateToolExecutor {
        async fn execute_tool(
            &self,
            input: ToolExecutionInput,
        ) -> Result<Value, ToolExecutionError> {
            match input.tool.as_str() {
                "unstable_primary" => {
                    // First attempt fails; later attempts report the count.
                    let current = self.attempts.fetch_add(1, Ordering::Relaxed) + 1;
                    if current <= 1 {
                        Err(ToolExecutionError::Failed("primary failed".to_string()))
                    } else {
                        Ok(json!({"primary_attempt": current}))
                    }
                }
                "always_fail" => Err(ToolExecutionError::Failed("always fail".to_string())),
                "compensate" => Ok(json!({"compensated": true})),
                _ => Err(ToolExecutionError::NotFound {
                    tool: input.tool.clone(),
                }),
            }
        }
    }
2106
    /// Test double that raises the shared cancel flag and then fails with a
    /// transient error, so the runtime's post-failure cancellation check
    /// can be exercised.
    struct CancellingLlmExecutor {
        cancel_flag: Arc<AtomicBool>,
        calls: AtomicUsize,
    }

    #[async_trait]
    impl LlmExecutor for CancellingLlmExecutor {
        async fn execute(
            &self,
            _input: LlmExecutionInput,
        ) -> Result<LlmExecutionOutput, LlmExecutionError> {
            self.calls.fetch_add(1, Ordering::Relaxed);
            self.cancel_flag.store(true, Ordering::Relaxed);
            Err(LlmExecutionError::Client("transient failure".to_string()))
        }
    }
2123
    /// Fixture: start → llm → tool → end, the minimal linear pipeline.
    fn linear_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "linear".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "llm".to_string(),
                    },
                },
                Node {
                    id: "llm".to_string(),
                    kind: NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "Summarize".to_string(),
                        next: Some("tool".to_string()),
                    },
                },
                Node {
                    id: "tool".to_string(),
                    kind: NodeKind::Tool {
                        tool: "extract".to_string(),
                        input: json!({"k": "v"}),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2158
    /// Fixture: start → llm → end, for tests that only need LLM behavior.
    fn llm_only_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "llm-only".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "llm".to_string(),
                    },
                },
                Node {
                    id: "llm".to_string(),
                    kind: NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "Summarize".to_string(),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2185
    /// Fixture: start → loop(body: counter tool) → end. The loop repeats
    /// until `last_tool_output != 3` is false (i.e. the counter hits 3) or
    /// the optional `max_iterations` cap is reached.
    fn loop_workflow(max_iterations: Option<u32>) -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "loop-workflow".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "loop".to_string(),
                    },
                },
                Node {
                    id: "loop".to_string(),
                    kind: NodeKind::Loop {
                        condition: "last_tool_output != 3".to_string(),
                        body: "counter".to_string(),
                        next: "end".to_string(),
                        max_iterations,
                    },
                },
                Node {
                    id: "counter".to_string(),
                    kind: NodeKind::Tool {
                        tool: "counter".to_string(),
                        input: json!({}),
                        next: Some("loop".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2221
    /// Fixture: start → parallel(tool_a, tool_b) → merge → end, with the
    /// merge policy and quorum supplied by the caller.
    fn parallel_merge_workflow(policy: MergePolicy, quorum: Option<usize>) -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "parallel-merge".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "parallel".to_string(),
                    },
                },
                Node {
                    id: "parallel".to_string(),
                    kind: NodeKind::Parallel {
                        branches: vec!["tool_a".to_string(), "tool_b".to_string()],
                        next: "merge".to_string(),
                        max_in_flight: Some(2),
                    },
                },
                Node {
                    id: "tool_a".to_string(),
                    kind: NodeKind::Tool {
                        tool: "extract".to_string(),
                        input: json!({"value": 1}),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "tool_b".to_string(),
                    kind: NodeKind::Tool {
                        tool: "extract".to_string(),
                        input: json!({"value": 2}),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "merge".to_string(),
                    kind: NodeKind::Merge {
                        sources: vec!["tool_a".to_string(), "tool_b".to_string()],
                        policy,
                        quorum,
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2273
    /// Fixture: start → map(counter over `input.values`) → reduce → end,
    /// with the reduce operation supplied by the caller.
    fn map_reduce_workflow(operation: ReduceOperation) -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "map-reduce".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "map".to_string(),
                    },
                },
                Node {
                    id: "map".to_string(),
                    kind: NodeKind::Map {
                        tool: "counter".to_string(),
                        items_path: "input.values".to_string(),
                        next: "reduce".to_string(),
                        max_in_flight: Some(3),
                    },
                },
                Node {
                    id: "reduce".to_string(),
                    kind: NodeKind::Reduce {
                        source: "map".to_string(),
                        operation,
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2309
    /// Fixture chaining a self-looping debounce node into a self-looping
    /// throttle node; their suppression edges (`on_suppressed` /
    /// `on_throttled`) are the only way to reach the terminal node.
    fn debounce_and_throttle_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "debounce-throttle".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "debounce_a".to_string(),
                    },
                },
                Node {
                    id: "debounce_a".to_string(),
                    kind: NodeKind::Debounce {
                        key_path: "input.key".to_string(),
                        window_steps: 3,
                        next: "debounce_a".to_string(),
                        on_suppressed: Some("throttle_a".to_string()),
                    },
                },
                Node {
                    id: "throttle_a".to_string(),
                    kind: NodeKind::Throttle {
                        key_path: "input.key".to_string(),
                        window_steps: 3,
                        next: "throttle_a".to_string(),
                        on_throttled: Some("end_throttled".to_string()),
                    },
                },
                Node {
                    id: "end_throttled".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2346
    /// Fixture covering the extended node kinds in one graph:
    /// start → event trigger → cache write → cache read → router →
    /// (human-in-the-loop | transform) → end, with dedicated End nodes for
    /// the event-mismatch, cache-miss, and rejection paths.
    fn extended_nodes_workflow() -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "extended-nodes".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "event".to_string(),
                    },
                },
                Node {
                    id: "event".to_string(),
                    kind: NodeKind::EventTrigger {
                        event: "webhook".to_string(),
                        event_path: "input.event_type".to_string(),
                        next: "cache_write".to_string(),
                        on_mismatch: Some("end_mismatch".to_string()),
                    },
                },
                Node {
                    id: "cache_write".to_string(),
                    kind: NodeKind::CacheWrite {
                        key_path: "input.cache_key".to_string(),
                        value_path: "input.payload".to_string(),
                        next: "cache_read".to_string(),
                    },
                },
                Node {
                    id: "cache_read".to_string(),
                    kind: NodeKind::CacheRead {
                        key_path: "input.cache_key".to_string(),
                        next: "router".to_string(),
                        on_miss: Some("end_miss".to_string()),
                    },
                },
                Node {
                    id: "router".to_string(),
                    kind: NodeKind::Router {
                        routes: vec![crate::ir::RouterRoute {
                            when: "input.mode == 'manual'".to_string(),
                            next: "human".to_string(),
                        }],
                        default: "transform".to_string(),
                    },
                },
                Node {
                    id: "human".to_string(),
                    kind: NodeKind::HumanInTheLoop {
                        decision_path: "input.approval".to_string(),
                        response_path: Some("input.review_notes".to_string()),
                        on_approve: "transform".to_string(),
                        on_reject: "end_rejected".to_string(),
                    },
                },
                Node {
                    id: "transform".to_string(),
                    kind: NodeKind::Transform {
                        expression: "node_outputs.cache_read.value".to_string(),
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_mismatch".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_miss".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_rejected".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2428
    /// Fixture: start → retry-compensate node → end, parameterized by the
    /// primary tool name. On exhausted retries the node runs the
    /// `compensate` tool and routes to `end_compensated`.
    fn retry_compensate_workflow(primary_tool: &str) -> WorkflowDefinition {
        WorkflowDefinition {
            version: "v0".to_string(),
            name: "retry-compensate".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "retry_comp".to_string(),
                    },
                },
                Node {
                    id: "retry_comp".to_string(),
                    kind: NodeKind::RetryCompensate {
                        tool: primary_tool.to_string(),
                        input: json!({"job": "run"}),
                        max_retries: 1,
                        compensate_tool: "compensate".to_string(),
                        compensate_input: json!({"job": "rollback"}),
                        next: "end".to_string(),
                        on_compensated: Some("end_compensated".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_compensated".to_string(),
                    kind: NodeKind::End,
                },
            ],
        }
    }
2463
    /// End-to-end happy path over the linear fixture: both executors
    /// succeed, outputs are recorded per node, and trace recording is on
    /// by default (8 events = one start + one finish per node).
    #[tokio::test]
    async fn executes_happy_path_linear_flow() {
        let llm = MockLlmExecutor {
            output: "ok".to_string(),
        };
        let tools = MockToolExecutor {
            output: json!({"status": "done"}),
            fail: false,
        };
        let runtime = WorkflowRuntime::new(
            linear_workflow(),
            &llm,
            Some(&tools),
            WorkflowRuntimeOptions::default(),
        );

        let result = runtime
            .execute(json!({"request_id": "r1"}), None)
            .await
            .expect("linear workflow should succeed");

        assert_eq!(result.workflow_name, "linear");
        assert_eq!(result.terminal_node_id, "end");
        assert_eq!(result.node_executions.len(), 4);
        assert_eq!(
            result.node_outputs.get("llm"),
            Some(&Value::String("ok".to_string()))
        );
        assert_eq!(
            result.node_outputs.get("tool"),
            Some(&json!({"status": "done"}))
        );
        assert_eq!(result.events.len(), 8);
        assert!(result.retry_events.is_empty());
        assert!(result.trace.is_some());
        assert_eq!(result.replay_report, None);
    }
2501
    /// Verifies the per-node execution payloads (variant and fields) that
    /// the linear fixture produces, beyond the aggregate counts checked in
    /// the happy-path test.
    #[tokio::test]
    async fn linear_flow_records_expected_node_execution_payloads() {
        let llm = MockLlmExecutor {
            output: "ok".to_string(),
        };
        let tools = MockToolExecutor {
            output: json!({"status": "done"}),
            fail: false,
        };
        let runtime = WorkflowRuntime::new(
            linear_workflow(),
            &llm,
            Some(&tools),
            WorkflowRuntimeOptions::default(),
        );

        let result = runtime
            .execute(json!({"request_id": "r1"}), None)
            .await
            .expect("linear workflow should succeed");

        assert!(matches!(
            result.node_executions[0].data,
            NodeExecutionData::Start { ref next } if next == "llm"
        ));
        assert!(matches!(
            result.node_executions[1].data,
            NodeExecutionData::Llm {
                ref model,
                ref output,
                ref next
            } if model == "gpt-4" && output == "ok" && next == "tool"
        ));
        assert!(matches!(
            result.node_executions[2].data,
            NodeExecutionData::Tool {
                ref tool,
                ref next,
                ..
            } if tool == "extract" && next == "end"
        ));
    }
2544
    /// A truthy `input.approved` routes the condition node to its
    /// `on_true` target.
    #[tokio::test]
    async fn executes_conditional_branching() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "conditional".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "condition".to_string(),
                    },
                },
                Node {
                    id: "condition".to_string(),
                    kind: NodeKind::Condition {
                        expression: "input.approved".to_string(),
                        on_true: "end_true".to_string(),
                        on_false: "end_false".to_string(),
                    },
                },
                Node {
                    id: "end_true".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_false".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());

        let result = runtime
            .execute(json!({"approved": true}), None)
            .await
            .expect("conditional workflow should succeed");

        assert_eq!(result.terminal_node_id, "end_true");
    }
2588
    /// A falsy `input.approved` routes the condition node to its
    /// `on_false` target.
    #[tokio::test]
    async fn executes_conditional_false_branch() {
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "conditional-false".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "condition".to_string(),
                    },
                },
                Node {
                    id: "condition".to_string(),
                    kind: NodeKind::Condition {
                        expression: "input.approved".to_string(),
                        on_true: "end_true".to_string(),
                        on_false: "end_false".to_string(),
                    },
                },
                Node {
                    id: "end_true".to_string(),
                    kind: NodeKind::End,
                },
                Node {
                    id: "end_false".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());

        let result = runtime
            .execute(json!({"approved": false}), None)
            .await
            .expect("conditional workflow should take false branch");

        assert_eq!(result.terminal_node_id, "end_false");
    }
2631
    /// Reaching a tool node with no tool executor registered fails with
    /// `MissingToolExecutor` naming the offending node.
    #[tokio::test]
    async fn fails_when_tool_executor_is_missing() {
        let llm = MockLlmExecutor {
            output: "ok".to_string(),
        };
        let runtime = WorkflowRuntime::new(
            linear_workflow(),
            &llm,
            None,
            WorkflowRuntimeOptions::default(),
        );

        let error = runtime
            .execute(json!({}), None)
            .await
            .expect_err("workflow should fail without tool executor");

        assert!(matches!(
            error,
            WorkflowRuntimeError::MissingToolExecutor { node_id } if node_id == "tool"
        ));
    }
2654
    /// With the default policy (no retries), a failing tool surfaces as
    /// `ToolRetryExhausted` after a single attempt.
    #[tokio::test]
    async fn fails_on_tool_execution_error() {
        let llm = MockLlmExecutor {
            output: "ok".to_string(),
        };
        let tools = MockToolExecutor {
            output: json!({"status": "unused"}),
            fail: true,
        };
        let runtime = WorkflowRuntime::new(
            linear_workflow(),
            &llm,
            Some(&tools),
            WorkflowRuntimeOptions::default(),
        );

        let error = runtime
            .execute(json!({}), None)
            .await
            .expect_err("workflow should fail on tool error");

        assert!(matches!(
            error,
            WorkflowRuntimeError::ToolRetryExhausted { node_id, attempts: 1, .. }
            if node_id == "tool"
        ));
    }
2682
2683 #[tokio::test]
2684 async fn retries_llm_after_transient_failure() {
2685 let llm = SequencedLlmExecutor {
2686 responses: Mutex::new(vec![
2687 Err(LlmExecutionError::Client("temporary".to_string())),
2688 Ok(LlmExecutionOutput {
2689 content: "recovered".to_string(),
2690 }),
2691 ]),
2692 calls: AtomicUsize::new(0),
2693 };
2694 let runtime = WorkflowRuntime::new(
2695 llm_only_workflow(),
2696 &llm,
2697 None,
2698 WorkflowRuntimeOptions {
2699 llm_node_policy: NodeExecutionPolicy {
2700 timeout: None,
2701 max_retries: 1,
2702 },
2703 ..WorkflowRuntimeOptions::default()
2704 },
2705 );
2706
2707 let result = runtime
2708 .execute(json!({"request_id": "r2"}), None)
2709 .await
2710 .expect("llm retry should recover");
2711
2712 assert_eq!(result.terminal_node_id, "end");
2713 assert_eq!(result.node_outputs.get("llm"), Some(&json!("recovered")));
2714 assert_eq!(result.retry_events.len(), 1);
2715 assert_eq!(result.retry_events[0].operation, "llm");
2716 assert_eq!(llm.calls.load(Ordering::Relaxed), 2);
2717 }
2718
2719 #[tokio::test]
2720 async fn times_out_tool_execution_per_policy() {
2721 let llm = MockLlmExecutor {
2722 output: "ok".to_string(),
2723 };
2724 let tool = SlowToolExecutor {
2725 delay: Duration::from_millis(50),
2726 };
2727 let runtime = WorkflowRuntime::new(
2728 linear_workflow(),
2729 &llm,
2730 Some(&tool),
2731 WorkflowRuntimeOptions {
2732 tool_node_policy: NodeExecutionPolicy {
2733 timeout: Some(Duration::from_millis(5)),
2734 max_retries: 0,
2735 },
2736 ..WorkflowRuntimeOptions::default()
2737 },
2738 );
2739
2740 let error = runtime
2741 .execute(json!({}), None)
2742 .await
2743 .expect_err("tool execution should time out");
2744
2745 assert!(matches!(
2746 error,
2747 WorkflowRuntimeError::ToolTimeout {
2748 node_id,
2749 timeout_ms: 5,
2750 attempts: 1,
2751 } if node_id == "tool"
2752 ));
2753 }
2754
2755 #[tokio::test]
2756 async fn cancels_between_retry_attempts() {
2757 let cancel_flag = Arc::new(AtomicBool::new(false));
2758 let llm = CancellingLlmExecutor {
2759 cancel_flag: Arc::clone(&cancel_flag),
2760 calls: AtomicUsize::new(0),
2761 };
2762 let runtime = WorkflowRuntime::new(
2763 llm_only_workflow(),
2764 &llm,
2765 None,
2766 WorkflowRuntimeOptions {
2767 llm_node_policy: NodeExecutionPolicy {
2768 timeout: None,
2769 max_retries: 3,
2770 },
2771 ..WorkflowRuntimeOptions::default()
2772 },
2773 );
2774
2775 let error = runtime
2776 .execute(json!({}), Some(cancel_flag.as_ref()))
2777 .await
2778 .expect_err("workflow should stop when cancellation is observed");
2779
2780 assert!(matches!(error, WorkflowRuntimeError::Cancelled));
2781 assert_eq!(llm.calls.load(Ordering::Relaxed), 1);
2782 }
2783
2784 #[tokio::test]
2785 async fn enforces_step_limit_guard() {
2786 let workflow = WorkflowDefinition {
2787 version: "v0".to_string(),
2788 name: "loop".to_string(),
2789 nodes: vec![
2790 Node {
2791 id: "start".to_string(),
2792 kind: NodeKind::Start {
2793 next: "condition".to_string(),
2794 },
2795 },
2796 Node {
2797 id: "condition".to_string(),
2798 kind: NodeKind::Condition {
2799 expression: "true".to_string(),
2800 on_true: "condition".to_string(),
2801 on_false: "end".to_string(),
2802 },
2803 },
2804 Node {
2805 id: "end".to_string(),
2806 kind: NodeKind::End,
2807 },
2808 ],
2809 };
2810
2811 let llm = MockLlmExecutor {
2812 output: "unused".to_string(),
2813 };
2814 let runtime = WorkflowRuntime::new(
2815 workflow,
2816 &llm,
2817 None,
2818 WorkflowRuntimeOptions {
2819 max_steps: 3,
2820 ..WorkflowRuntimeOptions::default()
2821 },
2822 );
2823
2824 let error = runtime
2825 .execute(json!({}), None)
2826 .await
2827 .expect_err("workflow should fail on step limit");
2828
2829 assert!(matches!(
2830 error,
2831 WorkflowRuntimeError::StepLimitExceeded { max_steps: 3 }
2832 ));
2833 }
2834
2835 #[tokio::test]
2836 async fn validates_recorded_trace_in_replay_mode() {
2837 let llm = MockLlmExecutor {
2838 output: "ok".to_string(),
2839 };
2840 let tools = MockToolExecutor {
2841 output: json!({"status": "done"}),
2842 fail: false,
2843 };
2844 let runtime = WorkflowRuntime::new(
2845 linear_workflow(),
2846 &llm,
2847 Some(&tools),
2848 WorkflowRuntimeOptions {
2849 replay_mode: WorkflowReplayMode::ValidateRecordedTrace,
2850 ..WorkflowRuntimeOptions::default()
2851 },
2852 );
2853
2854 let result = runtime
2855 .execute(json!({"request_id": "r1"}), None)
2856 .await
2857 .expect("replay validation should pass");
2858
2859 assert!(result.trace.is_some());
2860 assert_eq!(
2861 result.replay_report.as_ref().map(|r| r.total_events),
2862 Some(9)
2863 );
2864 }
2865
2866 #[tokio::test]
2867 async fn executes_loop_until_condition_fails() {
2868 let llm = MockLlmExecutor {
2869 output: "unused".to_string(),
2870 };
2871 let counter = IncrementingToolExecutor {
2872 value: AtomicUsize::new(0),
2873 };
2874 let runtime = WorkflowRuntime::new(
2875 loop_workflow(Some(8)),
2876 &llm,
2877 Some(&counter),
2878 WorkflowRuntimeOptions::default(),
2879 );
2880
2881 let result = runtime
2882 .execute(json!({}), None)
2883 .await
2884 .expect("loop workflow should terminate at end");
2885
2886 assert_eq!(result.terminal_node_id, "end");
2887 assert!(result.node_executions.iter().any(|step| matches!(
2888 step.data,
2889 NodeExecutionData::Loop {
2890 evaluated: false,
2891 ..
2892 }
2893 )));
2894 }
2895
2896 #[tokio::test]
2897 async fn fails_when_loop_exceeds_max_iterations() {
2898 let llm = MockLlmExecutor {
2899 output: "unused".to_string(),
2900 };
2901 let counter = MockToolExecutor {
2902 output: json!(0),
2903 fail: false,
2904 };
2905 let runtime = WorkflowRuntime::new(
2906 loop_workflow(Some(2)),
2907 &llm,
2908 Some(&counter),
2909 WorkflowRuntimeOptions {
2910 max_steps: 20,
2911 ..WorkflowRuntimeOptions::default()
2912 },
2913 );
2914
2915 let error = runtime
2916 .execute(json!({}), None)
2917 .await
2918 .expect_err("loop should fail once max iterations are exceeded");
2919
2920 assert!(matches!(
2921 error,
2922 WorkflowRuntimeError::LoopIterationLimitExceeded {
2923 node_id,
2924 max_iterations: 2
2925 } if node_id == "loop"
2926 ));
2927 }
2928
2929 #[tokio::test]
2930 async fn executes_parallel_then_merge_all() {
2931 let llm = MockLlmExecutor {
2932 output: "unused".to_string(),
2933 };
2934 let tool = EchoInputToolExecutor;
2935 let runtime = WorkflowRuntime::new(
2936 parallel_merge_workflow(MergePolicy::All, None),
2937 &llm,
2938 Some(&tool),
2939 WorkflowRuntimeOptions::default(),
2940 );
2941
2942 let result = runtime
2943 .execute(json!({}), None)
2944 .await
2945 .expect("parallel merge workflow should succeed");
2946
2947 assert_eq!(result.terminal_node_id, "end");
2948 assert_eq!(
2949 result.node_outputs.get("tool_a"),
2950 Some(&json!({"value": 1}))
2951 );
2952 assert_eq!(
2953 result.node_outputs.get("tool_b"),
2954 Some(&json!({"value": 2}))
2955 );
2956 assert_eq!(
2957 result.node_outputs.get("merge"),
2958 Some(&json!([{"value": 1}, {"value": 2}]))
2959 );
2960 }
2961
2962 #[tokio::test]
2963 async fn executes_map_reduce_sum() {
2964 let llm = MockLlmExecutor {
2965 output: "unused".to_string(),
2966 };
2967 let tool = EchoInputToolExecutor;
2968 let runtime = WorkflowRuntime::new(
2969 map_reduce_workflow(ReduceOperation::Sum),
2970 &llm,
2971 Some(&tool),
2972 WorkflowRuntimeOptions::default(),
2973 );
2974
2975 let result = runtime
2976 .execute(json!({"values": [1, 2, 3]}), None)
2977 .await
2978 .expect("map reduce workflow should succeed");
2979
2980 assert_eq!(result.node_outputs.get("map"), Some(&json!([1, 2, 3])));
2981 assert_eq!(result.node_outputs.get("reduce"), Some(&json!(6.0)));
2982 }
2983
2984 #[tokio::test]
2985 async fn fails_map_when_items_path_is_not_array() {
2986 let llm = MockLlmExecutor {
2987 output: "unused".to_string(),
2988 };
2989 let tool = EchoInputToolExecutor;
2990 let runtime = WorkflowRuntime::new(
2991 map_reduce_workflow(ReduceOperation::Count),
2992 &llm,
2993 Some(&tool),
2994 WorkflowRuntimeOptions::default(),
2995 );
2996
2997 let error = runtime
2998 .execute(json!({"values": {"not": "array"}}), None)
2999 .await
3000 .expect_err("map node should fail on non-array path");
3001
3002 assert!(matches!(
3003 error,
3004 WorkflowRuntimeError::MapItemsNotArray {
3005 node_id,
3006 items_path
3007 } if node_id == "map" && items_path == "input.values"
3008 ));
3009 }
3010
    #[tokio::test]
    async fn executes_subgraph_via_registry() {
        // Verifies that a Subgraph node resolves its target workflow by name
        // through the subgraph_registry supplied in the runtime options.
        let llm = MockLlmExecutor {
            output: "nested-ok".to_string(),
        };
        // Child workflow: start -> llm -> end.
        let subgraph = WorkflowDefinition {
            version: "v0".to_string(),
            name: "child".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "llm".to_string(),
                    },
                },
                Node {
                    id: "llm".to_string(),
                    kind: NodeKind::Llm {
                        model: "gpt-4".to_string(),
                        prompt: "child".to_string(),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };
        // Parent workflow: start -> sub -> end; "sub" references the child by
        // the registry key "child_graph", not by its definition name.
        let parent = WorkflowDefinition {
            version: "v0".to_string(),
            name: "parent".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "sub".to_string(),
                    },
                },
                Node {
                    id: "sub".to_string(),
                    kind: NodeKind::Subgraph {
                        graph: "child_graph".to_string(),
                        next: Some("end".to_string()),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let mut registry = BTreeMap::new();
        registry.insert("child_graph".to_string(), subgraph);
        let runtime = WorkflowRuntime::new(
            parent,
            &llm,
            None,
            WorkflowRuntimeOptions {
                subgraph_registry: registry,
                ..WorkflowRuntimeOptions::default()
            },
        );

        let result = runtime
            .execute(json!({"approved": true}), None)
            .await
            .expect("subgraph workflow should execute");

        assert_eq!(result.terminal_node_id, "end");
        // node_executions[1] is the "sub" node (index 0 is "start"); its
        // recorded execution data must be the Subgraph variant.
        assert!(matches!(
            result.node_executions[1].data,
            NodeExecutionData::Subgraph { .. }
        ));
    }
3087
    #[tokio::test]
    async fn executes_batch_and_filter_nodes() {
        // Verifies Batch then Filter chaining: batch materializes the input
        // items, filter keeps only items matching its expression.
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        // start -> batch (reads input.items) -> filter (reads batch output) -> end.
        let workflow = WorkflowDefinition {
            version: "v0".to_string(),
            name: "batch-filter".to_string(),
            nodes: vec![
                Node {
                    id: "start".to_string(),
                    kind: NodeKind::Start {
                        next: "batch".to_string(),
                    },
                },
                Node {
                    id: "batch".to_string(),
                    kind: NodeKind::Batch {
                        items_path: "input.items".to_string(),
                        next: "filter".to_string(),
                    },
                },
                Node {
                    id: "filter".to_string(),
                    kind: NodeKind::Filter {
                        // Filter consumes the batch node's output, not the raw input.
                        items_path: "node_outputs.batch".to_string(),
                        expression: "item.keep == true".to_string(),
                        next: "end".to_string(),
                    },
                },
                Node {
                    id: "end".to_string(),
                    kind: NodeKind::End,
                },
            ],
        };

        let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());
        // Three items, two of which carry keep=true.
        let result = runtime
            .execute(
                json!({
                    "items": [
                        {"id": 1, "keep": true},
                        {"id": 2, "keep": false},
                        {"id": 3, "keep": true}
                    ]
                }),
                None,
            )
            .await
            .expect("batch/filter workflow should execute");

        // Batch passes all three items through; filter keeps the two keep=true ones.
        assert_eq!(
            result
                .node_outputs
                .get("batch")
                .and_then(Value::as_array)
                .map(Vec::len),
            Some(3)
        );
        assert_eq!(
            result
                .node_outputs
                .get("filter")
                .and_then(Value::as_array)
                .map(Vec::len),
            Some(2)
        );
    }
3157
3158 #[tokio::test]
3159 async fn executes_debounce_and_throttle_nodes() {
3160 let llm = MockLlmExecutor {
3161 output: "unused".to_string(),
3162 };
3163 let runtime = WorkflowRuntime::new(
3164 debounce_and_throttle_workflow(),
3165 &llm,
3166 None,
3167 WorkflowRuntimeOptions::default(),
3168 );
3169
3170 let result = runtime
3171 .execute(json!({"key": "k1"}), None)
3172 .await
3173 .expect("debounce/throttle workflow should execute");
3174
3175 assert_eq!(result.terminal_node_id, "end_throttled");
3176 assert_eq!(
3177 result.node_outputs.get("debounce_a"),
3178 Some(&json!({"key": "k1", "suppressed": true}))
3179 );
3180 assert_eq!(
3181 result.node_outputs.get("throttle_a"),
3182 Some(&json!({"key": "k1", "throttled": true}))
3183 );
3184 }
3185
3186 #[tokio::test]
3187 async fn executes_retry_compensate_successfully_without_compensation() {
3188 let llm = MockLlmExecutor {
3189 output: "unused".to_string(),
3190 };
3191 let tools = RetryCompensateToolExecutor {
3192 attempts: AtomicUsize::new(0),
3193 };
3194 let runtime = WorkflowRuntime::new(
3195 retry_compensate_workflow("unstable_primary"),
3196 &llm,
3197 Some(&tools),
3198 WorkflowRuntimeOptions::default(),
3199 );
3200
3201 let result = runtime
3202 .execute(json!({}), None)
3203 .await
3204 .expect("retry_compensate should recover by retry");
3205
3206 assert_eq!(result.terminal_node_id, "end");
3207 assert_eq!(tools.attempts.load(Ordering::Relaxed), 2);
3208 assert_eq!(result.retry_events.len(), 1);
3209 }
3210
3211 #[tokio::test]
3212 async fn executes_retry_compensate_with_compensation_route() {
3213 let llm = MockLlmExecutor {
3214 output: "unused".to_string(),
3215 };
3216 let tools = RetryCompensateToolExecutor {
3217 attempts: AtomicUsize::new(0),
3218 };
3219 let runtime = WorkflowRuntime::new(
3220 retry_compensate_workflow("always_fail"),
3221 &llm,
3222 Some(&tools),
3223 WorkflowRuntimeOptions::default(),
3224 );
3225
3226 let result = runtime
3227 .execute(json!({}), None)
3228 .await
3229 .expect("retry_compensate should use compensation");
3230
3231 assert_eq!(result.terminal_node_id, "end_compensated");
3232 assert_eq!(result.retry_events.len(), 1);
3233 assert_eq!(
3234 result
3235 .node_outputs
3236 .get("retry_comp")
3237 .and_then(|value| value.get("status")),
3238 Some(&json!("compensated"))
3239 );
3240 }
3241
    #[tokio::test]
    async fn executes_event_cache_router_human_transform_nodes() {
        // Happy-path run through the extended node set: event trigger matches,
        // cache read hits, router selects the human path, human approval is
        // granted, and transform extracts the payload.
        let llm = MockLlmExecutor {
            output: "unused".to_string(),
        };
        let runtime = WorkflowRuntime::new(
            extended_nodes_workflow(),
            &llm,
            None,
            WorkflowRuntimeOptions::default(),
        );

        // Input drives every extended node: event_type matches the trigger,
        // cache_key seeds the cache read, mode/approval steer the router and
        // human node, review_notes presumably feeds the human review step —
        // see extended_nodes_workflow() for the exact wiring.
        let result = runtime
            .execute(
                json!({
                    "event_type": "webhook",
                    "cache_key": "customer-1",
                    "payload": {"value": 99},
                    "mode": "manual",
                    "approval": "approve",
                    "review_notes": {"editor": "ops"}
                }),
                None,
            )
            .await
            .expect("extended nodes workflow should execute");

        assert_eq!(result.terminal_node_id, "end");
        // Cache read reports a hit and surfaces the stored payload value.
        assert_eq!(
            result.node_outputs.get("cache_read"),
            Some(&json!({"key": "customer-1", "hit": true, "value": {"value": 99}}))
        );
        // mode == "manual" routes to the human branch.
        assert_eq!(
            result.node_outputs.get("router"),
            Some(&json!({"selected": "human"}))
        );
        // Transform output is the extracted payload.
        assert_eq!(
            result.node_outputs.get("transform"),
            Some(&json!({"value": 99}))
        );
    }
3283
3284 #[tokio::test]
3285 async fn routes_event_trigger_mismatch_to_fallback() {
3286 let llm = MockLlmExecutor {
3287 output: "unused".to_string(),
3288 };
3289 let runtime = WorkflowRuntime::new(
3290 extended_nodes_workflow(),
3291 &llm,
3292 None,
3293 WorkflowRuntimeOptions::default(),
3294 );
3295
3296 let result = runtime
3297 .execute(
3298 json!({
3299 "event_type": "cron",
3300 "cache_key": "customer-1",
3301 "payload": {"value": 99},
3302 "mode": "manual",
3303 "approval": "approve"
3304 }),
3305 None,
3306 )
3307 .await
3308 .expect("event mismatch should route to fallback");
3309
3310 assert_eq!(result.terminal_node_id, "end_mismatch");
3311 }
3312
3313 #[tokio::test]
3314 async fn cache_read_routes_to_on_miss_when_value_absent() {
3315 let llm = MockLlmExecutor {
3316 output: "unused".to_string(),
3317 };
3318 let workflow = WorkflowDefinition {
3319 version: "v0".to_string(),
3320 name: "cache-read-miss".to_string(),
3321 nodes: vec![
3322 Node {
3323 id: "start".to_string(),
3324 kind: NodeKind::Start {
3325 next: "cache_read".to_string(),
3326 },
3327 },
3328 Node {
3329 id: "cache_read".to_string(),
3330 kind: NodeKind::CacheRead {
3331 key_path: "input.cache_key".to_string(),
3332 next: "end_hit".to_string(),
3333 on_miss: Some("end_miss".to_string()),
3334 },
3335 },
3336 Node {
3337 id: "end_hit".to_string(),
3338 kind: NodeKind::End,
3339 },
3340 Node {
3341 id: "end_miss".to_string(),
3342 kind: NodeKind::End,
3343 },
3344 ],
3345 };
3346 let runtime = WorkflowRuntime::new(workflow, &llm, None, WorkflowRuntimeOptions::default());
3347
3348 let result = runtime
3349 .execute(json!({"cache_key": "new-customer"}), None)
3350 .await
3351 .expect("cache read miss should still execute via on_miss route");
3352
3353 assert_eq!(
3354 result.node_outputs.get("cache_read"),
3355 Some(&json!({"key": "new-customer", "hit": false, "value": null}))
3356 );
3357 assert_eq!(result.terminal_node_id, "end_miss");
3358 }
3359
3360 #[tokio::test]
3361 async fn rejects_condition_when_expression_scope_exceeds_limit() {
3362 let llm = MockLlmExecutor {
3363 output: "unused".to_string(),
3364 };
3365 let workflow = WorkflowDefinition {
3366 version: "v0".to_string(),
3367 name: "scope-limit".to_string(),
3368 nodes: vec![
3369 Node {
3370 id: "start".to_string(),
3371 kind: NodeKind::Start {
3372 next: "condition".to_string(),
3373 },
3374 },
3375 Node {
3376 id: "condition".to_string(),
3377 kind: NodeKind::Condition {
3378 expression: "input.flag == true".to_string(),
3379 on_true: "end".to_string(),
3380 on_false: "end".to_string(),
3381 },
3382 },
3383 Node {
3384 id: "end".to_string(),
3385 kind: NodeKind::End,
3386 },
3387 ],
3388 };
3389
3390 let runtime = WorkflowRuntime::new(
3391 workflow,
3392 &llm,
3393 None,
3394 WorkflowRuntimeOptions {
3395 security_limits: RuntimeSecurityLimits {
3396 max_expression_scope_bytes: 32,
3397 ..RuntimeSecurityLimits::default()
3398 },
3399 ..WorkflowRuntimeOptions::default()
3400 },
3401 );
3402
3403 let error = runtime
3404 .execute(
3405 json!({"flag": true, "payload": "this-is-too-large-for-limit"}),
3406 None,
3407 )
3408 .await
3409 .expect_err("condition should fail when scope budget is exceeded");
3410 assert!(matches!(
3411 error,
3412 WorkflowRuntimeError::ExpressionScopeLimitExceeded { node_id, .. }
3413 if node_id == "condition"
3414 ));
3415 }
3416
3417 #[tokio::test]
3418 async fn rejects_map_when_item_count_exceeds_limit() {
3419 let llm = MockLlmExecutor {
3420 output: "unused".to_string(),
3421 };
3422 let tool = EchoInputToolExecutor;
3423 let runtime = WorkflowRuntime::new(
3424 map_reduce_workflow(ReduceOperation::Count),
3425 &llm,
3426 Some(&tool),
3427 WorkflowRuntimeOptions {
3428 security_limits: RuntimeSecurityLimits {
3429 max_map_items: 2,
3430 ..RuntimeSecurityLimits::default()
3431 },
3432 ..WorkflowRuntimeOptions::default()
3433 },
3434 );
3435
3436 let error = runtime
3437 .execute(json!({"values": [1, 2, 3]}), None)
3438 .await
3439 .expect_err("map should fail when item guard is exceeded");
3440 assert!(matches!(
3441 error,
3442 WorkflowRuntimeError::MapItemLimitExceeded {
3443 node_id,
3444 actual_items: 3,
3445 max_items: 2,
3446 } if node_id == "map"
3447 ));
3448 }
3449
3450 #[tokio::test]
3451 async fn resumes_from_checkpoint() {
3452 let llm = MockLlmExecutor {
3453 output: "unused".to_string(),
3454 };
3455 let tool = MockToolExecutor {
3456 output: json!({"ok": true}),
3457 fail: false,
3458 };
3459 let runtime = WorkflowRuntime::new(
3460 linear_workflow(),
3461 &llm,
3462 Some(&tool),
3463 WorkflowRuntimeOptions::default(),
3464 );
3465
3466 let checkpoint = WorkflowCheckpoint {
3467 run_id: "run-1".to_string(),
3468 workflow_name: "linear".to_string(),
3469 step: 2,
3470 next_node_id: "tool".to_string(),
3471 scope_snapshot: json!({"input": {"request_id": "r-resume"}}),
3472 };
3473
3474 let resumed = runtime
3475 .execute_resume_from_failure(&checkpoint, None)
3476 .await
3477 .expect("resume should continue from checkpoint node");
3478 assert_eq!(resumed.terminal_node_id, "end");
3479 assert_eq!(resumed.node_executions[0].node_id, "tool");
3480 }
3481
3482 #[test]
3483 fn scope_capabilities_enforce_read_write_boundaries() {
3484 let mut scope = RuntimeScope::new(json!({"k": "v"}));
3485
3486 let read_error = scope
3487 .scoped_input(ScopeCapability::LlmWrite)
3488 .expect_err("write capability should not read scope");
3489 assert!(matches!(read_error, ScopeAccessError::ReadDenied { .. }));
3490
3491 let write_error = scope
3492 .record_tool_output("tool", json!({"ok": true}), ScopeCapability::LlmWrite)
3493 .expect_err("llm write capability should not write tool output");
3494 assert!(matches!(write_error, ScopeAccessError::WriteDenied { .. }));
3495 }
3496}