Skip to main content

minion_engine/steps/
call.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4
5use crate::config::StepConfig;
6use crate::control_flow::ControlFlow;
7use crate::engine::context::Context;
8use crate::error::StepError;
9use crate::workflow::schema::{ScopeDef, StepDef, StepType};
10
11use super::{
12    agent::AgentExecutor, cmd::CmdExecutor, gate::GateExecutor, repeat::RepeatExecutor,
13    chat::ChatExecutor, CmdOutput, IterationOutput, ScopeOutput, StepExecutor, StepOutput,
14};
15
16pub struct CallExecutor {
17    scopes: HashMap<String, ScopeDef>,
18}
19
20impl CallExecutor {
21    pub fn new(scopes: &HashMap<String, ScopeDef>) -> Self {
22        Self {
23            scopes: scopes.clone(),
24        }
25    }
26}
27
28#[async_trait]
29impl StepExecutor for CallExecutor {
30    async fn execute(
31        &self,
32        step: &StepDef,
33        _config: &StepConfig,
34        ctx: &Context,
35    ) -> Result<StepOutput, StepError> {
36        let scope_name = step
37            .scope
38            .as_ref()
39            .ok_or_else(|| StepError::Fail("call step missing 'scope' field".into()))?;
40
41        let scope = self
42            .scopes
43            .get(scope_name)
44            .ok_or_else(|| StepError::Fail(format!("scope '{}' not found", scope_name)))?
45            .clone();
46
47        let mut child_ctx = Context::new(
48            ctx.get_var("target")
49                .and_then(|v| v.as_str())
50                .unwrap_or("")
51                .to_string(),
52            HashMap::new(),
53        );
54
55        let mut last_output = StepOutput::Empty;
56
57        for scope_step in &scope.steps {
58            let step_config = StepConfig::default();
59            let result =
60                dispatch_scope_step(scope_step, &step_config, &child_ctx, &self.scopes).await;
61
62            match result {
63                Ok(output) => {
64                    child_ctx.store(&scope_step.name, output.clone());
65                    last_output = output;
66                }
67                Err(StepError::ControlFlow(ControlFlow::Break { value, .. })) => {
68                    if let Some(v) = value {
69                        last_output = v;
70                    }
71                    break;
72                }
73                Err(StepError::ControlFlow(ControlFlow::Skip { .. })) => {
74                    child_ctx.store(&scope_step.name, StepOutput::Empty);
75                }
76                Err(StepError::ControlFlow(ControlFlow::Next { .. })) => {
77                    break;
78                }
79                Err(e) => return Err(e),
80            }
81        }
82
83        // Use scope outputs if defined, otherwise last step output
84        let final_output = if let Some(outputs_template) = &scope.outputs {
85            match child_ctx.render_template(outputs_template) {
86                Ok(rendered) => StepOutput::Cmd(CmdOutput {
87                    stdout: rendered,
88                    stderr: String::new(),
89                    exit_code: 0,
90                    duration: std::time::Duration::ZERO,
91                }),
92                Err(_) => last_output,
93            }
94        } else {
95            last_output
96        };
97
98        Ok(StepOutput::Scope(ScopeOutput {
99            iterations: vec![IterationOutput {
100                index: 0,
101                output: final_output.clone(),
102            }],
103            final_value: Some(Box::new(final_output)),
104        }))
105    }
106}
107
108pub(super) async fn dispatch_scope_step(
109    step: &StepDef,
110    config: &StepConfig,
111    ctx: &Context,
112    scopes: &HashMap<String, ScopeDef>,
113) -> Result<StepOutput, StepError> {
114    match step.step_type {
115        StepType::Cmd => CmdExecutor.execute(step, config, ctx).await,
116        StepType::Agent => AgentExecutor.execute(step, config, ctx).await,
117        StepType::Gate => GateExecutor.execute(step, config, ctx).await,
118        StepType::Chat => ChatExecutor.execute(step, config, ctx).await,
119        StepType::Repeat => RepeatExecutor::new(scopes).execute(step, config, ctx).await,
120        StepType::Call => CallExecutor::new(scopes).execute(step, config, ctx).await,
121        _ => Err(StepError::Fail(format!(
122            "Step type '{}' not supported in scope",
123            step.step_type
124        ))),
125    }
126}
127
128#[cfg(test)]
129mod tests {
130    use super::*;
131    use std::collections::HashMap;
132    use crate::workflow::schema::{ScopeDef, StepType};
133
134    fn cmd_step(name: &str, run: &str) -> StepDef {
135        StepDef {
136            name: name.to_string(),
137            step_type: StepType::Cmd,
138            run: Some(run.to_string()),
139            prompt: None,
140            condition: None,
141            on_pass: None,
142            on_fail: None,
143            message: None,
144            scope: None,
145            max_iterations: None,
146            initial_value: None,
147            items: None,
148            parallel: None,
149            steps: None,
150            config: HashMap::new(),
151            outputs: None,
152            output_type: None,
153            async_exec: None,
154        }
155    }
156
157    fn call_step(name: &str, scope: &str) -> StepDef {
158        StepDef {
159            name: name.to_string(),
160            step_type: StepType::Call,
161            run: None,
162            prompt: None,
163            condition: None,
164            on_pass: None,
165            on_fail: None,
166            message: None,
167            scope: Some(scope.to_string()),
168            max_iterations: None,
169            initial_value: None,
170            items: None,
171            parallel: None,
172            steps: None,
173            config: HashMap::new(),
174            outputs: None,
175            output_type: None,
176            async_exec: None,
177        }
178    }
179
180    #[tokio::test]
181    async fn call_scope_with_two_steps() {
182        let scope = ScopeDef {
183            steps: vec![
184                cmd_step("step1", "echo first"),
185                cmd_step("step2", "echo second"),
186            ],
187            outputs: None,
188        };
189        let mut scopes = HashMap::new();
190        scopes.insert("my_scope".to_string(), scope);
191
192        let step = call_step("call_test", "my_scope");
193        let executor = CallExecutor::new(&scopes);
194        let config = StepConfig::default();
195        let ctx = Context::new(String::new(), HashMap::new());
196
197        let result = executor.execute(&step, &config, &ctx).await.unwrap();
198        // Last step output is "second\n"
199        assert!(result.text().contains("second"));
200    }
201
202    #[tokio::test]
203    async fn call_with_explicit_outputs() {
204        let scope = ScopeDef {
205            steps: vec![
206                cmd_step("step1", "echo hello"),
207            ],
208            outputs: Some("rendered: {{ steps.step1.stdout }}".to_string()),
209        };
210        let mut scopes = HashMap::new();
211        scopes.insert("output_scope".to_string(), scope);
212
213        let step = call_step("call_out", "output_scope");
214        let executor = CallExecutor::new(&scopes);
215        let config = StepConfig::default();
216        let ctx = Context::new(String::new(), HashMap::new());
217
218        let result = executor.execute(&step, &config, &ctx).await.unwrap();
219        assert!(result.text().contains("rendered:"));
220        assert!(result.text().contains("hello"));
221    }
222
223    #[tokio::test]
224    async fn call_missing_scope_error() {
225        let scopes = HashMap::new();
226        let step = call_step("call_bad", "nonexistent");
227        let executor = CallExecutor::new(&scopes);
228        let config = StepConfig::default();
229        let ctx = Context::new(String::new(), HashMap::new());
230
231        let result = executor.execute(&step, &config, &ctx).await;
232        assert!(result.is_err());
233    }
234}