1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5
6use crate::config::StepConfig;
7use crate::config::manager::ConfigManager;
8use crate::control_flow::ControlFlow;
9use crate::engine::context::Context;
10use crate::error::StepError;
11use crate::workflow::schema::{ScopeDef, StepDef, StepType};
12
13use super::{
14 agent::AgentExecutor, cmd::CmdExecutor, gate::GateExecutor, repeat::RepeatExecutor,
15 chat::ChatExecutor, CmdOutput, IterationOutput, SandboxAwareExecutor, ScopeOutput,
16 SharedSandbox, StepExecutor, StepOutput,
17};
18
19pub struct CallExecutor {
20 scopes: HashMap<String, ScopeDef>,
21 sandbox: SharedSandbox,
22 config_manager: Option<Arc<ConfigManager>>,
23}
24
25impl CallExecutor {
26 pub fn new(scopes: &HashMap<String, ScopeDef>, sandbox: SharedSandbox) -> Self {
27 Self {
28 scopes: scopes.clone(),
29 sandbox,
30 config_manager: None,
31 }
32 }
33
34 pub fn with_config_manager(mut self, cm: Option<Arc<ConfigManager>>) -> Self {
35 self.config_manager = cm;
36 self
37 }
38}
39
40#[async_trait]
41impl StepExecutor for CallExecutor {
42 async fn execute(
43 &self,
44 step: &StepDef,
45 _config: &StepConfig,
46 ctx: &Context,
47 ) -> Result<StepOutput, StepError> {
48 let scope_name = step
49 .scope
50 .as_ref()
51 .ok_or_else(|| StepError::Fail("call step missing 'scope' field".into()))?;
52
53 let scope = self
54 .scopes
55 .get(scope_name)
56 .ok_or_else(|| StepError::Fail(format!("scope '{}' not found", scope_name)))?
57 .clone();
58
59 let mut child_ctx = Context::new(
60 ctx.get_var("target")
61 .and_then(|v| v.as_str())
62 .unwrap_or("")
63 .to_string(),
64 HashMap::new(),
65 );
66
67 let mut last_output = StepOutput::Empty;
68
69 for scope_step in &scope.steps {
70 let step_config = resolve_scope_step_config(&self.config_manager, scope_step);
71 let result =
72 dispatch_scope_step_sandboxed(scope_step, &step_config, &child_ctx, &self.scopes, &self.sandbox, &self.config_manager).await;
73
74 match result {
75 Ok(output) => {
76 child_ctx.store(&scope_step.name, output.clone());
77 last_output = output;
78 }
79 Err(StepError::ControlFlow(ControlFlow::Break { value, .. })) => {
80 if let Some(v) = value {
81 last_output = v;
82 }
83 break;
84 }
85 Err(StepError::ControlFlow(ControlFlow::Skip { .. })) => {
86 child_ctx.store(&scope_step.name, StepOutput::Empty);
87 }
88 Err(StepError::ControlFlow(ControlFlow::Next { .. })) => {
89 break;
90 }
91 Err(e) => return Err(e),
92 }
93 }
94
95 let final_output = if let Some(outputs_template) = &scope.outputs {
97 match child_ctx.render_template(outputs_template) {
98 Ok(rendered) => StepOutput::Cmd(CmdOutput {
99 stdout: rendered,
100 stderr: String::new(),
101 exit_code: 0,
102 duration: std::time::Duration::ZERO,
103 }),
104 Err(_) => last_output,
105 }
106 } else {
107 last_output
108 };
109
110 Ok(StepOutput::Scope(ScopeOutput {
111 iterations: vec![IterationOutput {
112 index: 0,
113 output: final_output.clone(),
114 }],
115 final_value: Some(Box::new(final_output)),
116 }))
117 }
118}
119
120pub(super) fn resolve_scope_step_config(
123 config_manager: &Option<Arc<ConfigManager>>,
124 step: &StepDef,
125) -> StepConfig {
126 if let Some(cm) = config_manager {
127 cm.resolve(&step.name, &step.step_type, &step.config)
128 } else {
129 let values: HashMap<String, serde_json::Value> = step
131 .config
132 .iter()
133 .map(|(k, v)| (k.clone(), serde_json::to_value(v).unwrap_or(serde_json::Value::Null)))
134 .collect();
135 StepConfig { values }
136 }
137}
138
139pub(super) async fn dispatch_scope_step_sandboxed(
140 step: &StepDef,
141 config: &StepConfig,
142 ctx: &Context,
143 scopes: &HashMap<String, ScopeDef>,
144 sandbox: &SharedSandbox,
145 config_manager: &Option<Arc<ConfigManager>>,
146) -> Result<StepOutput, StepError> {
147 match step.step_type {
148 StepType::Cmd => CmdExecutor.execute_sandboxed(step, config, ctx, sandbox).await,
149 StepType::Agent => AgentExecutor.execute_sandboxed(step, config, ctx, sandbox).await,
150 StepType::Gate => GateExecutor.execute(step, config, ctx).await,
151 StepType::Chat => ChatExecutor.execute(step, config, ctx).await,
152 StepType::Repeat => RepeatExecutor::new(scopes, sandbox.clone())
153 .with_config_manager(config_manager.clone())
154 .execute(step, config, ctx).await,
155 StepType::Call => CallExecutor::new(scopes, sandbox.clone())
156 .with_config_manager(config_manager.clone())
157 .execute(step, config, ctx).await,
158 _ => Err(StepError::Fail(format!(
159 "Step type '{}' not supported in scope",
160 step.step_type
161 ))),
162 }
163}
164
165#[cfg(test)]
166mod tests {
167 use super::*;
168 use std::collections::HashMap;
169 use crate::workflow::schema::{ScopeDef, StepType};
170
171 fn cmd_step(name: &str, run: &str) -> StepDef {
172 StepDef {
173 name: name.to_string(),
174 step_type: StepType::Cmd,
175 run: Some(run.to_string()),
176 prompt: None,
177 condition: None,
178 on_pass: None,
179 on_fail: None,
180 message: None,
181 scope: None,
182 max_iterations: None,
183 initial_value: None,
184 items: None,
185 parallel: None,
186 steps: None,
187 config: HashMap::new(),
188 outputs: None,
189 output_type: None,
190 async_exec: None,
191 }
192 }
193
194 fn call_step(name: &str, scope: &str) -> StepDef {
195 StepDef {
196 name: name.to_string(),
197 step_type: StepType::Call,
198 run: None,
199 prompt: None,
200 condition: None,
201 on_pass: None,
202 on_fail: None,
203 message: None,
204 scope: Some(scope.to_string()),
205 max_iterations: None,
206 initial_value: None,
207 items: None,
208 parallel: None,
209 steps: None,
210 config: HashMap::new(),
211 outputs: None,
212 output_type: None,
213 async_exec: None,
214 }
215 }
216
217 #[tokio::test]
218 async fn call_scope_with_two_steps() {
219 let scope = ScopeDef {
220 steps: vec![
221 cmd_step("step1", "echo first"),
222 cmd_step("step2", "echo second"),
223 ],
224 outputs: None,
225 };
226 let mut scopes = HashMap::new();
227 scopes.insert("my_scope".to_string(), scope);
228
229 let step = call_step("call_test", "my_scope");
230 let executor = CallExecutor::new(&scopes, None);
231 let config = StepConfig::default();
232 let ctx = Context::new(String::new(), HashMap::new());
233
234 let result = executor.execute(&step, &config, &ctx).await.unwrap();
235 assert!(result.text().contains("second"));
237 }
238
239 #[tokio::test]
240 async fn call_with_explicit_outputs() {
241 let scope = ScopeDef {
242 steps: vec![
243 cmd_step("step1", "echo hello"),
244 ],
245 outputs: Some("rendered: {{ steps.step1.stdout }}".to_string()),
246 };
247 let mut scopes = HashMap::new();
248 scopes.insert("output_scope".to_string(), scope);
249
250 let step = call_step("call_out", "output_scope");
251 let executor = CallExecutor::new(&scopes, None);
252 let config = StepConfig::default();
253 let ctx = Context::new(String::new(), HashMap::new());
254
255 let result = executor.execute(&step, &config, &ctx).await.unwrap();
256 assert!(result.text().contains("rendered:"));
257 assert!(result.text().contains("hello"));
258 }
259
260 #[tokio::test]
261 async fn call_missing_scope_error() {
262 let scopes = HashMap::new();
263 let step = call_step("call_bad", "nonexistent");
264 let executor = CallExecutor::new(&scopes, None);
265 let config = StepConfig::default();
266 let ctx = Context::new(String::new(), HashMap::new());
267
268 let result = executor.execute(&step, &config, &ctx).await;
269 assert!(result.is_err());
270 }
271}