1use std::time::{Duration, Instant};
2
3use async_trait::async_trait;
4use tokio::io::AsyncWriteExt;
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command;
7
8use crate::cli::display;
9use crate::config::StepConfig;
10use crate::engine::context::Context;
11use crate::error::StepError;
12use crate::workflow::schema::StepDef;
13
14use super::{AgentOutput, AgentStats, SandboxAwareExecutor, SharedSandbox, StepExecutor, StepOutput};
15
16pub struct AgentExecutor;
17
18impl AgentExecutor {
19 pub(crate) fn build_args(config: &StepConfig, ctx: &Context) -> Result<Vec<String>, StepError> {
21 let mut args: Vec<String> = vec![
22 "-p".into(),
23 "--verbose".into(),
24 "--output-format".into(),
25 "stream-json".into(),
26 ];
27
28 if let Some(model) = config.get_str("model") {
29 args.extend(["--model".into(), model.into()]);
30 }
31 if let Some(sp) = config.get_str("system_prompt_append") {
32 args.extend(["--append-system-prompt".into(), sp.into()]);
33 }
34 if config.get_str("permissions") == Some("skip") {
35 args.push("--dangerously-skip-permissions".into());
36 }
37
38 if let Some(resume_step) = config.get_str("resume") {
40 let session_id = lookup_session_id(ctx, resume_step)?;
41 args.extend(["--resume".into(), session_id]);
42 }
43
44 if let Some(fork_step) = config.get_str("fork_session") {
46 let session_id = lookup_session_id(ctx, fork_step)?;
47 args.extend(["--resume".into(), session_id]);
48 }
49
50 Ok(args)
51 }
52
53 fn parse_stream_json(line: &str, response: &mut String, session_id: &mut Option<String>, stats: &mut AgentStats) {
55 if let Ok(msg) = serde_json::from_str::<serde_json::Value>(line) {
56 match msg.get("type").and_then(|t| t.as_str()) {
57 Some("result") => {
58 if let Some(r) = msg.get("result").and_then(|r| r.as_str()) {
59 *response = r.to_string();
60 }
61 *session_id =
62 msg.get("session_id").and_then(|s| s.as_str()).map(String::from);
63 if let Some(usage) = msg.get("usage") {
64 stats.input_tokens =
65 usage.get("input_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
66 stats.output_tokens = usage
67 .get("output_tokens")
68 .and_then(|v| v.as_u64())
69 .unwrap_or(0);
70 }
71 if let Some(cost) = msg.get("cost_usd").and_then(|c| c.as_f64()) {
72 stats.cost_usd = cost;
73 }
74 }
75 Some("assistant") => {
76 if let Some(content) = msg.get("content").and_then(|c| c.as_str()) {
77 display::agent_progress(content);
78 }
79 }
80 Some("tool_use") => {
81 if let Some(tool) = msg.get("tool").and_then(|t| t.as_str()) {
82 display::tool_use(tool, "");
83 }
84 }
85 _ => {}
86 }
87 }
88 }
89
90 async fn execute_on_host(
92 &self,
93 prompt: &str,
94 command: &str,
95 args: &[String],
96 timeout: Duration,
97 ) -> Result<StepOutput, StepError> {
98 let mut child = Command::new(command)
99 .args(args)
100 .stdin(std::process::Stdio::piped())
101 .stdout(std::process::Stdio::piped())
102 .stderr(std::process::Stdio::piped())
103 .spawn()
104 .map_err(|e| StepError::Fail(format!("Failed to spawn {command}: {e}")))?;
105
106 if let Some(mut stdin) = child.stdin.take() {
108 stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
109 StepError::Fail(format!("Failed to write prompt to stdin: {e}"))
110 })?;
111 drop(stdin);
112 }
113
114 let stdout = child.stdout.take().unwrap();
116 let reader = BufReader::new(stdout);
117 let mut lines = reader.lines();
118
119 let start = Instant::now();
120 let mut response = String::new();
121 let mut session_id = None;
122 let mut stats = AgentStats::default();
123
124 let parse_result = tokio::time::timeout(timeout, async {
125 while let Ok(Some(line)) = lines.next_line().await {
126 Self::parse_stream_json(&line, &mut response, &mut session_id, &mut stats);
127 }
128 })
129 .await;
130
131 if parse_result.is_err() {
132 let _ = child.kill().await;
133 return Err(StepError::Timeout(timeout));
134 }
135
136 let status = child.wait().await.map_err(|e| {
137 StepError::Fail(format!("Failed to wait for claude process: {e}"))
138 })?;
139
140 if !status.success() && response.is_empty() {
141 return Err(StepError::Fail(format!(
142 "Claude Code exited with status {}",
143 status.code().unwrap_or(-1)
144 )));
145 }
146
147 stats.duration = start.elapsed();
148
149 Ok(StepOutput::Agent(AgentOutput {
150 response,
151 session_id,
152 stats,
153 }))
154 }
155
156 async fn execute_in_sandbox(
158 &self,
159 prompt: &str,
160 command: &str,
161 args: &[String],
162 timeout: Duration,
163 sandbox: &SharedSandbox,
164 ) -> Result<StepOutput, StepError> {
165 let sb = sandbox.as_ref().ok_or_else(|| {
166 StepError::Fail("Sandbox reference is None but sandbox execution was requested".into())
167 })?;
168
169 let start = Instant::now();
170
171 let escaped_prompt = prompt.replace('\'', "'\\''");
173
174 let args_str = args.join(" ");
177 let sandbox_cmd = format!(
179 "export HOME=/home/minion && echo '{}' | {} {}",
180 escaped_prompt, command, args_str
181 );
182
183 let sb_guard = sb.lock().await;
184 let sb_output = tokio::time::timeout(timeout, sb_guard.run_command_as_user(&sandbox_cmd, "minion"))
185 .await
186 .map_err(|_| StepError::Timeout(timeout))?
187 .map_err(|e| StepError::Fail(format!("Sandbox agent execution failed: {e}")))?;
188
189 let mut response = String::new();
191 let mut session_id = None;
192 let mut stats = AgentStats::default();
193
194 for line in sb_output.stdout.lines() {
195 Self::parse_stream_json(line, &mut response, &mut session_id, &mut stats);
196 }
197
198 if sb_output.exit_code != 0 && response.is_empty() {
199 return Err(StepError::Fail(format!(
200 "Claude Code in sandbox exited with status {}: {}",
201 sb_output.exit_code,
202 sb_output.stderr.trim()
203 )));
204 }
205
206 stats.duration = start.elapsed();
207
208 Ok(StepOutput::Agent(AgentOutput {
209 response,
210 session_id,
211 stats,
212 }))
213 }
214}
215
216fn lookup_session_id(ctx: &Context, step_name: &str) -> Result<String, StepError> {
217 ctx.get_step(step_name)
218 .and_then(|out| {
219 if let StepOutput::Agent(a) = out {
220 a.session_id.clone()
221 } else {
222 None
223 }
224 })
225 .ok_or_else(|| StepError::Fail(format!("session not found for step '{}'", step_name)))
226}
227
228#[async_trait]
229impl StepExecutor for AgentExecutor {
230 async fn execute(
231 &self,
232 step: &StepDef,
233 config: &StepConfig,
234 ctx: &Context,
235 ) -> Result<StepOutput, StepError> {
236 self.execute_sandboxed(step, config, ctx, &None).await
237 }
238}
239
240#[async_trait]
241impl SandboxAwareExecutor for AgentExecutor {
242 async fn execute_sandboxed(
243 &self,
244 step: &StepDef,
245 config: &StepConfig,
246 ctx: &Context,
247 sandbox: &SharedSandbox,
248 ) -> Result<StepOutput, StepError> {
249 let prompt_template = step
250 .prompt
251 .as_ref()
252 .ok_or_else(|| StepError::Fail("agent step missing 'prompt' field".into()))?;
253
254 let prompt = ctx.render_template(prompt_template)?;
255 let command = config.get_str("command").unwrap_or("claude");
256 let timeout = config
257 .get_duration("timeout")
258 .unwrap_or(Duration::from_secs(600));
259 let args = Self::build_args(config, ctx)?;
260
261 if sandbox.is_some() {
262 self.execute_in_sandbox(&prompt, command, &args, timeout, sandbox).await
263 } else {
264 self.execute_on_host(&prompt, command, &args, timeout).await
265 }
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::StepConfig;
    use crate::engine::context::Context;
    use crate::steps::{AgentOutput, AgentStats, StepOutput};
    use crate::workflow::schema::StepType;
    use std::collections::HashMap;

    /// Builds an `agent` step with the given prompt and every optional field unset.
    fn agent_step(prompt: &str) -> StepDef {
        StepDef {
            name: "test".to_string(),
            step_type: StepType::Agent,
            run: None,
            prompt: Some(prompt.to_string()),
            condition: None,
            on_pass: None,
            on_fail: None,
            message: None,
            scope: None,
            max_iterations: None,
            initial_value: None,
            items: None,
            parallel: None,
            steps: None,
            config: HashMap::new(),
            outputs: None,
            output_type: None,
            async_exec: None,
        }
    }

    /// Context with no variables and no completed steps.
    fn empty_ctx() -> Context {
        Context::new(String::new(), HashMap::new())
    }

    /// Step config whose entries are all JSON strings.
    fn string_config(entries: &[(&str, &str)]) -> StepConfig {
        let values = entries
            .iter()
            .map(|&(key, value)| (key.to_string(), serde_json::Value::String(value.to_string())))
            .collect();
        StepConfig { values }
    }

    /// Path to the mock `claude` fixture, with its executable bit set.
    fn executable_mock_claude() -> String {
        use std::os::unix::fs::PermissionsExt;

        let script = format!("{}/tests/fixtures/mock_claude.sh", env!("CARGO_MANIFEST_DIR"));
        let mut perms = std::fs::metadata(&script).unwrap().permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&script, perms).unwrap();
        script
    }

    /// Agent output of an already-finished step that recorded `session`.
    fn finished_agent(session: &str) -> StepOutput {
        StepOutput::Agent(AgentOutput {
            response: "result".to_string(),
            session_id: Some(session.to_string()),
            stats: AgentStats::default(),
        })
    }

    /// Asserts that `args` contains `--resume` immediately followed by `session`.
    fn assert_resumes(args: &[String], session: &str) {
        let flag_at = args
            .iter()
            .position(|a| a == "--resume")
            .expect("--resume not found");
        assert_eq!(args[flag_at + 1], session);
    }

    /// Runs an agent step whose `key` references a step that never ran and checks
    /// that the session lookup error is surfaced.
    async fn assert_missing_session_error(key: &str) {
        let config = string_config(&[(key, "nonexistent")]);
        let ctx = empty_ctx();

        let result = AgentExecutor
            .execute(&agent_step("test prompt"), &config, &ctx)
            .await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("session not found for step 'nonexistent'"),
            "Unexpected error: {}",
            err
        );
    }

    #[tokio::test]
    async fn agent_mock_claude() {
        let script = executable_mock_claude();
        let config = string_config(&[("command", script.as_str())]);
        let ctx = empty_ctx();

        let result = AgentExecutor
            .execute(&agent_step("test prompt"), &config, &ctx)
            .await
            .unwrap();
        match result {
            StepOutput::Agent(out) => {
                assert_eq!(out.response, "Task completed successfully");
                assert_eq!(out.session_id.as_deref(), Some("mock-session-123"));
                assert_eq!(out.stats.input_tokens, 10);
                assert_eq!(out.stats.output_tokens, 20);
            }
            _ => panic!("Expected Agent output"),
        }
    }

    #[tokio::test]
    async fn resume_missing_step_returns_error() {
        assert_missing_session_error("resume").await;
    }

    #[test]
    fn build_args_resume_adds_flag() {
        let mut ctx = empty_ctx();
        ctx.store("analyze", finished_agent("sess-123"));
        let config = string_config(&[("resume", "analyze")]);

        let args = AgentExecutor::build_args(&config, &ctx).unwrap();
        assert_resumes(&args, "sess-123");
    }

    #[tokio::test]
    async fn fork_session_missing_step_returns_error() {
        assert_missing_session_error("fork_session").await;
    }

    #[test]
    fn build_args_fork_session_adds_resume_flag() {
        let mut ctx = empty_ctx();
        ctx.store("analyze", finished_agent("sess-fork-456"));
        let config = string_config(&[("fork_session", "analyze")]);

        let args = AgentExecutor::build_args(&config, &ctx).unwrap();
        assert_resumes(&args, "sess-fork-456");
    }

    #[tokio::test]
    async fn agent_sandbox_aware_no_sandbox_uses_host() {
        let script = executable_mock_claude();
        let config = string_config(&[("command", script.as_str())]);
        let ctx = empty_ctx();

        let result = AgentExecutor
            .execute_sandboxed(&agent_step("test prompt"), &config, &ctx, &None)
            .await
            .unwrap();
        assert!(matches!(result, StepOutput::Agent(_)));
    }
}