ralph_adapters/
cli_executor.rs

1//! CLI executor for running prompts through backends.
2//!
3//! Executes prompts via CLI tools with real-time streaming output.
4//! Supports optional execution timeout with graceful SIGTERM termination.
5
6use crate::cli_backend::CliBackend;
7#[cfg(test)]
8use crate::cli_backend::{OutputFormat, PromptMode};
9use nix::sys::signal::{Signal, kill};
10use nix::unistd::Pid;
11use std::io::Write;
12use std::process::Stdio;
13use std::time::Duration;
14use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
15use tokio::process::Command;
16use tracing::{debug, warn};
17
/// Outcome of a single CLI backend invocation.
///
/// The type is plain data, so it derives `Clone`, `PartialEq` and `Eq` in
/// addition to `Debug`; this lets callers store, copy and compare results
/// (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionResult {
    /// Captured output of the process: its stdout lines first, followed by its
    /// stderr lines, each stderr line prefixed with `[stderr] `. Every line is
    /// terminated with `\n`.
    pub output: String,
    /// `true` only when the process exited with status 0 and the execution
    /// was not stopped because of a timeout.
    pub success: bool,
    /// The exit code reported by the process, or `None` when no code is
    /// available (on Unix this is the case when a signal ended the process).
    pub exit_code: Option<i32>,
    /// Whether the execution was terminated due to timeout.
    pub timed_out: bool,
}
30
31/// Executor for running prompts through CLI backends.
32#[derive(Debug)]
33pub struct CliExecutor {
34    backend: CliBackend,
35}
36
37impl CliExecutor {
38    /// Creates a new executor with the given backend.
39    pub fn new(backend: CliBackend) -> Self {
40        Self { backend }
41    }
42
43    /// Executes a prompt and streams output to the provided writer.
44    ///
45    /// Output is streamed line-by-line to the writer while being accumulated
46    /// for the return value. If `timeout` is provided and the execution exceeds
47    /// it, the process receives SIGTERM and the result indicates timeout.
48    ///
49    /// When `verbose` is true, stderr output is also written to the output writer
50    /// with a `[stderr]` prefix. When false, stderr is captured but not displayed.
51    pub async fn execute<W: Write + Send>(
52        &self,
53        prompt: &str,
54        mut output_writer: W,
55        timeout: Option<Duration>,
56        verbose: bool,
57    ) -> std::io::Result<ExecutionResult> {
58        // Note: _temp_file is kept alive for the duration of this function scope.
59        // For large prompts (>7000 chars), Claude reads from the temp file.
60        let (cmd, args, stdin_input, _temp_file) = self.backend.build_command(prompt, false);
61
62        let mut command = Command::new(&cmd);
63        command.args(&args);
64        command.stdout(Stdio::piped());
65        command.stderr(Stdio::piped());
66
67        // Set working directory to current directory (mirrors PTY executor behavior)
68        let cwd = std::env::current_dir()?;
69        command.current_dir(&cwd);
70
71        debug!(
72            command = %cmd,
73            args = ?args,
74            cwd = ?cwd,
75            "Spawning CLI command"
76        );
77
78        if stdin_input.is_some() {
79            command.stdin(Stdio::piped());
80        }
81
82        let mut child = command.spawn()?;
83
84        // Write to stdin if needed
85        if let Some(input) = stdin_input
86            && let Some(mut stdin) = child.stdin.take()
87        {
88            stdin.write_all(input.as_bytes()).await?;
89            drop(stdin); // Close stdin to signal EOF
90        }
91
92        let mut timed_out = false;
93
94        // Take both stdout and stderr handles upfront to read concurrently
95        // This prevents deadlock when stderr fills its buffer before stdout produces output
96        let stdout_handle = child.stdout.take();
97        let stderr_handle = child.stderr.take();
98
99        // Wrap the streaming in a timeout if configured
100        // Read stdout and stderr CONCURRENTLY to avoid pipe buffer deadlock
101        let stream_result = async {
102            // Create futures for reading both streams
103            let stdout_future = async {
104                let mut lines_out = Vec::new();
105                if let Some(stdout) = stdout_handle {
106                    let reader = BufReader::new(stdout);
107                    let mut lines = reader.lines();
108                    while let Some(line) = lines.next_line().await? {
109                        lines_out.push(line);
110                    }
111                }
112                Ok::<_, std::io::Error>(lines_out)
113            };
114
115            let stderr_future = async {
116                let mut lines_out = Vec::new();
117                if let Some(stderr) = stderr_handle {
118                    let reader = BufReader::new(stderr);
119                    let mut lines = reader.lines();
120                    while let Some(line) = lines.next_line().await? {
121                        lines_out.push(line);
122                    }
123                }
124                Ok::<_, std::io::Error>(lines_out)
125            };
126
127            // Read both streams concurrently to prevent deadlock
128            let (stdout_lines, stderr_lines) = tokio::try_join!(stdout_future, stderr_future)?;
129
130            // Write stdout lines first (main output)
131            for line in &stdout_lines {
132                writeln!(output_writer, "{line}")?;
133            }
134
135            // Write stderr lines (prefixed) only in verbose mode
136            if verbose {
137                for line in &stderr_lines {
138                    writeln!(output_writer, "[stderr] {line}")?;
139                }
140            }
141
142            output_writer.flush()?;
143
144            // Build accumulated output (stdout first, then stderr)
145            let mut accumulated = String::new();
146            for line in stdout_lines {
147                accumulated.push_str(&line);
148                accumulated.push('\n');
149            }
150            for line in stderr_lines {
151                accumulated.push_str("[stderr] ");
152                accumulated.push_str(&line);
153                accumulated.push('\n');
154            }
155
156            Ok::<_, std::io::Error>(accumulated)
157        };
158
159        let accumulated_output = match timeout {
160            Some(duration) => {
161                debug!(timeout_secs = duration.as_secs(), "Executing with timeout");
162                match tokio::time::timeout(duration, stream_result).await {
163                    Ok(result) => result?,
164                    Err(_) => {
165                        // Timeout elapsed - send SIGTERM to the child process
166                        warn!(
167                            timeout_secs = duration.as_secs(),
168                            "Execution timeout reached, sending SIGTERM"
169                        );
170                        timed_out = true;
171                        Self::terminate_child(&mut child)?;
172                        String::new() // Return empty output on timeout
173                    }
174                }
175            }
176            None => stream_result.await?,
177        };
178
179        let status = child.wait().await?;
180
181        Ok(ExecutionResult {
182            output: accumulated_output,
183            success: status.success() && !timed_out,
184            exit_code: status.code(),
185            timed_out,
186        })
187    }
188
189    /// Terminates the child process with SIGTERM.
190    fn terminate_child(child: &mut tokio::process::Child) -> std::io::Result<()> {
191        if let Some(pid) = child.id() {
192            #[allow(clippy::cast_possible_wrap)]
193            let pid = Pid::from_raw(pid as i32);
194            debug!(%pid, "Sending SIGTERM to child process");
195            let _ = kill(pid, Signal::SIGTERM);
196        }
197        Ok(())
198    }
199
200    /// Executes a prompt without streaming (captures all output).
201    ///
202    /// Uses no timeout by default. For timed execution, use `execute_capture_with_timeout`.
203    pub async fn execute_capture(&self, prompt: &str) -> std::io::Result<ExecutionResult> {
204        self.execute_capture_with_timeout(prompt, None).await
205    }
206
207    /// Executes a prompt without streaming, with optional timeout.
208    pub async fn execute_capture_with_timeout(
209        &self,
210        prompt: &str,
211        timeout: Option<Duration>,
212    ) -> std::io::Result<ExecutionResult> {
213        // Use a sink that discards output for non-streaming execution
214        // verbose=false since output is being discarded anyway
215        let sink = std::io::sink();
216        self.execute(prompt, sink, timeout, false).await
217    }
218}
219
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a text-output backend for `command` with the given fixed
    /// arguments and prompt mode, and no prompt flag.
    fn backend_for(command: &str, args: &[&str], prompt_mode: PromptMode) -> CliBackend {
        CliBackend {
            command: command.to_string(),
            args: args.iter().map(|arg| (*arg).to_string()).collect(),
            prompt_mode,
            prompt_flag: None,
            output_format: OutputFormat::Text,
        }
    }

    /// `echo` receives the prompt as an argument and prints it back.
    #[tokio::test]
    async fn test_execute_echo() {
        let executor = CliExecutor::new(backend_for("echo", &[], PromptMode::Arg));
        let mut output = Vec::new();

        let result = executor
            .execute("hello world", &mut output, None, true)
            .await
            .unwrap();

        assert!(result.success);
        assert!(!result.timed_out);
        assert!(result.output.contains("hello world"));
    }

    /// `cat` copies the prompt it receives on stdin to stdout.
    #[tokio::test]
    async fn test_execute_stdin() {
        let executor = CliExecutor::new(backend_for("cat", &[], PromptMode::Stdin));

        let result = executor.execute_capture("stdin test").await.unwrap();

        assert!(result.success);
        assert!(result.output.contains("stdin test"));
    }

    /// `false` always exits with code 1, which must be reported as a failure.
    #[tokio::test]
    async fn test_execute_failure() {
        let executor = CliExecutor::new(backend_for("false", &[], PromptMode::Arg));

        let result = executor.execute_capture("").await.unwrap();

        assert!(!result.success);
        assert!(!result.timed_out);
        assert_eq!(result.exit_code, Some(1));
    }

    /// A command that outlives the timeout is reported as timed out.
    #[tokio::test]
    async fn test_execute_timeout() {
        // `sleep 10` runs far longer than the timeout below. Stdin mode is used
        // so the prompt is not appended to the argument list; sleep ignores stdin.
        let executor = CliExecutor::new(backend_for("sleep", &["10"], PromptMode::Stdin));

        // A 100ms limit must trigger the timeout path.
        let result = executor
            .execute_capture_with_timeout("", Some(Duration::from_millis(100)))
            .await
            .unwrap();

        assert!(result.timed_out, "Expected execution to time out");
        assert!(
            !result.success,
            "Timed out execution should not be successful"
        );
    }

    /// A command that finishes well within the timeout is not flagged.
    #[tokio::test]
    async fn test_execute_no_timeout_when_fast() {
        // `echo` completes immediately, long before the generous limit.
        let executor = CliExecutor::new(backend_for("echo", &[], PromptMode::Arg));

        let result = executor
            .execute_capture_with_timeout("fast", Some(Duration::from_secs(10)))
            .await
            .unwrap();

        assert!(!result.timed_out, "Fast command should not time out");
        assert!(result.success);
        assert!(result.output.contains("fast"));
    }
}