Skip to main content

minion_engine/steps/
agent.rs

1use std::time::{Duration, Instant};
2
3use async_trait::async_trait;
4use tokio::io::AsyncWriteExt;
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command;
7
8use crate::cli::display;
9use crate::config::StepConfig;
10use crate::engine::context::Context;
11use crate::error::StepError;
12use crate::workflow::schema::StepDef;
13
14use super::{AgentOutput, AgentStats, SandboxAwareExecutor, SharedSandbox, StepExecutor, StepOutput};
15
16pub struct AgentExecutor;
17
18impl AgentExecutor {
19    /// Build the claude CLI args from step config
20    pub(crate) fn build_args(config: &StepConfig, ctx: &Context) -> Result<Vec<String>, StepError> {
21        let mut args: Vec<String> = vec![
22            "-p".into(),
23            "--verbose".into(),
24            "--output-format".into(),
25            "stream-json".into(),
26        ];
27
28        if let Some(model) = config.get_str("model") {
29            args.extend(["--model".into(), model.into()]);
30        }
31        if let Some(sp) = config.get_str("system_prompt_append") {
32            args.extend(["--append-system-prompt".into(), sp.into()]);
33        }
34        if config.get_str("permissions") == Some("skip") {
35            args.push("--dangerously-skip-permissions".into());
36        }
37
38        // Session resume (Story 2.1)
39        if let Some(resume_step) = config.get_str("resume") {
40            let session_id = lookup_session_id(ctx, resume_step)?;
41            args.extend(["--resume".into(), session_id]);
42        }
43
44        // Session fork (Story 2.2) — uses same --resume flag; Claude CLI creates new session
45        if let Some(fork_step) = config.get_str("fork_session") {
46            let session_id = lookup_session_id(ctx, fork_step)?;
47            args.extend(["--resume".into(), session_id]);
48        }
49
50        Ok(args)
51    }
52
53    /// Parse stream-json output from Claude CLI
54    fn parse_stream_json(line: &str, response: &mut String, session_id: &mut Option<String>, stats: &mut AgentStats) {
55        if let Ok(msg) = serde_json::from_str::<serde_json::Value>(line) {
56            match msg.get("type").and_then(|t| t.as_str()) {
57                Some("result") => {
58                    if let Some(r) = msg.get("result").and_then(|r| r.as_str()) {
59                        *response = r.to_string();
60                    }
61                    *session_id =
62                        msg.get("session_id").and_then(|s| s.as_str()).map(String::from);
63                    if let Some(usage) = msg.get("usage") {
64                        stats.input_tokens =
65                            usage.get("input_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
66                        stats.output_tokens = usage
67                            .get("output_tokens")
68                            .and_then(|v| v.as_u64())
69                            .unwrap_or(0);
70                    }
71                    if let Some(cost) = msg.get("cost_usd").and_then(|c| c.as_f64()) {
72                        stats.cost_usd = cost;
73                    }
74                }
75                Some("assistant") => {
76                    if let Some(content) = msg.get("content").and_then(|c| c.as_str()) {
77                        display::agent_progress(content);
78                    }
79                }
80                Some("tool_use") => {
81                    if let Some(tool) = msg.get("tool").and_then(|t| t.as_str()) {
82                        display::tool_use(tool, "");
83                    }
84                }
85                _ => {}
86            }
87        }
88    }
89
90    /// Execute agent on the host (no sandbox)
91    async fn execute_on_host(
92        &self,
93        prompt: &str,
94        command: &str,
95        args: &[String],
96        timeout: Duration,
97    ) -> Result<StepOutput, StepError> {
98        let mut child = Command::new(command)
99            .args(args)
100            .stdin(std::process::Stdio::piped())
101            .stdout(std::process::Stdio::piped())
102            .stderr(std::process::Stdio::piped())
103            .spawn()
104            .map_err(|e| StepError::Fail(format!("Failed to spawn {command}: {e}")))?;
105
106        // Send prompt via stdin
107        if let Some(mut stdin) = child.stdin.take() {
108            stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
109                StepError::Fail(format!("Failed to write prompt to stdin: {e}"))
110            })?;
111            drop(stdin);
112        }
113
114        // Parse streaming JSON from stdout
115        let stdout = child.stdout.take().unwrap();
116        let reader = BufReader::new(stdout);
117        let mut lines = reader.lines();
118
119        let start = Instant::now();
120        let mut response = String::new();
121        let mut session_id = None;
122        let mut stats = AgentStats::default();
123
124        let parse_result = tokio::time::timeout(timeout, async {
125            while let Ok(Some(line)) = lines.next_line().await {
126                Self::parse_stream_json(&line, &mut response, &mut session_id, &mut stats);
127            }
128        })
129        .await;
130
131        if parse_result.is_err() {
132            let _ = child.kill().await;
133            return Err(StepError::Timeout(timeout));
134        }
135
136        let status = child.wait().await.map_err(|e| {
137            StepError::Fail(format!("Failed to wait for claude process: {e}"))
138        })?;
139
140        if !status.success() && response.is_empty() {
141            return Err(StepError::Fail(format!(
142                "Claude Code exited with status {}",
143                status.code().unwrap_or(-1)
144            )));
145        }
146
147        stats.duration = start.elapsed();
148
149        Ok(StepOutput::Agent(AgentOutput {
150            response,
151            session_id,
152            stats,
153        }))
154    }
155
156    /// Execute agent inside a Docker sandbox container
157    async fn execute_in_sandbox(
158        &self,
159        prompt: &str,
160        command: &str,
161        args: &[String],
162        timeout: Duration,
163        sandbox: &SharedSandbox,
164    ) -> Result<StepOutput, StepError> {
165        let sb = sandbox.as_ref().ok_or_else(|| {
166            StepError::Fail("Sandbox reference is None but sandbox execution was requested".into())
167        })?;
168
169        let start = Instant::now();
170
171        // Escape the prompt for shell embedding
172        let escaped_prompt = prompt.replace('\'', "'\\''");
173
174        // Build the full command to run inside the container:
175        // echo '<prompt>' | claude -p --output-format stream-json ...
176        let args_str = args.join(" ");
177        let sandbox_cmd = format!(
178            "echo '{}' | {} {}",
179            escaped_prompt, command, args_str
180        );
181
182        let sb_guard = sb.lock().await;
183        let sb_output = tokio::time::timeout(timeout, sb_guard.run_command(&sandbox_cmd))
184            .await
185            .map_err(|_| StepError::Timeout(timeout))?
186            .map_err(|e| StepError::Fail(format!("Sandbox agent execution failed: {e}")))?;
187
188        // Parse the stream-json output (line by line)
189        let mut response = String::new();
190        let mut session_id = None;
191        let mut stats = AgentStats::default();
192
193        for line in sb_output.stdout.lines() {
194            Self::parse_stream_json(line, &mut response, &mut session_id, &mut stats);
195        }
196
197        if sb_output.exit_code != 0 && response.is_empty() {
198            return Err(StepError::Fail(format!(
199                "Claude Code in sandbox exited with status {}: {}",
200                sb_output.exit_code,
201                sb_output.stderr.trim()
202            )));
203        }
204
205        stats.duration = start.elapsed();
206
207        Ok(StepOutput::Agent(AgentOutput {
208            response,
209            session_id,
210            stats,
211        }))
212    }
213}
214
215fn lookup_session_id(ctx: &Context, step_name: &str) -> Result<String, StepError> {
216    ctx.get_step(step_name)
217        .and_then(|out| {
218            if let StepOutput::Agent(a) = out {
219                a.session_id.clone()
220            } else {
221                None
222            }
223        })
224        .ok_or_else(|| StepError::Fail(format!("session not found for step '{}'", step_name)))
225}
226
227#[async_trait]
228impl StepExecutor for AgentExecutor {
229    async fn execute(
230        &self,
231        step: &StepDef,
232        config: &StepConfig,
233        ctx: &Context,
234    ) -> Result<StepOutput, StepError> {
235        self.execute_sandboxed(step, config, ctx, &None).await
236    }
237}
238
239#[async_trait]
240impl SandboxAwareExecutor for AgentExecutor {
241    async fn execute_sandboxed(
242        &self,
243        step: &StepDef,
244        config: &StepConfig,
245        ctx: &Context,
246        sandbox: &SharedSandbox,
247    ) -> Result<StepOutput, StepError> {
248        let prompt_template = step
249            .prompt
250            .as_ref()
251            .ok_or_else(|| StepError::Fail("agent step missing 'prompt' field".into()))?;
252
253        let prompt = ctx.render_template(prompt_template)?;
254        let command = config.get_str("command").unwrap_or("claude");
255        let timeout = config
256            .get_duration("timeout")
257            .unwrap_or(Duration::from_secs(600));
258        let args = Self::build_args(config, ctx)?;
259
260        if sandbox.is_some() {
261            self.execute_in_sandbox(&prompt, command, &args, timeout, sandbox).await
262        } else {
263            self.execute_on_host(&prompt, command, &args, timeout).await
264        }
265    }
266}
267
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::StepConfig;
    use crate::engine::context::Context;
    use crate::workflow::schema::StepType;
    use std::collections::HashMap;

    /// Minimal agent step definition carrying only a prompt.
    fn agent_step(prompt: &str) -> StepDef {
        StepDef {
            name: "test".to_string(),
            step_type: StepType::Agent,
            run: None,
            prompt: Some(prompt.to_string()),
            condition: None,
            on_pass: None,
            on_fail: None,
            message: None,
            scope: None,
            max_iterations: None,
            initial_value: None,
            items: None,
            parallel: None,
            steps: None,
            config: HashMap::new(),
            outputs: None,
            output_type: None,
            async_exec: None,
        }
    }

    /// Step config holding a single string-valued entry.
    fn config_with(key: &str, value: &str) -> StepConfig {
        let mut values = HashMap::new();
        values.insert(
            key.to_string(),
            serde_json::Value::String(value.to_string()),
        );
        StepConfig { values }
    }

    /// Context with no stored step outputs.
    fn empty_ctx() -> Context {
        Context::new(String::new(), HashMap::new())
    }

    /// Context in which `step_name` has produced an agent output with `session_id`.
    fn ctx_with_session(step_name: &str, session_id: &str) -> Context {
        let mut ctx = empty_ctx();
        ctx.store(
            step_name,
            StepOutput::Agent(AgentOutput {
                response: "result".to_string(),
                session_id: Some(session_id.to_string()),
                stats: AgentStats::default(),
            }),
        );
        ctx
    }

    /// Path of the mock agent script, with its executable bit (re)applied.
    #[cfg(unix)]
    fn mock_claude_path() -> String {
        let mock_script = format!("{}/tests/fixtures/mock_claude.sh", env!("CARGO_MANIFEST_DIR"));

        // Make executable (in case git checkout lost exec bit)
        use std::os::unix::fs::PermissionsExt;
        let mut perms = std::fs::metadata(&mock_script).unwrap().permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&mock_script, perms).unwrap();

        mock_script
    }

    /// Value that immediately follows `--resume` in `args`.
    fn resume_value(args: &[String]) -> &str {
        let resume_idx = args.iter().position(|a| a == "--resume").expect("--resume not found");
        &args[resume_idx + 1]
    }

    /// Assert that `result` is the "session not found" error for step `nonexistent`.
    fn assert_missing_session(result: Result<StepOutput, StepError>) {
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("session not found for step 'nonexistent'"),
            "Unexpected error: {}",
            err
        );
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn agent_mock_claude() {
        let step = agent_step("test prompt");
        let config = config_with("command", &mock_claude_path());
        let ctx = empty_ctx();

        let result = AgentExecutor.execute(&step, &config, &ctx).await.unwrap();
        if let StepOutput::Agent(out) = result {
            assert_eq!(out.response, "Task completed successfully");
            assert_eq!(out.session_id.as_deref(), Some("mock-session-123"));
            assert_eq!(out.stats.input_tokens, 10);
            assert_eq!(out.stats.output_tokens, 20);
        } else {
            panic!("Expected Agent output");
        }
    }

    #[tokio::test]
    async fn resume_missing_step_returns_error() {
        let step = agent_step("test prompt");
        let config = config_with("resume", "nonexistent");
        let ctx = empty_ctx();

        assert_missing_session(AgentExecutor.execute(&step, &config, &ctx).await);
    }

    #[test]
    fn build_args_resume_adds_flag() {
        let ctx = ctx_with_session("analyze", "sess-123");
        let config = config_with("resume", "analyze");

        let args = AgentExecutor::build_args(&config, &ctx).unwrap();
        assert_eq!(resume_value(&args), "sess-123");
    }

    #[tokio::test]
    async fn fork_session_missing_step_returns_error() {
        let step = agent_step("test prompt");
        let config = config_with("fork_session", "nonexistent");
        let ctx = empty_ctx();

        assert_missing_session(AgentExecutor.execute(&step, &config, &ctx).await);
    }

    #[test]
    fn build_args_fork_session_adds_resume_flag() {
        let ctx = ctx_with_session("analyze", "sess-fork-456");
        let config = config_with("fork_session", "analyze");

        let args = AgentExecutor::build_args(&config, &ctx).unwrap();
        assert_eq!(resume_value(&args), "sess-fork-456");
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn agent_sandbox_aware_no_sandbox_uses_host() {
        let step = agent_step("test prompt");
        let config = config_with("command", &mock_claude_path());
        let ctx = empty_ctx();

        // With sandbox=None, should fall back to host execution
        let result = AgentExecutor
            .execute_sandboxed(&step, &config, &ctx, &None)
            .await
            .unwrap();
        assert!(matches!(result, StepOutput::Agent(_)));
    }
}