Skip to main content

minion_engine/steps/
agent.rs

1use std::time::{Duration, Instant};
2
3use async_trait::async_trait;
4use tokio::io::AsyncWriteExt;
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command;
7
8use crate::cli::display;
9use crate::config::StepConfig;
10use crate::engine::context::Context;
11use crate::error::StepError;
12use crate::workflow::schema::StepDef;
13
14use super::{AgentOutput, AgentStats, SandboxAwareExecutor, SharedSandbox, StepExecutor, StepOutput};
15
/// Step executor for agent steps.
///
/// It carries no state: each run builds CLI arguments from the step config, feeds the
/// rendered prompt to the configured command (default `claude`) and parses the
/// `stream-json` output, either on the host or inside a sandbox.
pub struct AgentExecutor;
17
18impl AgentExecutor {
19    /// Build the claude CLI args from step config
20    pub(crate) fn build_args(config: &StepConfig, ctx: &Context) -> Result<Vec<String>, StepError> {
21        let mut args: Vec<String> = vec![
22            "-p".into(),
23            "--verbose".into(),
24            "--output-format".into(),
25            "stream-json".into(),
26        ];
27
28        if let Some(model) = config.get_str("model") {
29            args.extend(["--model".into(), model.into()]);
30        }
31        if let Some(sp) = config.get_str("system_prompt_append") {
32            args.extend(["--append-system-prompt".into(), sp.into()]);
33        }
34        if config.get_str("permissions") == Some("skip") {
35            args.push("--dangerously-skip-permissions".into());
36        }
37
38        // Session resume (Story 2.1)
39        if let Some(resume_step) = config.get_str("resume") {
40            let session_id = lookup_session_id(ctx, resume_step)?;
41            args.extend(["--resume".into(), session_id]);
42        }
43
44        // Session fork (Story 2.2) — uses same --resume flag; Claude CLI creates new session
45        if let Some(fork_step) = config.get_str("fork_session") {
46            let session_id = lookup_session_id(ctx, fork_step)?;
47            args.extend(["--resume".into(), session_id]);
48        }
49
50        Ok(args)
51    }
52
53    /// Parse stream-json output from Claude CLI
54    fn parse_stream_json(line: &str, response: &mut String, session_id: &mut Option<String>, stats: &mut AgentStats) {
55        if let Ok(msg) = serde_json::from_str::<serde_json::Value>(line) {
56            match msg.get("type").and_then(|t| t.as_str()) {
57                Some("result") => {
58                    if let Some(r) = msg.get("result").and_then(|r| r.as_str()) {
59                        *response = r.to_string();
60                    }
61                    *session_id =
62                        msg.get("session_id").and_then(|s| s.as_str()).map(String::from);
63                    if let Some(usage) = msg.get("usage") {
64                        stats.input_tokens =
65                            usage.get("input_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
66                        stats.output_tokens = usage
67                            .get("output_tokens")
68                            .and_then(|v| v.as_u64())
69                            .unwrap_or(0);
70                    }
71                    if let Some(cost) = msg.get("cost_usd").and_then(|c| c.as_f64()) {
72                        stats.cost_usd = cost;
73                    }
74                }
75                Some("assistant") => {
76                    if let Some(content) = msg.get("content").and_then(|c| c.as_str()) {
77                        display::agent_progress(content);
78                    }
79                }
80                Some("tool_use") => {
81                    if let Some(tool) = msg.get("tool").and_then(|t| t.as_str()) {
82                        display::tool_use(tool, "");
83                    }
84                }
85                _ => {}
86            }
87        }
88    }
89
90    /// Execute agent on the host (no sandbox)
91    async fn execute_on_host(
92        &self,
93        prompt: &str,
94        command: &str,
95        args: &[String],
96        timeout: Duration,
97    ) -> Result<StepOutput, StepError> {
98        let mut child = Command::new(command)
99            .args(args)
100            .stdin(std::process::Stdio::piped())
101            .stdout(std::process::Stdio::piped())
102            .stderr(std::process::Stdio::piped())
103            .spawn()
104            .map_err(|e| StepError::Fail(format!("Failed to spawn {command}: {e}")))?;
105
106        // Send prompt via stdin
107        if let Some(mut stdin) = child.stdin.take() {
108            stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
109                StepError::Fail(format!("Failed to write prompt to stdin: {e}"))
110            })?;
111            drop(stdin);
112        }
113
114        // Parse streaming JSON from stdout
115        let stdout = child.stdout.take().unwrap();
116        let reader = BufReader::new(stdout);
117        let mut lines = reader.lines();
118
119        let start = Instant::now();
120        let mut response = String::new();
121        let mut session_id = None;
122        let mut stats = AgentStats::default();
123
124        let parse_result = tokio::time::timeout(timeout, async {
125            while let Ok(Some(line)) = lines.next_line().await {
126                Self::parse_stream_json(&line, &mut response, &mut session_id, &mut stats);
127            }
128        })
129        .await;
130
131        if parse_result.is_err() {
132            let _ = child.kill().await;
133            return Err(StepError::Timeout(timeout));
134        }
135
136        let status = child.wait().await.map_err(|e| {
137            StepError::Fail(format!("Failed to wait for claude process: {e}"))
138        })?;
139
140        if !status.success() && response.is_empty() {
141            return Err(StepError::Fail(format!(
142                "Claude Code exited with status {}",
143                status.code().unwrap_or(-1)
144            )));
145        }
146
147        stats.duration = start.elapsed();
148
149        Ok(StepOutput::Agent(AgentOutput {
150            response,
151            session_id,
152            stats,
153        }))
154    }
155
156    /// Execute agent inside a Docker sandbox container
157    async fn execute_in_sandbox(
158        &self,
159        prompt: &str,
160        command: &str,
161        args: &[String],
162        timeout: Duration,
163        sandbox: &SharedSandbox,
164    ) -> Result<StepOutput, StepError> {
165        let sb = sandbox.as_ref().ok_or_else(|| {
166            StepError::Fail("Sandbox reference is None but sandbox execution was requested".into())
167        })?;
168
169        let start = Instant::now();
170
171        // Escape the prompt for shell embedding
172        let escaped_prompt = prompt.replace('\'', "'\\''");
173
174        // Build the full command to run inside the container:
175        // echo '<prompt>' | claude -p --output-format stream-json ...
176        let args_str = args.join(" ");
177        // Set HOME for the minion user so Claude CLI finds its config
178        let sandbox_cmd = format!(
179            "export HOME=/home/minion && echo '{}' | {} {}",
180            escaped_prompt, command, args_str
181        );
182
183        let sb_guard = sb.lock().await;
184        let sb_output = tokio::time::timeout(timeout, sb_guard.run_command_as_user(&sandbox_cmd, "minion"))
185            .await
186            .map_err(|_| StepError::Timeout(timeout))?
187            .map_err(|e| StepError::Fail(format!("Sandbox agent execution failed: {e}")))?;
188
189        // Parse the stream-json output (line by line)
190        let mut response = String::new();
191        let mut session_id = None;
192        let mut stats = AgentStats::default();
193
194        for line in sb_output.stdout.lines() {
195            Self::parse_stream_json(line, &mut response, &mut session_id, &mut stats);
196        }
197
198        if sb_output.exit_code != 0 && response.is_empty() {
199            return Err(StepError::Fail(format!(
200                "Claude Code in sandbox exited with status {}: {}",
201                sb_output.exit_code,
202                sb_output.stderr.trim()
203            )));
204        }
205
206        stats.duration = start.elapsed();
207
208        Ok(StepOutput::Agent(AgentOutput {
209            response,
210            session_id,
211            stats,
212        }))
213    }
214}
215
216fn lookup_session_id(ctx: &Context, step_name: &str) -> Result<String, StepError> {
217    ctx.get_step(step_name)
218        .and_then(|out| {
219            if let StepOutput::Agent(a) = out {
220                a.session_id.clone()
221            } else {
222                None
223            }
224        })
225        .ok_or_else(|| StepError::Fail(format!("session not found for step '{}'", step_name)))
226}
227
228#[async_trait]
229impl StepExecutor for AgentExecutor {
230    async fn execute(
231        &self,
232        step: &StepDef,
233        config: &StepConfig,
234        ctx: &Context,
235    ) -> Result<StepOutput, StepError> {
236        self.execute_sandboxed(step, config, ctx, &None).await
237    }
238}
239
240#[async_trait]
241impl SandboxAwareExecutor for AgentExecutor {
242    async fn execute_sandboxed(
243        &self,
244        step: &StepDef,
245        config: &StepConfig,
246        ctx: &Context,
247        sandbox: &SharedSandbox,
248    ) -> Result<StepOutput, StepError> {
249        let prompt_template = step
250            .prompt
251            .as_ref()
252            .ok_or_else(|| StepError::Fail("agent step missing 'prompt' field".into()))?;
253
254        let prompt = ctx.render_template(prompt_template)?;
255        let command = config.get_str("command").unwrap_or("claude");
256        let timeout = config
257            .get_duration("timeout")
258            .unwrap_or(Duration::from_secs(600));
259        let args = Self::build_args(config, ctx)?;
260
261        if sandbox.is_some() {
262            self.execute_in_sandbox(&prompt, command, &args, timeout, sandbox).await
263        } else {
264            self.execute_on_host(&prompt, command, &args, timeout).await
265        }
266    }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::StepConfig;
    use crate::engine::context::Context;
    use crate::workflow::schema::StepType;
    use std::collections::HashMap;

    /// Minimal agent step named "test" with the given prompt and every optional field unset.
    fn agent_step(prompt: &str) -> StepDef {
        StepDef {
            name: "test".to_string(),
            step_type: StepType::Agent,
            run: None,
            prompt: Some(prompt.to_string()),
            condition: None,
            on_pass: None,
            on_fail: None,
            message: None,
            scope: None,
            max_iterations: None,
            initial_value: None,
            items: None,
            parallel: None,
            steps: None,
            config: HashMap::new(),
            outputs: None,
            output_type: None,
            async_exec: None,
        }
    }

    /// Step config holding exactly one string entry.
    fn config_with(key: &str, value: &str) -> StepConfig {
        let mut values = HashMap::new();
        values.insert(
            key.to_string(),
            serde_json::Value::String(value.to_string()),
        );
        StepConfig { values }
    }

    /// Context with no target and no stored step outputs.
    fn empty_context() -> Context {
        Context::new(String::new(), HashMap::new())
    }

    /// Context in which `step_name` has already produced an agent output with `session_id`.
    fn context_with_session(step_name: &str, session_id: &str) -> Context {
        use crate::steps::{AgentOutput, AgentStats, StepOutput};

        let mut ctx = empty_context();
        ctx.store(
            step_name,
            StepOutput::Agent(AgentOutput {
                response: "result".to_string(),
                session_id: Some(session_id.to_string()),
                stats: AgentStats::default(),
            }),
        );
        ctx
    }

    /// Path to the mock CLI fixture, made executable (in case git checkout lost exec bit).
    #[cfg(unix)]
    fn mock_claude_script() -> String {
        use std::os::unix::fs::PermissionsExt;

        let path = format!("{}/tests/fixtures/mock_claude.sh", env!("CARGO_MANIFEST_DIR"));
        let mut perms = std::fs::metadata(&path).unwrap().permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&path, perms).unwrap();
        path
    }

    /// Assert that `args` contains `--resume` immediately followed by `expected_session`.
    fn assert_resume_flag(args: &[String], expected_session: &str) {
        let resume_idx = args.iter().position(|a| a == "--resume").expect("--resume not found");
        assert_eq!(args[resume_idx + 1], expected_session);
    }

    /// Assert that running an agent step with `config` fails with the missing-session error.
    async fn assert_session_not_found(config: StepConfig) {
        let step = agent_step("test prompt");
        let ctx = empty_context();

        let result = AgentExecutor.execute(&step, &config, &ctx).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("session not found for step 'nonexistent'"),
            "Unexpected error: {}",
            err
        );
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn agent_mock_claude() {
        let step = agent_step("test prompt");
        let config = config_with("command", &mock_claude_script());
        let ctx = empty_context();

        let result = AgentExecutor.execute(&step, &config, &ctx).await.unwrap();
        match result {
            StepOutput::Agent(out) => {
                assert_eq!(out.response, "Task completed successfully");
                assert_eq!(out.session_id.as_deref(), Some("mock-session-123"));
                assert_eq!(out.stats.input_tokens, 10);
                assert_eq!(out.stats.output_tokens, 20);
            }
            _ => panic!("Expected Agent output"),
        }
    }

    #[tokio::test]
    async fn resume_missing_step_returns_error() {
        assert_session_not_found(config_with("resume", "nonexistent")).await;
    }

    #[test]
    fn build_args_resume_adds_flag() {
        let ctx = context_with_session("analyze", "sess-123");
        let config = config_with("resume", "analyze");

        let args = AgentExecutor::build_args(&config, &ctx).unwrap();
        assert_resume_flag(&args, "sess-123");
    }

    #[tokio::test]
    async fn fork_session_missing_step_returns_error() {
        assert_session_not_found(config_with("fork_session", "nonexistent")).await;
    }

    #[test]
    fn build_args_fork_session_adds_resume_flag() {
        let ctx = context_with_session("analyze", "sess-fork-456");
        let config = config_with("fork_session", "analyze");

        let args = AgentExecutor::build_args(&config, &ctx).unwrap();
        assert_resume_flag(&args, "sess-fork-456");
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn agent_sandbox_aware_no_sandbox_uses_host() {
        let step = agent_step("test prompt");
        let config = config_with("command", &mock_claude_script());
        let ctx = empty_context();

        // With sandbox=None, should fall back to host execution
        let result = AgentExecutor
            .execute_sandboxed(&step, &config, &ctx, &None)
            .await
            .unwrap();
        assert!(matches!(result, StepOutput::Agent(_)));
    }
}