1use std::time::{Duration, Instant};
2
3use async_trait::async_trait;
4use tokio::io::AsyncWriteExt;
5use tokio::io::{AsyncBufReadExt, BufReader};
6use tokio::process::Command;
7
8use crate::cli::display;
9use crate::config::StepConfig;
10use crate::engine::context::Context;
11use crate::error::StepError;
12use crate::workflow::schema::StepDef;
13
14use super::{AgentOutput, AgentStats, SandboxAwareExecutor, SharedSandbox, StepExecutor, StepOutput};
15
/// Executor for agent steps.
///
/// It renders the step's `prompt` template and runs an agent command (`claude` unless the
/// step config sets `command`) with `--output-format stream-json`. It runs on the host, or
/// inside a sandbox when one is supplied, and turns the streamed output into an
/// [`AgentOutput`].
pub struct AgentExecutor;
17
18impl AgentExecutor {
19 pub(crate) fn build_args(config: &StepConfig, ctx: &Context) -> Result<Vec<String>, StepError> {
21 let mut args: Vec<String> = vec![
22 "-p".into(),
23 "--verbose".into(),
24 "--output-format".into(),
25 "stream-json".into(),
26 ];
27
28 if let Some(model) = config.get_str("model") {
29 args.extend(["--model".into(), model.into()]);
30 }
31 if let Some(sp) = config.get_str("system_prompt_append") {
32 args.extend(["--append-system-prompt".into(), sp.into()]);
33 }
34 if config.get_str("permissions") == Some("skip") {
35 args.push("--dangerously-skip-permissions".into());
36 }
37
38 if let Some(resume_step) = config.get_str("resume") {
40 let session_id = lookup_session_id(ctx, resume_step)?;
41 args.extend(["--resume".into(), session_id]);
42 }
43
44 if let Some(fork_step) = config.get_str("fork_session") {
46 let session_id = lookup_session_id(ctx, fork_step)?;
47 args.extend(["--resume".into(), session_id]);
48 }
49
50 Ok(args)
51 }
52
53 fn parse_stream_json(line: &str, response: &mut String, session_id: &mut Option<String>, stats: &mut AgentStats) {
55 if let Ok(msg) = serde_json::from_str::<serde_json::Value>(line) {
56 match msg.get("type").and_then(|t| t.as_str()) {
57 Some("result") => {
58 if let Some(r) = msg.get("result").and_then(|r| r.as_str()) {
59 *response = r.to_string();
60 }
61 *session_id =
62 msg.get("session_id").and_then(|s| s.as_str()).map(String::from);
63 if let Some(usage) = msg.get("usage") {
64 stats.input_tokens =
65 usage.get("input_tokens").and_then(|v| v.as_u64()).unwrap_or(0);
66 stats.output_tokens = usage
67 .get("output_tokens")
68 .and_then(|v| v.as_u64())
69 .unwrap_or(0);
70 }
71 if let Some(cost) = msg.get("cost_usd").and_then(|c| c.as_f64()) {
72 stats.cost_usd = cost;
73 }
74 }
75 Some("assistant") => {
76 if let Some(content) = msg.get("content").and_then(|c| c.as_str()) {
77 display::agent_progress(content);
78 }
79 }
80 Some("tool_use") => {
81 if let Some(tool) = msg.get("tool").and_then(|t| t.as_str()) {
82 display::tool_use(tool, "");
83 }
84 }
85 _ => {}
86 }
87 }
88 }
89
90 async fn execute_on_host(
92 &self,
93 prompt: &str,
94 command: &str,
95 args: &[String],
96 timeout: Duration,
97 ) -> Result<StepOutput, StepError> {
98 let mut child = Command::new(command)
99 .args(args)
100 .stdin(std::process::Stdio::piped())
101 .stdout(std::process::Stdio::piped())
102 .stderr(std::process::Stdio::piped())
103 .spawn()
104 .map_err(|e| StepError::Fail(format!("Failed to spawn {command}: {e}")))?;
105
106 if let Some(mut stdin) = child.stdin.take() {
108 stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
109 StepError::Fail(format!("Failed to write prompt to stdin: {e}"))
110 })?;
111 drop(stdin);
112 }
113
114 let stdout = child.stdout.take().unwrap();
116 let reader = BufReader::new(stdout);
117 let mut lines = reader.lines();
118
119 let start = Instant::now();
120 let mut response = String::new();
121 let mut session_id = None;
122 let mut stats = AgentStats::default();
123
124 let parse_result = tokio::time::timeout(timeout, async {
125 while let Ok(Some(line)) = lines.next_line().await {
126 Self::parse_stream_json(&line, &mut response, &mut session_id, &mut stats);
127 }
128 })
129 .await;
130
131 if parse_result.is_err() {
132 let _ = child.kill().await;
133 return Err(StepError::Timeout(timeout));
134 }
135
136 let status = child.wait().await.map_err(|e| {
137 StepError::Fail(format!("Failed to wait for claude process: {e}"))
138 })?;
139
140 if !status.success() && response.is_empty() {
141 return Err(StepError::Fail(format!(
142 "Claude Code exited with status {}",
143 status.code().unwrap_or(-1)
144 )));
145 }
146
147 stats.duration = start.elapsed();
148
149 Ok(StepOutput::Agent(AgentOutput {
150 response,
151 session_id,
152 stats,
153 }))
154 }
155
156 async fn execute_in_sandbox(
158 &self,
159 prompt: &str,
160 command: &str,
161 args: &[String],
162 timeout: Duration,
163 sandbox: &SharedSandbox,
164 ) -> Result<StepOutput, StepError> {
165 let sb = sandbox.as_ref().ok_or_else(|| {
166 StepError::Fail("Sandbox reference is None but sandbox execution was requested".into())
167 })?;
168
169 let start = Instant::now();
170
171 let escaped_prompt = prompt.replace('\'', "'\\''");
173
174 let args_str = args.join(" ");
177 let sandbox_cmd = format!(
178 "echo '{}' | {} {}",
179 escaped_prompt, command, args_str
180 );
181
182 let sb_guard = sb.lock().await;
183 let sb_output = tokio::time::timeout(timeout, sb_guard.run_command(&sandbox_cmd))
184 .await
185 .map_err(|_| StepError::Timeout(timeout))?
186 .map_err(|e| StepError::Fail(format!("Sandbox agent execution failed: {e}")))?;
187
188 let mut response = String::new();
190 let mut session_id = None;
191 let mut stats = AgentStats::default();
192
193 for line in sb_output.stdout.lines() {
194 Self::parse_stream_json(line, &mut response, &mut session_id, &mut stats);
195 }
196
197 if sb_output.exit_code != 0 && response.is_empty() {
198 return Err(StepError::Fail(format!(
199 "Claude Code in sandbox exited with status {}: {}",
200 sb_output.exit_code,
201 sb_output.stderr.trim()
202 )));
203 }
204
205 stats.duration = start.elapsed();
206
207 Ok(StepOutput::Agent(AgentOutput {
208 response,
209 session_id,
210 stats,
211 }))
212 }
213}
214
215fn lookup_session_id(ctx: &Context, step_name: &str) -> Result<String, StepError> {
216 ctx.get_step(step_name)
217 .and_then(|out| {
218 if let StepOutput::Agent(a) = out {
219 a.session_id.clone()
220 } else {
221 None
222 }
223 })
224 .ok_or_else(|| StepError::Fail(format!("session not found for step '{}'", step_name)))
225}
226
227#[async_trait]
228impl StepExecutor for AgentExecutor {
229 async fn execute(
230 &self,
231 step: &StepDef,
232 config: &StepConfig,
233 ctx: &Context,
234 ) -> Result<StepOutput, StepError> {
235 self.execute_sandboxed(step, config, ctx, &None).await
236 }
237}
238
239#[async_trait]
240impl SandboxAwareExecutor for AgentExecutor {
241 async fn execute_sandboxed(
242 &self,
243 step: &StepDef,
244 config: &StepConfig,
245 ctx: &Context,
246 sandbox: &SharedSandbox,
247 ) -> Result<StepOutput, StepError> {
248 let prompt_template = step
249 .prompt
250 .as_ref()
251 .ok_or_else(|| StepError::Fail("agent step missing 'prompt' field".into()))?;
252
253 let prompt = ctx.render_template(prompt_template)?;
254 let command = config.get_str("command").unwrap_or("claude");
255 let timeout = config
256 .get_duration("timeout")
257 .unwrap_or(Duration::from_secs(600));
258 let args = Self::build_args(config, ctx)?;
259
260 if sandbox.is_some() {
261 self.execute_in_sandbox(&prompt, command, &args, timeout, sandbox).await
262 } else {
263 self.execute_on_host(&prompt, command, &args, timeout).await
264 }
265 }
266}
267
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::StepConfig;
    use crate::engine::context::Context;
    use crate::steps::{AgentOutput, AgentStats, StepOutput};
    use crate::workflow::schema::StepType;
    use std::collections::HashMap;

    /// Builds a minimal agent step with the given prompt template.
    fn agent_step(prompt: &str) -> StepDef {
        StepDef {
            name: "test".to_string(),
            step_type: StepType::Agent,
            run: None,
            prompt: Some(prompt.to_string()),
            condition: None,
            on_pass: None,
            on_fail: None,
            message: None,
            scope: None,
            max_iterations: None,
            initial_value: None,
            items: None,
            parallel: None,
            steps: None,
            config: HashMap::new(),
            outputs: None,
            output_type: None,
            async_exec: None,
        }
    }

    /// Builds a [`StepConfig`] whose values are all JSON strings.
    fn string_config(pairs: &[(&str, &str)]) -> StepConfig {
        let values = pairs
            .iter()
            .map(|&(key, value)| (key.to_string(), serde_json::Value::String(value.to_string())))
            .collect();
        StepConfig { values }
    }

    /// Returns a context with no stored step outputs.
    fn empty_ctx() -> Context {
        Context::new(String::new(), HashMap::new())
    }

    /// Returns a context in which `step` already produced an agent output with `session`.
    fn ctx_with_session(step: &str, session: &str) -> Context {
        let mut ctx = empty_ctx();
        ctx.store(
            step,
            StepOutput::Agent(AgentOutput {
                response: "result".to_string(),
                session_id: Some(session.to_string()),
                stats: AgentStats::default(),
            }),
        );
        ctx
    }

    /// Returns the path of the mock Claude fixture after making it executable.
    ///
    /// This is unix-only: it relies on unix permission bits.
    #[cfg(unix)]
    fn mock_claude_script() -> String {
        use std::os::unix::fs::PermissionsExt;

        let script = format!("{}/tests/fixtures/mock_claude.sh", env!("CARGO_MANIFEST_DIR"));
        let mut perms = std::fs::metadata(&script)
            .expect("mock_claude.sh fixture is missing")
            .permissions();
        perms.set_mode(0o755);
        std::fs::set_permissions(&script, perms).expect("failed to make mock_claude.sh executable");
        script
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn agent_mock_claude() {
        let script = mock_claude_script();
        let step = agent_step("test prompt");
        let config = string_config(&[("command", script.as_str())]);
        let ctx = empty_ctx();

        let result = AgentExecutor.execute(&step, &config, &ctx).await.unwrap();
        if let StepOutput::Agent(out) = result {
            assert_eq!(out.response, "Task completed successfully");
            assert_eq!(out.session_id.as_deref(), Some("mock-session-123"));
            assert_eq!(out.stats.input_tokens, 10);
            assert_eq!(out.stats.output_tokens, 20);
        } else {
            panic!("Expected Agent output");
        }
    }

    #[tokio::test]
    async fn resume_missing_step_returns_error() {
        let step = agent_step("test prompt");
        let config = string_config(&[("resume", "nonexistent")]);
        let ctx = empty_ctx();

        let result = AgentExecutor.execute(&step, &config, &ctx).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("session not found for step 'nonexistent'"),
            "Unexpected error: {}",
            err
        );
    }

    #[tokio::test]
    async fn missing_prompt_returns_error() {
        let mut step = agent_step("unused");
        step.prompt = None;
        let config = string_config(&[]);
        let ctx = empty_ctx();

        let err = AgentExecutor
            .execute(&step, &config, &ctx)
            .await
            .unwrap_err()
            .to_string();
        assert!(
            err.contains("agent step missing 'prompt' field"),
            "Unexpected error: {}",
            err
        );
    }

    #[test]
    fn build_args_always_requests_stream_json() {
        let args = AgentExecutor::build_args(&string_config(&[]), &empty_ctx()).unwrap();
        assert_eq!(args, ["-p", "--verbose", "--output-format", "stream-json"]);
    }

    #[test]
    fn build_args_optional_flags() {
        let config = string_config(&[
            ("model", "opus"),
            ("system_prompt_append", "Be concise"),
            ("permissions", "skip"),
        ]);
        let args = AgentExecutor::build_args(&config, &empty_ctx()).unwrap();

        let has_pair = |flag: &str, value: &str| {
            args.windows(2).any(|w| w[0] == flag && w[1] == value)
        };
        assert!(has_pair("--model", "opus"), "args: {:?}", args);
        assert!(has_pair("--append-system-prompt", "Be concise"), "args: {:?}", args);
        assert!(
            args.iter().any(|a| a == "--dangerously-skip-permissions"),
            "args: {:?}",
            args
        );
    }

    #[test]
    fn build_args_resume_adds_flag() {
        let ctx = ctx_with_session("analyze", "sess-123");
        let config = string_config(&[("resume", "analyze")]);

        let args = AgentExecutor::build_args(&config, &ctx).unwrap();
        let resume_idx = args.iter().position(|a| a == "--resume").expect("--resume not found");
        assert_eq!(args[resume_idx + 1], "sess-123");
    }

    #[tokio::test]
    async fn fork_session_missing_step_returns_error() {
        let step = agent_step("test prompt");
        let config = string_config(&[("fork_session", "nonexistent")]);
        let ctx = empty_ctx();

        let result = AgentExecutor.execute(&step, &config, &ctx).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("session not found for step 'nonexistent'"),
            "Unexpected error: {}",
            err
        );
    }

    #[test]
    fn build_args_fork_session_adds_resume_flag() {
        let ctx = ctx_with_session("analyze", "sess-fork-456");
        let config = string_config(&[("fork_session", "analyze")]);

        let args = AgentExecutor::build_args(&config, &ctx).unwrap();
        let resume_idx = args.iter().position(|a| a == "--resume").expect("--resume not found");
        assert_eq!(args[resume_idx + 1], "sess-fork-456");
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn agent_sandbox_aware_no_sandbox_uses_host() {
        let script = mock_claude_script();
        let step = agent_step("test prompt");
        let config = string_config(&[("command", script.as_str())]);
        let ctx = empty_ctx();

        let result = AgentExecutor
            .execute_sandboxed(&step, &config, &ctx, &None)
            .await
            .unwrap();
        assert!(matches!(result, StepOutput::Agent(_)));
    }
}