1#[cfg(test)]
7use crate::cli_backend::PromptMode;
8use crate::cli_backend::{CliBackend, OutputFormat};
9use crate::copilot_stream::CopilotStreamParser;
10#[cfg(unix)]
11use nix::sys::signal::{Signal, kill};
12#[cfg(unix)]
13use nix::unistd::Pid;
14use std::env;
15use std::io::Write;
16use std::process::Stdio;
17use std::time::Duration;
18use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
19use tokio::process::Command;
20use tracing::{debug, warn};
21
/// Outcome of one CLI backend invocation, as produced by [`CliExecutor::execute`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionResult {
    /// Everything the child printed: stdout lines verbatim and stderr lines
    /// prefixed with `[stderr] `, each terminated by `\n`.
    pub output: String,
    /// `true` only when the process reported success and no timeout fired.
    pub success: bool,
    /// Exit code reported by the process, or `None` when it has none
    /// (for example when it was terminated by a signal).
    pub exit_code: Option<i32>,
    /// `true` when the inactivity timeout elapsed and the child was terminated.
    pub timed_out: bool,
}
34
35#[derive(Debug)]
37pub struct CliExecutor {
38 backend: CliBackend,
39}
40
/// Message sent from a stream reader task to the executor's event loop.
#[derive(Debug, PartialEq, Eq)]
enum StreamEvent {
    /// One line read from the child's stdout, without its line terminator.
    StdoutLine(String),
    /// One line read from the child's stderr, without its line terminator.
    StderrLine(String),
    /// The child's stdout reached end-of-file.
    StdoutEof,
    /// The child's stderr reached end-of-file.
    StderrEof,
}
47
/// Identifies which of the child's output streams a reader task is draining.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamKind {
    /// The child's standard output.
    Stdout,
    /// The child's standard error.
    Stderr,
}
52
53impl CliExecutor {
54 pub fn new(backend: CliBackend) -> Self {
56 Self { backend }
57 }
58
59 pub async fn execute<W: Write + Send>(
69 &self,
70 prompt: &str,
71 mut output_writer: W,
72 timeout: Option<Duration>,
73 verbose: bool,
74 ) -> std::io::Result<ExecutionResult> {
75 let (cmd, args, stdin_input, _temp_file) = self.backend.build_command(prompt, false);
78
79 let mut command = Command::new(&cmd);
80 command.args(&args);
81 command.stdout(Stdio::piped());
82 command.stderr(Stdio::piped());
83
84 let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
87 command.current_dir(&cwd);
88 inject_ralph_runtime_env(&mut command, &cwd);
89
90 command.envs(self.backend.env_vars.iter().map(|(k, v)| (k, v)));
92
93 debug!(
94 command = %cmd,
95 args = ?args,
96 cwd = ?cwd,
97 "Spawning CLI command"
98 );
99
100 if stdin_input.is_some() {
101 command.stdin(Stdio::piped());
102 }
103
104 let mut child = command.spawn()?;
105
106 if let Some(input) = stdin_input
108 && let Some(mut stdin) = child.stdin.take()
109 {
110 stdin.write_all(input.as_bytes()).await?;
111 drop(stdin); }
113
114 let mut timed_out = false;
115
116 let stdout_handle = child.stdout.take();
119 let stderr_handle = child.stderr.take();
120 let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(256);
121
122 let stdout_task = stdout_handle.map(|stdout| {
123 let tx = event_tx.clone();
124 tokio::spawn(async move { read_stream(stdout, tx, StreamKind::Stdout).await })
125 });
126 let stderr_task = stderr_handle.map(|stderr| {
127 let tx = event_tx.clone();
128 tokio::spawn(async move { read_stream(stderr, tx, StreamKind::Stderr).await })
129 });
130 drop(event_tx);
131
132 let mut stdout_done = stdout_task.is_none();
133 let mut stderr_done = stderr_task.is_none();
134 let mut accumulated_output = String::new();
135
136 if let Some(duration) = timeout {
137 debug!(
138 timeout_secs = duration.as_secs(),
139 "Executing with inactivity timeout"
140 );
141 }
142
143 while !stdout_done || !stderr_done {
144 let next_event = match timeout {
145 Some(duration) => match tokio::time::timeout(duration, event_rx.recv()).await {
146 Ok(event) => event,
147 Err(_) => {
148 warn!(
149 timeout_secs = duration.as_secs(),
150 "Execution inactivity timeout reached, sending SIGTERM"
151 );
152 timed_out = true;
153 Self::terminate_child(&mut child)?;
154 break;
155 }
156 },
157 None => event_rx.recv().await,
158 };
159
160 match next_event {
161 Some(StreamEvent::StdoutLine(line)) => {
162 if self.backend.output_format == OutputFormat::CopilotStreamJson {
163 if let Some(text) = CopilotStreamParser::extract_text(&line) {
164 write!(output_writer, "{text}")?;
165 if !text.ends_with('\n') {
166 writeln!(output_writer)?;
167 }
168 }
169 } else {
170 writeln!(output_writer, "{line}")?;
171 }
172 output_writer.flush()?;
173 accumulated_output.push_str(&line);
174 accumulated_output.push('\n');
175 }
176 Some(StreamEvent::StderrLine(line)) => {
177 if verbose {
178 writeln!(output_writer, "[stderr] {line}")?;
179 output_writer.flush()?;
180 }
181 accumulated_output.push_str("[stderr] ");
182 accumulated_output.push_str(&line);
183 accumulated_output.push('\n');
184 }
185 Some(StreamEvent::StdoutEof) => stdout_done = true,
186 Some(StreamEvent::StderrEof) => stderr_done = true,
187 None => {
188 stdout_done = true;
189 stderr_done = true;
190 }
191 }
192 }
193
194 let status = child.wait().await?;
195
196 if let Some(handle) = stdout_task {
197 handle.await.map_err(join_error_to_io)??;
198 }
199 if let Some(handle) = stderr_task {
200 handle.await.map_err(join_error_to_io)??;
201 }
202
203 Ok(ExecutionResult {
204 output: accumulated_output,
205 success: status.success() && !timed_out,
206 exit_code: status.code(),
207 timed_out,
208 })
209 }
210
211 fn terminate_child(child: &mut tokio::process::Child) -> std::io::Result<()> {
213 #[cfg(not(unix))]
214 {
215 child.start_kill()
218 }
219
220 #[cfg(unix)]
221 if let Some(pid) = child.id() {
222 #[allow(clippy::cast_possible_wrap)]
223 let pid = Pid::from_raw(pid as i32);
224 debug!(%pid, "Sending SIGTERM to child process");
225 let _ = kill(pid, Signal::SIGTERM);
226 Ok(())
227 } else {
228 Ok(())
229 }
230 }
231
232 pub async fn execute_capture(&self, prompt: &str) -> std::io::Result<ExecutionResult> {
236 self.execute_capture_with_timeout(prompt, None).await
237 }
238
239 pub async fn execute_capture_with_timeout(
241 &self,
242 prompt: &str,
243 timeout: Option<Duration>,
244 ) -> std::io::Result<ExecutionResult> {
245 let sink = std::io::sink();
248 self.execute(prompt, sink, timeout, false).await
249 }
250}
251
252async fn read_stream<R>(
253 stream: R,
254 tx: tokio::sync::mpsc::Sender<StreamEvent>,
255 stream_kind: StreamKind,
256) -> std::io::Result<()>
257where
258 R: AsyncRead + Unpin,
259{
260 let reader = BufReader::new(stream);
261 let mut lines = reader.lines();
262 while let Some(line) = lines.next_line().await? {
263 let event = match stream_kind {
264 StreamKind::Stdout => StreamEvent::StdoutLine(line),
265 StreamKind::Stderr => StreamEvent::StderrLine(line),
266 };
267 if tx.send(event).await.is_err() {
268 return Ok(());
269 }
270 }
271
272 let eof_event = match stream_kind {
273 StreamKind::Stdout => StreamEvent::StdoutEof,
274 StreamKind::Stderr => StreamEvent::StderrEof,
275 };
276 let _ = tx.send(eof_event).await;
277 Ok(())
278}
279
280fn join_error_to_io(error: tokio::task::JoinError) -> std::io::Error {
281 std::io::Error::other(error.to_string())
282}
283
284fn inject_ralph_runtime_env(command: &mut Command, workspace_root: &std::path::Path) {
285 let Ok(current_exe) = env::current_exe() else {
286 return;
287 };
288 let Some(bin_dir) = current_exe.parent() else {
289 return;
290 };
291
292 let mut path_entries = vec![bin_dir.to_path_buf()];
293 if let Some(existing_path) = env::var_os("PATH") {
294 path_entries.extend(env::split_paths(&existing_path));
295 }
296
297 if let Ok(joined_path) = env::join_paths(path_entries) {
298 command.env("PATH", joined_path);
299 }
300 command.env("RALPH_BIN", ¤t_exe);
301 command.env("RALPH_WORKSPACE_ROOT", workspace_root);
302 if std::path::Path::new("/var/tmp").is_dir() {
303 command.env("TMPDIR", "/var/tmp");
304 command.env("TMP", "/var/tmp");
305 command.env("TEMP", "/var/tmp");
306 }
307}
308
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a backend with no prompt flag and no extra environment variables.
    fn backend(
        command: &str,
        args: &[&str],
        prompt_mode: PromptMode,
        output_format: OutputFormat,
    ) -> CliBackend {
        CliBackend {
            command: command.to_string(),
            args: args.iter().map(|arg| (*arg).to_string()).collect(),
            prompt_mode,
            prompt_flag: None,
            output_format,
            env_vars: vec![],
        }
    }

    /// A prompt passed as an argument to `echo` is streamed and captured.
    #[tokio::test]
    async fn test_execute_echo() {
        let executor = CliExecutor::new(backend(
            "echo",
            &[],
            PromptMode::Arg,
            OutputFormat::Text,
        ));
        let mut streamed = Vec::new();

        let outcome = executor
            .execute("hello world", &mut streamed, None, true)
            .await
            .unwrap();

        assert!(outcome.success);
        assert!(!outcome.timed_out);
        assert!(outcome.output.contains("hello world"));
    }

    /// A prompt delivered over stdin is echoed back by `cat`.
    #[tokio::test]
    async fn test_execute_stdin() {
        let executor = CliExecutor::new(backend(
            "cat",
            &[],
            PromptMode::Stdin,
            OutputFormat::Text,
        ));

        let outcome = executor.execute_capture("stdin test").await.unwrap();

        assert!(outcome.success);
        assert!(outcome.output.contains("stdin test"));
    }

    /// A non-zero exit status is reported as a failure, not a timeout.
    #[tokio::test]
    async fn test_execute_failure() {
        let executor = CliExecutor::new(backend(
            "false",
            &[],
            PromptMode::Arg,
            OutputFormat::Text,
        ));

        let outcome = executor.execute_capture("").await.unwrap();

        assert!(!outcome.success);
        assert!(!outcome.timed_out);
        assert_eq!(outcome.exit_code, Some(1));
    }

    /// A silent long-running command trips the inactivity timeout.
    #[tokio::test]
    async fn test_execute_timeout() {
        let executor = CliExecutor::new(backend(
            "sleep",
            &["10"],
            PromptMode::Stdin,
            OutputFormat::Text,
        ));

        let outcome = executor
            .execute_capture_with_timeout("", Some(Duration::from_millis(100)))
            .await
            .unwrap();

        assert!(outcome.timed_out, "Expected execution to time out");
        assert!(
            !outcome.success,
            "Timed out execution should not be successful"
        );
    }

    /// Output arriving more often than the timeout keeps the command alive.
    #[tokio::test]
    async fn test_execute_timeout_resets_on_output_activity() {
        let executor = CliExecutor::new(backend(
            "sh",
            &["-c"],
            PromptMode::Arg,
            OutputFormat::Text,
        ));
        let script = "printf 'start\\n'; sleep 0.2; printf 'middle\\n'; sleep 0.2; printf 'done\\n'";

        let outcome = executor
            .execute_capture_with_timeout(script, Some(Duration::from_millis(300)))
            .await
            .unwrap();

        assert!(
            !outcome.timed_out,
            "Periodic output should reset the inactivity timeout"
        );
        assert!(outcome.success, "Periodic-output command should succeed");
        for expected in ["start", "middle", "done"] {
            assert!(outcome.output.contains(expected));
        }
    }

    /// Output produced before the command goes quiet is still streamed.
    #[tokio::test]
    async fn test_execute_streams_output_before_inactivity_timeout() {
        let executor = CliExecutor::new(backend(
            "sh",
            &["-c", "printf 'hello\\n'; sleep 10"],
            PromptMode::Stdin,
            OutputFormat::Text,
        ));
        let mut streamed = Vec::new();

        let outcome = executor
            .execute("", &mut streamed, Some(Duration::from_millis(200)), false)
            .await
            .unwrap();

        assert!(
            outcome.timed_out,
            "Expected inactivity timeout after output stops"
        );
        assert_eq!(String::from_utf8(streamed).unwrap(), "hello\n");
        assert!(outcome.output.contains("hello"));
    }

    /// A command that finishes well within the timeout is unaffected by it.
    #[tokio::test]
    async fn test_execute_no_timeout_when_fast() {
        let executor = CliExecutor::new(backend(
            "echo",
            &[],
            PromptMode::Arg,
            OutputFormat::Text,
        ));

        let outcome = executor
            .execute_capture_with_timeout("fast", Some(Duration::from_secs(10)))
            .await
            .unwrap();

        assert!(!outcome.timed_out, "Fast command should not time out");
        assert!(outcome.success);
        assert!(outcome.output.contains("fast"));
    }

    /// Copilot stream JSON is recorded raw but only extracted text is streamed.
    #[tokio::test]
    async fn test_execute_copilot_stream_writes_extracted_text() {
        let executor = CliExecutor::new(backend(
            "printf",
            &[
                "%s\n%s\n",
                r#"{"type":"assistant.turn_start","data":{"turnId":"0"}}"#,
                r#"{"type":"assistant.message","data":{"content":"hello from copilot"}}"#,
            ],
            PromptMode::Stdin,
            OutputFormat::CopilotStreamJson,
        ));
        let mut streamed = Vec::new();

        let outcome = executor
            .execute("ignored", &mut streamed, None, false)
            .await
            .unwrap();

        assert!(outcome.success);
        assert!(outcome.output.contains("\"assistant.message\""));
        assert_eq!(String::from_utf8(streamed).unwrap(), "hello from copilot\n");
    }
}