Skip to main content

ralph_adapters/
cli_executor.rs

1//! CLI executor for running prompts through backends.
2//!
3//! Executes prompts via CLI tools with real-time streaming output.
//! Supports an optional inactivity timeout (reset by every output line) with graceful SIGTERM termination.
5
6#[cfg(test)]
7use crate::cli_backend::PromptMode;
8use crate::cli_backend::{CliBackend, OutputFormat};
9use crate::copilot_stream::CopilotStreamParser;
10#[cfg(unix)]
11use nix::sys::signal::{Signal, kill};
12#[cfg(unix)]
13use nix::unistd::Pid;
14use std::env;
15use std::io::Write;
16use std::process::Stdio;
17use std::time::Duration;
18use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWriteExt, BufReader};
19use tokio::process::Command;
20use tracing::{debug, warn};
21
/// Result of a CLI execution.
///
/// Produced by [`CliExecutor::execute`] once the child process has exited.
#[derive(Debug)]
pub struct ExecutionResult {
    /// The full output from the CLI.
    ///
    /// Contains every stdout line as received, plus every stderr line prefixed
    /// with `[stderr] `; each line is terminated with `\n`.
    pub output: String,
    /// Whether the execution succeeded (exit code 0).
    ///
    /// Always `false` when the execution timed out.
    pub success: bool,
    /// The exit code, as reported by the process exit status (`None` when the
    /// status carries no code).
    pub exit_code: Option<i32>,
    /// Whether the execution was terminated due to timeout.
    pub timed_out: bool,
}
34
/// Executor for running prompts through CLI backends.
///
/// Spawns the backend's command as a child process, streams its output, and
/// reports the outcome as an [`ExecutionResult`].
#[derive(Debug)]
pub struct CliExecutor {
    /// Backend that builds the command line and supplies the output format and
    /// extra environment variables for the child process.
    backend: CliBackend,
}
40
/// Message sent from a stream reader task to the executor's event loop.
enum StreamEvent {
    /// One line read from the child's stdout, without its line terminator.
    StdoutLine(String),
    /// One line read from the child's stderr, without its line terminator.
    StderrLine(String),
    /// The stdout reader reached end of stream.
    StdoutEof,
    /// The stderr reader reached end of stream.
    StderrEof,
}
47
/// Identifies which child stream a reader task is draining, so it can tag the
/// events it emits accordingly.
enum StreamKind {
    /// The child's standard output.
    Stdout,
    /// The child's standard error.
    Stderr,
}
52
53impl CliExecutor {
54    /// Creates a new executor with the given backend.
55    pub fn new(backend: CliBackend) -> Self {
56        Self { backend }
57    }
58
59    /// Executes a prompt and streams output to the provided writer.
60    ///
61    /// Output is streamed line-by-line to the writer while being accumulated
62    /// for the return value. If `timeout` is provided and the execution produces
63    /// no stdout/stderr activity for longer than that duration, the process
64    /// receives SIGTERM and the result indicates timeout.
65    ///
66    /// When `verbose` is true, stderr output is also written to the output writer
67    /// with a `[stderr]` prefix. When false, stderr is captured but not displayed.
68    pub async fn execute<W: Write + Send>(
69        &self,
70        prompt: &str,
71        mut output_writer: W,
72        timeout: Option<Duration>,
73        verbose: bool,
74    ) -> std::io::Result<ExecutionResult> {
75        // Note: _temp_file is kept alive for the duration of this function scope.
76        // Some Arg-mode backends use temp-file indirection for very large prompts.
77        let (cmd, args, stdin_input, _temp_file) = self.backend.build_command(prompt, false);
78
79        let mut command = Command::new(&cmd);
80        command.args(&args);
81        command.stdout(Stdio::piped());
82        command.stderr(Stdio::piped());
83
84        // Set working directory to current directory (mirrors PTY executor behavior)
85        // Use fallback to "." if current_dir fails (e.g., E2E test workspaces)
86        let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
87        command.current_dir(&cwd);
88        inject_ralph_runtime_env(&mut command, &cwd);
89
90        // Apply backend-specific environment variables (e.g., Agent Teams env var)
91        command.envs(self.backend.env_vars.iter().map(|(k, v)| (k, v)));
92
93        debug!(
94            command = %cmd,
95            args = ?args,
96            cwd = ?cwd,
97            "Spawning CLI command"
98        );
99
100        if stdin_input.is_some() {
101            command.stdin(Stdio::piped());
102        }
103
104        let mut child = command.spawn()?;
105
106        // Write to stdin if needed
107        if let Some(input) = stdin_input
108            && let Some(mut stdin) = child.stdin.take()
109        {
110            stdin.write_all(input.as_bytes()).await?;
111            drop(stdin); // Close stdin to signal EOF
112        }
113
114        let mut timed_out = false;
115
116        // Take both stdout and stderr handles upfront to read concurrently.
117        // Each emitted line resets the inactivity timeout.
118        let stdout_handle = child.stdout.take();
119        let stderr_handle = child.stderr.take();
120        let (event_tx, mut event_rx) = tokio::sync::mpsc::channel(256);
121
122        let stdout_task = stdout_handle.map(|stdout| {
123            let tx = event_tx.clone();
124            tokio::spawn(async move { read_stream(stdout, tx, StreamKind::Stdout).await })
125        });
126        let stderr_task = stderr_handle.map(|stderr| {
127            let tx = event_tx.clone();
128            tokio::spawn(async move { read_stream(stderr, tx, StreamKind::Stderr).await })
129        });
130        drop(event_tx);
131
132        let mut stdout_done = stdout_task.is_none();
133        let mut stderr_done = stderr_task.is_none();
134        let mut accumulated_output = String::new();
135
136        if let Some(duration) = timeout {
137            debug!(
138                timeout_secs = duration.as_secs(),
139                "Executing with inactivity timeout"
140            );
141        }
142
143        while !stdout_done || !stderr_done {
144            let next_event = match timeout {
145                Some(duration) => match tokio::time::timeout(duration, event_rx.recv()).await {
146                    Ok(event) => event,
147                    Err(_) => {
148                        warn!(
149                            timeout_secs = duration.as_secs(),
150                            "Execution inactivity timeout reached, sending SIGTERM"
151                        );
152                        timed_out = true;
153                        Self::terminate_child(&mut child)?;
154                        break;
155                    }
156                },
157                None => event_rx.recv().await,
158            };
159
160            match next_event {
161                Some(StreamEvent::StdoutLine(line)) => {
162                    if self.backend.output_format == OutputFormat::CopilotStreamJson {
163                        if let Some(text) = CopilotStreamParser::extract_text(&line) {
164                            write!(output_writer, "{text}")?;
165                            if !text.ends_with('\n') {
166                                writeln!(output_writer)?;
167                            }
168                        }
169                    } else {
170                        writeln!(output_writer, "{line}")?;
171                    }
172                    output_writer.flush()?;
173                    accumulated_output.push_str(&line);
174                    accumulated_output.push('\n');
175                }
176                Some(StreamEvent::StderrLine(line)) => {
177                    if verbose {
178                        writeln!(output_writer, "[stderr] {line}")?;
179                        output_writer.flush()?;
180                    }
181                    accumulated_output.push_str("[stderr] ");
182                    accumulated_output.push_str(&line);
183                    accumulated_output.push('\n');
184                }
185                Some(StreamEvent::StdoutEof) => stdout_done = true,
186                Some(StreamEvent::StderrEof) => stderr_done = true,
187                None => {
188                    stdout_done = true;
189                    stderr_done = true;
190                }
191            }
192        }
193
194        let status = child.wait().await?;
195
196        if let Some(handle) = stdout_task {
197            handle.await.map_err(join_error_to_io)??;
198        }
199        if let Some(handle) = stderr_task {
200            handle.await.map_err(join_error_to_io)??;
201        }
202
203        Ok(ExecutionResult {
204            output: accumulated_output,
205            success: status.success() && !timed_out,
206            exit_code: status.code(),
207            timed_out,
208        })
209    }
210
211    /// Terminates the child process with SIGTERM.
212    fn terminate_child(child: &mut tokio::process::Child) -> std::io::Result<()> {
213        #[cfg(not(unix))]
214        {
215            // SIGTERM doesn't exist on Windows. Best-effort termination:
216            // On Unix this would be SIGKILL, on Windows it maps to process termination.
217            child.start_kill()
218        }
219
220        #[cfg(unix)]
221        if let Some(pid) = child.id() {
222            #[allow(clippy::cast_possible_wrap)]
223            let pid = Pid::from_raw(pid as i32);
224            debug!(%pid, "Sending SIGTERM to child process");
225            let _ = kill(pid, Signal::SIGTERM);
226            Ok(())
227        } else {
228            Ok(())
229        }
230    }
231
232    /// Executes a prompt without streaming (captures all output).
233    ///
234    /// Uses no timeout by default. For timed execution, use `execute_capture_with_timeout`.
235    pub async fn execute_capture(&self, prompt: &str) -> std::io::Result<ExecutionResult> {
236        self.execute_capture_with_timeout(prompt, None).await
237    }
238
239    /// Executes a prompt without streaming, with optional timeout.
240    pub async fn execute_capture_with_timeout(
241        &self,
242        prompt: &str,
243        timeout: Option<Duration>,
244    ) -> std::io::Result<ExecutionResult> {
245        // Use a sink that discards output for non-streaming execution
246        // verbose=false since output is being discarded anyway
247        let sink = std::io::sink();
248        self.execute(prompt, sink, timeout, false).await
249    }
250}
251
252async fn read_stream<R>(
253    stream: R,
254    tx: tokio::sync::mpsc::Sender<StreamEvent>,
255    stream_kind: StreamKind,
256) -> std::io::Result<()>
257where
258    R: AsyncRead + Unpin,
259{
260    let reader = BufReader::new(stream);
261    let mut lines = reader.lines();
262    while let Some(line) = lines.next_line().await? {
263        let event = match stream_kind {
264            StreamKind::Stdout => StreamEvent::StdoutLine(line),
265            StreamKind::Stderr => StreamEvent::StderrLine(line),
266        };
267        if tx.send(event).await.is_err() {
268            return Ok(());
269        }
270    }
271
272    let eof_event = match stream_kind {
273        StreamKind::Stdout => StreamEvent::StdoutEof,
274        StreamKind::Stderr => StreamEvent::StderrEof,
275    };
276    let _ = tx.send(eof_event).await;
277    Ok(())
278}
279
280fn join_error_to_io(error: tokio::task::JoinError) -> std::io::Error {
281    std::io::Error::other(error.to_string())
282}
283
284fn inject_ralph_runtime_env(command: &mut Command, workspace_root: &std::path::Path) {
285    let Ok(current_exe) = env::current_exe() else {
286        return;
287    };
288    let Some(bin_dir) = current_exe.parent() else {
289        return;
290    };
291
292    let mut path_entries = vec![bin_dir.to_path_buf()];
293    if let Some(existing_path) = env::var_os("PATH") {
294        path_entries.extend(env::split_paths(&existing_path));
295    }
296
297    if let Ok(joined_path) = env::join_paths(path_entries) {
298        command.env("PATH", joined_path);
299    }
300    command.env("RALPH_BIN", &current_exe);
301    command.env("RALPH_WORKSPACE_ROOT", workspace_root);
302    if std::path::Path::new("/var/tmp").is_dir() {
303        command.env("TMPDIR", "/var/tmp");
304        command.env("TMP", "/var/tmp");
305        command.env("TEMP", "/var/tmp");
306    }
307}
308
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a backend for `command` with the given fixed arguments, prompt
    /// mode and output format; no prompt flag and no extra environment.
    fn backend(
        command: &str,
        args: &[&str],
        prompt_mode: PromptMode,
        output_format: OutputFormat,
    ) -> CliBackend {
        CliBackend {
            command: command.to_string(),
            args: args.iter().map(|arg| (*arg).to_string()).collect(),
            prompt_mode,
            prompt_flag: None,
            output_format,
            env_vars: vec![],
        }
    }

    /// `echo` receives the prompt as an argument and prints it back.
    #[tokio::test]
    async fn test_execute_echo() {
        let executor = CliExecutor::new(backend(
            "echo",
            &[],
            PromptMode::Arg,
            OutputFormat::Text,
        ));
        let mut output = Vec::new();

        let result = executor
            .execute("hello world", &mut output, None, true)
            .await
            .unwrap();

        assert!(result.success);
        assert!(!result.timed_out);
        assert!(result.output.contains("hello world"));
    }

    /// `cat` echoes whatever is delivered through stdin.
    #[tokio::test]
    async fn test_execute_stdin() {
        let executor = CliExecutor::new(backend(
            "cat",
            &[],
            PromptMode::Stdin,
            OutputFormat::Text,
        ));

        let result = executor.execute_capture("stdin test").await.unwrap();

        assert!(result.success);
        assert!(result.output.contains("stdin test"));
    }

    /// `false` always exits with code 1.
    #[tokio::test]
    async fn test_execute_failure() {
        let executor = CliExecutor::new(backend(
            "false",
            &[],
            PromptMode::Arg,
            OutputFormat::Text,
        ));

        let result = executor.execute_capture("").await.unwrap();

        assert!(!result.success);
        assert!(!result.timed_out);
        assert_eq!(result.exit_code, Some(1));
    }

    /// A silent 10 second `sleep` must trip a 100ms timeout.
    #[tokio::test]
    async fn test_execute_timeout() {
        // Stdin mode keeps the prompt from being appended to sleep's arguments;
        // sleep itself ignores stdin.
        let executor = CliExecutor::new(backend(
            "sleep",
            &["10"],
            PromptMode::Stdin,
            OutputFormat::Text,
        ));

        let result = executor
            .execute_capture_with_timeout("", Some(Duration::from_millis(100)))
            .await
            .unwrap();

        assert!(result.timed_out, "Expected execution to time out");
        assert!(
            !result.success,
            "Timed out execution should not be successful"
        );
    }

    /// Output every 200ms must keep a 300ms inactivity timeout from firing.
    #[tokio::test]
    async fn test_execute_timeout_resets_on_output_activity() {
        let executor = CliExecutor::new(backend(
            "sh",
            &["-c"],
            PromptMode::Arg,
            OutputFormat::Text,
        ));

        let script = "printf 'start\\n'; sleep 0.2; printf 'middle\\n'; sleep 0.2; printf 'done\\n'";
        let result = executor
            .execute_capture_with_timeout(script, Some(Duration::from_millis(300)))
            .await
            .unwrap();

        assert!(
            !result.timed_out,
            "Periodic output should reset the inactivity timeout"
        );
        assert!(result.success, "Periodic-output command should succeed");
        for expected in ["start", "middle", "done"] {
            assert!(result.output.contains(expected));
        }
    }

    /// Output produced before the process goes quiet is still streamed and
    /// captured when the inactivity timeout fires afterwards.
    #[tokio::test]
    async fn test_execute_streams_output_before_inactivity_timeout() {
        let executor = CliExecutor::new(backend(
            "sh",
            &["-c", "printf 'hello\\n'; sleep 10"],
            PromptMode::Stdin,
            OutputFormat::Text,
        ));
        let mut output = Vec::new();

        let result = executor
            .execute("", &mut output, Some(Duration::from_millis(200)), false)
            .await
            .unwrap();

        assert!(
            result.timed_out,
            "Expected inactivity timeout after output stops"
        );
        assert_eq!(String::from_utf8(output).unwrap(), "hello\n");
        assert!(result.output.contains("hello"));
    }

    /// A command that finishes immediately must not be reported as timed out,
    /// even with a timeout configured.
    #[tokio::test]
    async fn test_execute_no_timeout_when_fast() {
        let executor = CliExecutor::new(backend(
            "echo",
            &[],
            PromptMode::Arg,
            OutputFormat::Text,
        ));

        let result = executor
            .execute_capture_with_timeout("fast", Some(Duration::from_secs(10)))
            .await
            .unwrap();

        assert!(!result.timed_out, "Fast command should not time out");
        assert!(result.success);
        assert!(result.output.contains("fast"));
    }

    /// In Copilot stream mode the writer receives only the extracted message
    /// text, while the captured output keeps the raw JSON lines.
    #[tokio::test]
    async fn test_execute_copilot_stream_writes_extracted_text() {
        let executor = CliExecutor::new(backend(
            "printf",
            &[
                "%s\n%s\n",
                r#"{"type":"assistant.turn_start","data":{"turnId":"0"}}"#,
                r#"{"type":"assistant.message","data":{"content":"hello from copilot"}}"#,
            ],
            PromptMode::Stdin,
            OutputFormat::CopilotStreamJson,
        ));
        let mut output = Vec::new();

        let result = executor
            .execute("ignored", &mut output, None, false)
            .await
            .unwrap();

        assert!(result.success);
        assert!(result.output.contains("\"assistant.message\""));
        assert_eq!(String::from_utf8(output).unwrap(), "hello from copilot\n");
    }
}