Skip to main content

minion_engine/steps/
call.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5
6use crate::config::StepConfig;
7use crate::config::manager::ConfigManager;
8use crate::control_flow::ControlFlow;
9use crate::engine::context::Context;
10use crate::error::StepError;
11use crate::workflow::schema::{ScopeDef, StepDef, StepType};
12
13use super::{
14    agent::AgentExecutor, cmd::CmdExecutor, gate::GateExecutor, repeat::RepeatExecutor,
15    chat::ChatExecutor, CmdOutput, IterationOutput, SandboxAwareExecutor, ScopeOutput,
16    SharedSandbox, StepExecutor, StepOutput,
17};
18
19pub struct CallExecutor {
20    scopes: HashMap<String, ScopeDef>,
21    sandbox: SharedSandbox,
22    config_manager: Option<Arc<ConfigManager>>,
23}
24
25impl CallExecutor {
26    pub fn new(scopes: &HashMap<String, ScopeDef>, sandbox: SharedSandbox) -> Self {
27        Self {
28            scopes: scopes.clone(),
29            sandbox,
30            config_manager: None,
31        }
32    }
33
34    pub fn with_config_manager(mut self, cm: Option<Arc<ConfigManager>>) -> Self {
35        self.config_manager = cm;
36        self
37    }
38}
39
40#[async_trait]
41impl StepExecutor for CallExecutor {
42    async fn execute(
43        &self,
44        step: &StepDef,
45        _config: &StepConfig,
46        ctx: &Context,
47    ) -> Result<StepOutput, StepError> {
48        let scope_name = step
49            .scope
50            .as_ref()
51            .ok_or_else(|| StepError::Fail("call step missing 'scope' field".into()))?;
52
53        let scope = self
54            .scopes
55            .get(scope_name)
56            .ok_or_else(|| StepError::Fail(format!("scope '{}' not found", scope_name)))?
57            .clone();
58
59        let mut child_ctx = Context::new(
60            ctx.get_var("target")
61                .and_then(|v| v.as_str())
62                .unwrap_or("")
63                .to_string(),
64            HashMap::new(),
65        );
66
67        let mut last_output = StepOutput::Empty;
68
69        for scope_step in &scope.steps {
70            let step_config = resolve_scope_step_config(&self.config_manager, scope_step);
71            let result =
72                dispatch_scope_step_sandboxed(scope_step, &step_config, &child_ctx, &self.scopes, &self.sandbox, &self.config_manager).await;
73
74            match result {
75                Ok(output) => {
76                    child_ctx.store(&scope_step.name, output.clone());
77                    last_output = output;
78                }
79                Err(StepError::ControlFlow(ControlFlow::Break { value, .. })) => {
80                    if let Some(v) = value {
81                        last_output = v;
82                    }
83                    break;
84                }
85                Err(StepError::ControlFlow(ControlFlow::Skip { .. })) => {
86                    child_ctx.store(&scope_step.name, StepOutput::Empty);
87                }
88                Err(StepError::ControlFlow(ControlFlow::Next { .. })) => {
89                    break;
90                }
91                Err(e) => return Err(e),
92            }
93        }
94
95        // Use scope outputs if defined, otherwise last step output
96        let final_output = if let Some(outputs_template) = &scope.outputs {
97            match child_ctx.render_template(outputs_template) {
98                Ok(rendered) => StepOutput::Cmd(CmdOutput {
99                    stdout: rendered,
100                    stderr: String::new(),
101                    exit_code: 0,
102                    duration: std::time::Duration::ZERO,
103                }),
104                Err(_) => last_output,
105            }
106        } else {
107            last_output
108        };
109
110        Ok(StepOutput::Scope(ScopeOutput {
111            iterations: vec![IterationOutput {
112                index: 0,
113                output: final_output.clone(),
114            }],
115            final_value: Some(Box::new(final_output)),
116        }))
117    }
118}
119
120/// Resolve config for a scope step using the workflow's ConfigManager (if available).
121/// Falls back to StepConfig::default() when no ConfigManager is provided (e.g. in tests).
122pub(super) fn resolve_scope_step_config(
123    config_manager: &Option<Arc<ConfigManager>>,
124    step: &StepDef,
125) -> StepConfig {
126    if let Some(cm) = config_manager {
127        cm.resolve(&step.name, &step.step_type, &step.config)
128    } else {
129        // Fallback: at least convert step's inline config
130        let values: HashMap<String, serde_json::Value> = step
131            .config
132            .iter()
133            .map(|(k, v)| (k.clone(), serde_json::to_value(v).unwrap_or(serde_json::Value::Null)))
134            .collect();
135        StepConfig { values }
136    }
137}
138
139pub(super) async fn dispatch_scope_step_sandboxed(
140    step: &StepDef,
141    config: &StepConfig,
142    ctx: &Context,
143    scopes: &HashMap<String, ScopeDef>,
144    sandbox: &SharedSandbox,
145    config_manager: &Option<Arc<ConfigManager>>,
146) -> Result<StepOutput, StepError> {
147    match step.step_type {
148        StepType::Cmd => CmdExecutor.execute_sandboxed(step, config, ctx, sandbox).await,
149        StepType::Agent => AgentExecutor.execute_sandboxed(step, config, ctx, sandbox).await,
150        StepType::Gate => GateExecutor.execute(step, config, ctx).await,
151        StepType::Chat => ChatExecutor.execute(step, config, ctx).await,
152        StepType::Repeat => RepeatExecutor::new(scopes, sandbox.clone())
153            .with_config_manager(config_manager.clone())
154            .execute(step, config, ctx).await,
155        StepType::Call => CallExecutor::new(scopes, sandbox.clone())
156            .with_config_manager(config_manager.clone())
157            .execute(step, config, ctx).await,
158        _ => Err(StepError::Fail(format!(
159            "Step type '{}' not supported in scope",
160            step.step_type
161        ))),
162    }
163}
164
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use crate::workflow::schema::{ScopeDef, StepType};

    /// Builds a step of the given type with every optional field unset and
    /// an empty inline config.
    fn blank_step(name: &str, step_type: StepType) -> StepDef {
        StepDef {
            name: name.to_string(),
            step_type,
            run: None,
            prompt: None,
            condition: None,
            on_pass: None,
            on_fail: None,
            message: None,
            scope: None,
            max_iterations: None,
            initial_value: None,
            items: None,
            parallel: None,
            steps: None,
            config: HashMap::new(),
            outputs: None,
            output_type: None,
            async_exec: None,
        }
    }

    /// A `cmd` step that runs the given shell command.
    fn cmd_step(name: &str, run: &str) -> StepDef {
        StepDef {
            run: Some(run.to_string()),
            ..blank_step(name, StepType::Cmd)
        }
    }

    /// A `call` step that targets the named scope.
    fn call_step(name: &str, scope: &str) -> StepDef {
        StepDef {
            scope: Some(scope.to_string()),
            ..blank_step(name, StepType::Call)
        }
    }

    #[tokio::test]
    async fn call_scope_with_two_steps() {
        let two_steps = ScopeDef {
            steps: vec![
                cmd_step("step1", "echo first"),
                cmd_step("step2", "echo second"),
            ],
            outputs: None,
        };
        let scopes = HashMap::from([("my_scope".to_string(), two_steps)]);

        let executor = CallExecutor::new(&scopes, None);
        let step = call_step("call_test", "my_scope");
        let config = StepConfig::default();
        let ctx = Context::new(String::new(), HashMap::new());

        let result = executor.execute(&step, &config, &ctx).await.unwrap();
        // Without an `outputs` template the last step's output ("second\n") wins.
        assert!(result.text().contains("second"));
    }

    #[tokio::test]
    async fn call_with_explicit_outputs() {
        let templated = ScopeDef {
            steps: vec![cmd_step("step1", "echo hello")],
            outputs: Some("rendered: {{ steps.step1.stdout }}".to_string()),
        };
        let scopes = HashMap::from([("output_scope".to_string(), templated)]);

        let executor = CallExecutor::new(&scopes, None);
        let step = call_step("call_out", "output_scope");
        let config = StepConfig::default();
        let ctx = Context::new(String::new(), HashMap::new());

        let result = executor.execute(&step, &config, &ctx).await.unwrap();
        // The rendered template, not the raw step output, is returned.
        assert!(result.text().contains("rendered:"));
        assert!(result.text().contains("hello"));
    }

    #[tokio::test]
    async fn call_missing_scope_error() {
        let scopes: HashMap<String, ScopeDef> = HashMap::new();

        let executor = CallExecutor::new(&scopes, None);
        let step = call_step("call_bad", "nonexistent");
        let config = StepConfig::default();
        let ctx = Context::new(String::new(), HashMap::new());

        // Calling a scope that was never registered must fail.
        let result = executor.execute(&step, &config, &ctx).await;
        assert!(result.is_err());
    }
}