Skip to main content

ralph_adapters/
cli_executor.rs

//! CLI executor for running prompts through backends.
//!
//! Executes prompts via CLI tools with real-time streaming output.
//! Supports optional execution timeout with graceful SIGTERM termination.
5
6use crate::cli_backend::CliBackend;
7#[cfg(test)]
8use crate::cli_backend::{OutputFormat, PromptMode};
9#[cfg(unix)]
10use nix::sys::signal::{Signal, kill};
11#[cfg(unix)]
12use nix::unistd::Pid;
13use std::io::Write;
14use std::process::Stdio;
15use std::time::Duration;
16use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
17use tokio::process::Command;
18use tracing::{debug, warn};
19
/// Outcome of running one prompt through a CLI backend.
///
/// Produced by `CliExecutor::execute` and its capture variants. The type is
/// plain data, so it can be cloned and compared (handy in tests and when a
/// result must be reported to more than one consumer).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionResult {
    /// Captured output: every stdout line, followed by every stderr line
    /// prefixed with `[stderr] `. Empty when the execution timed out.
    pub output: String,
    /// `true` only when the process exited with status 0 and did not time out.
    pub success: bool,
    /// Exit code reported by the process, or `None` when it has none
    /// (for example when a signal terminated it on Unix).
    pub exit_code: Option<i32>,
    /// `true` when the execution was terminated because the timeout elapsed.
    pub timed_out: bool,
}
32
33/// Executor for running prompts through CLI backends.
34#[derive(Debug)]
35pub struct CliExecutor {
36    backend: CliBackend,
37}
38
39impl CliExecutor {
40    /// Creates a new executor with the given backend.
41    pub fn new(backend: CliBackend) -> Self {
42        Self { backend }
43    }
44
45    /// Executes a prompt and streams output to the provided writer.
46    ///
47    /// Output is streamed line-by-line to the writer while being accumulated
48    /// for the return value. If `timeout` is provided and the execution exceeds
49    /// it, the process receives SIGTERM and the result indicates timeout.
50    ///
51    /// When `verbose` is true, stderr output is also written to the output writer
52    /// with a `[stderr]` prefix. When false, stderr is captured but not displayed.
53    pub async fn execute<W: Write + Send>(
54        &self,
55        prompt: &str,
56        mut output_writer: W,
57        timeout: Option<Duration>,
58        verbose: bool,
59    ) -> std::io::Result<ExecutionResult> {
60        // Note: _temp_file is kept alive for the duration of this function scope.
61        // For large prompts (>7000 chars), Claude reads from the temp file.
62        let (cmd, args, stdin_input, _temp_file) = self.backend.build_command(prompt, false);
63
64        let mut command = Command::new(&cmd);
65        command.args(&args);
66        command.stdout(Stdio::piped());
67        command.stderr(Stdio::piped());
68
69        // Set working directory to current directory (mirrors PTY executor behavior)
70        // Use fallback to "." if current_dir fails (e.g., E2E test workspaces)
71        let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
72        command.current_dir(&cwd);
73
74        debug!(
75            command = %cmd,
76            args = ?args,
77            cwd = ?cwd,
78            "Spawning CLI command"
79        );
80
81        if stdin_input.is_some() {
82            command.stdin(Stdio::piped());
83        }
84
85        let mut child = command.spawn()?;
86
87        // Write to stdin if needed
88        if let Some(input) = stdin_input
89            && let Some(mut stdin) = child.stdin.take()
90        {
91            stdin.write_all(input.as_bytes()).await?;
92            drop(stdin); // Close stdin to signal EOF
93        }
94
95        let mut timed_out = false;
96
97        // Take both stdout and stderr handles upfront to read concurrently
98        // This prevents deadlock when stderr fills its buffer before stdout produces output
99        let stdout_handle = child.stdout.take();
100        let stderr_handle = child.stderr.take();
101
102        // Wrap the streaming in a timeout if configured
103        // Read stdout and stderr CONCURRENTLY to avoid pipe buffer deadlock
104        let stream_result = async {
105            // Create futures for reading both streams
106            let stdout_future = async {
107                let mut lines_out = Vec::new();
108                if let Some(stdout) = stdout_handle {
109                    let reader = BufReader::new(stdout);
110                    let mut lines = reader.lines();
111                    while let Some(line) = lines.next_line().await? {
112                        lines_out.push(line);
113                    }
114                }
115                Ok::<_, std::io::Error>(lines_out)
116            };
117
118            let stderr_future = async {
119                let mut lines_out = Vec::new();
120                if let Some(stderr) = stderr_handle {
121                    let reader = BufReader::new(stderr);
122                    let mut lines = reader.lines();
123                    while let Some(line) = lines.next_line().await? {
124                        lines_out.push(line);
125                    }
126                }
127                Ok::<_, std::io::Error>(lines_out)
128            };
129
130            // Read both streams concurrently to prevent deadlock
131            let (stdout_lines, stderr_lines) = tokio::try_join!(stdout_future, stderr_future)?;
132
133            // Write stdout lines first (main output)
134            for line in &stdout_lines {
135                writeln!(output_writer, "{line}")?;
136            }
137
138            // Write stderr lines (prefixed) only in verbose mode
139            if verbose {
140                for line in &stderr_lines {
141                    writeln!(output_writer, "[stderr] {line}")?;
142                }
143            }
144
145            output_writer.flush()?;
146
147            // Build accumulated output (stdout first, then stderr)
148            let mut accumulated = String::new();
149            for line in stdout_lines {
150                accumulated.push_str(&line);
151                accumulated.push('\n');
152            }
153            for line in stderr_lines {
154                accumulated.push_str("[stderr] ");
155                accumulated.push_str(&line);
156                accumulated.push('\n');
157            }
158
159            Ok::<_, std::io::Error>(accumulated)
160        };
161
162        let accumulated_output = match timeout {
163            Some(duration) => {
164                debug!(timeout_secs = duration.as_secs(), "Executing with timeout");
165                match tokio::time::timeout(duration, stream_result).await {
166                    Ok(result) => result?,
167                    Err(_) => {
168                        // Timeout elapsed - send SIGTERM to the child process
169                        warn!(
170                            timeout_secs = duration.as_secs(),
171                            "Execution timeout reached, sending SIGTERM"
172                        );
173                        timed_out = true;
174                        Self::terminate_child(&mut child)?;
175                        String::new() // Return empty output on timeout
176                    }
177                }
178            }
179            None => stream_result.await?,
180        };
181
182        let status = child.wait().await?;
183
184        Ok(ExecutionResult {
185            output: accumulated_output,
186            success: status.success() && !timed_out,
187            exit_code: status.code(),
188            timed_out,
189        })
190    }
191
192    /// Terminates the child process with SIGTERM.
193    fn terminate_child(child: &mut tokio::process::Child) -> std::io::Result<()> {
194        #[cfg(not(unix))]
195        {
196            // SIGTERM doesn't exist on Windows. Best-effort termination:
197            // On Unix this would be SIGKILL, on Windows it maps to process termination.
198            child.start_kill()
199        }
200
201        #[cfg(unix)]
202        if let Some(pid) = child.id() {
203            #[allow(clippy::cast_possible_wrap)]
204            let pid = Pid::from_raw(pid as i32);
205            debug!(%pid, "Sending SIGTERM to child process");
206            let _ = kill(pid, Signal::SIGTERM);
207            Ok(())
208        } else {
209            Ok(())
210        }
211    }
212
213    /// Executes a prompt without streaming (captures all output).
214    ///
215    /// Uses no timeout by default. For timed execution, use `execute_capture_with_timeout`.
216    pub async fn execute_capture(&self, prompt: &str) -> std::io::Result<ExecutionResult> {
217        self.execute_capture_with_timeout(prompt, None).await
218    }
219
220    /// Executes a prompt without streaming, with optional timeout.
221    pub async fn execute_capture_with_timeout(
222        &self,
223        prompt: &str,
224        timeout: Option<Duration>,
225    ) -> std::io::Result<ExecutionResult> {
226        // Use a sink that discards output for non-streaming execution
227        // verbose=false since output is being discarded anyway
228        let sink = std::io::sink();
229        self.execute(prompt, sink, timeout, false).await
230    }
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a plain-text backend that runs `command` with `args` and hands
    /// the prompt over using `prompt_mode`.
    fn text_backend(command: &str, args: &[&str], prompt_mode: PromptMode) -> CliBackend {
        CliBackend {
            command: command.to_string(),
            args: args.iter().map(|arg| (*arg).to_string()).collect(),
            prompt_mode,
            prompt_flag: None,
            output_format: OutputFormat::Text,
        }
    }

    /// `echo` receives the prompt as an argument and prints it back.
    #[tokio::test]
    async fn test_execute_echo() {
        let executor = CliExecutor::new(text_backend("echo", &[], PromptMode::Arg));
        let mut output = Vec::new();

        let result = executor
            .execute("hello world", &mut output, None, true)
            .await
            .unwrap();

        assert!(result.success);
        assert!(!result.timed_out);
        assert!(result.output.contains("hello world"));
    }

    /// `cat` copies whatever arrives on stdin, exercising stdin prompt delivery.
    #[tokio::test]
    async fn test_execute_stdin() {
        let executor = CliExecutor::new(text_backend("cat", &[], PromptMode::Stdin));

        let result = executor.execute_capture("stdin test").await.unwrap();

        assert!(result.success);
        assert!(result.output.contains("stdin test"));
    }

    /// `false` always exits with code 1, which must surface as a failure.
    #[tokio::test]
    async fn test_execute_failure() {
        let executor = CliExecutor::new(text_backend("false", &[], PromptMode::Arg));

        let result = executor.execute_capture("").await.unwrap();

        assert!(!result.success);
        assert!(!result.timed_out);
        assert_eq!(result.exit_code, Some(1));
    }

    /// A 10 second `sleep` must be cut short by a 100ms timeout.
    #[tokio::test]
    async fn test_execute_timeout() {
        // `sleep` ignores stdin, so stdin mode keeps the prompt from being
        // appended to the argument list where it would break the command.
        let executor = CliExecutor::new(text_backend("sleep", &["10"], PromptMode::Stdin));

        let result = executor
            .execute_capture_with_timeout("", Some(Duration::from_millis(100)))
            .await
            .unwrap();

        assert!(result.timed_out, "Expected execution to time out");
        assert!(
            !result.success,
            "Timed out execution should not be successful"
        );
    }

    /// A command that finishes immediately must not be reported as timed out,
    /// even when a (generous) timeout is configured.
    #[tokio::test]
    async fn test_execute_no_timeout_when_fast() {
        let executor = CliExecutor::new(text_backend("echo", &[], PromptMode::Arg));

        let result = executor
            .execute_capture_with_timeout("fast", Some(Duration::from_secs(10)))
            .await
            .unwrap();

        assert!(!result.timed_out, "Fast command should not time out");
        assert!(result.success);
        assert!(result.output.contains("fast"));
    }
}