Skip to main content

ralph_adapters/
cli_executor.rs

1//! CLI executor for running prompts through backends.
2//!
3//! Executes prompts via CLI tools with real-time streaming output.
4//! Supports optional execution timeout with graceful SIGTERM termination.
5
6use crate::cli_backend::CliBackend;
7#[cfg(test)]
8use crate::cli_backend::{OutputFormat, PromptMode};
9use nix::sys::signal::{kill, Signal};
10use nix::unistd::Pid;
11use std::io::Write;
12use std::process::Stdio;
13use std::time::Duration;
14use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
15use tokio::process::Command;
16use tracing::{debug, warn};
17
/// Outcome of a single CLI execution.
///
/// This is plain data: it can be cloned and compared, which makes it easy to
/// cache results or assert on them in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionResult {
    /// Captured output of the CLI: every stdout line followed by every stderr
    /// line, each stderr line prefixed with `[stderr] `. Empty when the
    /// execution timed out.
    pub output: String,
    /// `true` only when the process exited successfully (exit code 0) and the
    /// execution did not time out.
    pub success: bool,
    /// The exit code reported by the process, or `None` when no code is
    /// available (for example when the process was terminated by a signal).
    pub exit_code: Option<i32>,
    /// Whether the execution was terminated because it exceeded its timeout.
    pub timed_out: bool,
}
30
31/// Executor for running prompts through CLI backends.
32#[derive(Debug)]
33pub struct CliExecutor {
34    backend: CliBackend,
35}
36
37impl CliExecutor {
38    /// Creates a new executor with the given backend.
39    pub fn new(backend: CliBackend) -> Self {
40        Self { backend }
41    }
42
43    /// Executes a prompt and streams output to the provided writer.
44    ///
45    /// Output is streamed line-by-line to the writer while being accumulated
46    /// for the return value. If `timeout` is provided and the execution exceeds
47    /// it, the process receives SIGTERM and the result indicates timeout.
48    ///
49    /// When `verbose` is true, stderr output is also written to the output writer
50    /// with a `[stderr]` prefix. When false, stderr is captured but not displayed.
51    pub async fn execute<W: Write + Send>(
52        &self,
53        prompt: &str,
54        mut output_writer: W,
55        timeout: Option<Duration>,
56        verbose: bool,
57    ) -> std::io::Result<ExecutionResult> {
58        // Note: _temp_file is kept alive for the duration of this function scope.
59        // For large prompts (>7000 chars), Claude reads from the temp file.
60        let (cmd, args, stdin_input, _temp_file) = self.backend.build_command(prompt, false);
61
62        let mut command = Command::new(&cmd);
63        command.args(&args);
64        command.stdout(Stdio::piped());
65        command.stderr(Stdio::piped());
66
67        // Set working directory to current directory (mirrors PTY executor behavior)
68        let cwd = std::env::current_dir()?;
69        command.current_dir(&cwd);
70
71        debug!(
72            command = %cmd,
73            args = ?args,
74            cwd = ?cwd,
75            "Spawning CLI command"
76        );
77
78        if stdin_input.is_some() {
79            command.stdin(Stdio::piped());
80        }
81
82        let mut child = command.spawn()?;
83
84        // Write to stdin if needed
85        if let Some(input) = stdin_input {
86            if let Some(mut stdin) = child.stdin.take() {
87                stdin.write_all(input.as_bytes()).await?;
88                drop(stdin); // Close stdin to signal EOF
89            }
90        }
91
92        let mut timed_out = false;
93
94        // Take both stdout and stderr handles upfront to read concurrently
95        // This prevents deadlock when stderr fills its buffer before stdout produces output
96        let stdout_handle = child.stdout.take();
97        let stderr_handle = child.stderr.take();
98
99        // Wrap the streaming in a timeout if configured
100        // Read stdout and stderr CONCURRENTLY to avoid pipe buffer deadlock
101        let stream_result = async {
102            // Create futures for reading both streams
103            let stdout_future = async {
104                let mut lines_out = Vec::new();
105                if let Some(stdout) = stdout_handle {
106                    let reader = BufReader::new(stdout);
107                    let mut lines = reader.lines();
108                    while let Some(line) = lines.next_line().await? {
109                        lines_out.push(line);
110                    }
111                }
112                Ok::<_, std::io::Error>(lines_out)
113            };
114
115            let stderr_future = async {
116                let mut lines_out = Vec::new();
117                if let Some(stderr) = stderr_handle {
118                    let reader = BufReader::new(stderr);
119                    let mut lines = reader.lines();
120                    while let Some(line) = lines.next_line().await? {
121                        lines_out.push(line);
122                    }
123                }
124                Ok::<_, std::io::Error>(lines_out)
125            };
126
127            // Read both streams concurrently to prevent deadlock
128            let (stdout_lines, stderr_lines) = tokio::try_join!(stdout_future, stderr_future)?;
129
130            // Write stdout lines first (main output)
131            for line in &stdout_lines {
132                writeln!(output_writer, "{line}")?;
133            }
134
135            // Write stderr lines (prefixed) only in verbose mode
136            if verbose {
137                for line in &stderr_lines {
138                    writeln!(output_writer, "[stderr] {line}")?;
139                }
140            }
141
142            output_writer.flush()?;
143
144            // Build accumulated output (stdout first, then stderr)
145            let mut accumulated = String::new();
146            for line in stdout_lines {
147                accumulated.push_str(&line);
148                accumulated.push('\n');
149            }
150            for line in stderr_lines {
151                accumulated.push_str("[stderr] ");
152                accumulated.push_str(&line);
153                accumulated.push('\n');
154            }
155
156            Ok::<_, std::io::Error>(accumulated)
157        };
158
159        let accumulated_output = match timeout {
160            Some(duration) => {
161                debug!(timeout_secs = duration.as_secs(), "Executing with timeout");
162                match tokio::time::timeout(duration, stream_result).await {
163                    Ok(result) => result?,
164                    Err(_) => {
165                        // Timeout elapsed - send SIGTERM to the child process
166                        warn!(
167                            timeout_secs = duration.as_secs(),
168                            "Execution timeout reached, sending SIGTERM"
169                        );
170                        timed_out = true;
171                        Self::terminate_child(&mut child)?;
172                        String::new() // Return empty output on timeout
173                    }
174                }
175            }
176            None => {
177                stream_result.await?
178            }
179        };
180
181        let status = child.wait().await?;
182
183        Ok(ExecutionResult {
184            output: accumulated_output,
185            success: status.success() && !timed_out,
186            exit_code: status.code(),
187            timed_out,
188        })
189    }
190
191    /// Terminates the child process with SIGTERM.
192    fn terminate_child(child: &mut tokio::process::Child) -> std::io::Result<()> {
193        if let Some(pid) = child.id() {
194            #[allow(clippy::cast_possible_wrap)]
195            let pid = Pid::from_raw(pid as i32);
196            debug!(%pid, "Sending SIGTERM to child process");
197            let _ = kill(pid, Signal::SIGTERM);
198        }
199        Ok(())
200    }
201
202    /// Executes a prompt without streaming (captures all output).
203    ///
204    /// Uses no timeout by default. For timed execution, use `execute_capture_with_timeout`.
205    pub async fn execute_capture(&self, prompt: &str) -> std::io::Result<ExecutionResult> {
206        self.execute_capture_with_timeout(prompt, None).await
207    }
208
209    /// Executes a prompt without streaming, with optional timeout.
210    pub async fn execute_capture_with_timeout(
211        &self,
212        prompt: &str,
213        timeout: Option<Duration>,
214    ) -> std::io::Result<ExecutionResult> {
215        // Use a sink that discards output for non-streaming execution
216        // verbose=false since output is being discarded anyway
217        let sink = std::io::sink();
218        self.execute(prompt, sink, timeout, false).await
219    }
220}
221
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a plain-text backend around an ordinary command-line utility.
    fn text_backend(command: &str, args: &[&str], prompt_mode: PromptMode) -> CliBackend {
        CliBackend {
            command: command.to_string(),
            args: args.iter().map(|arg| (*arg).to_string()).collect(),
            prompt_mode,
            prompt_flag: None,
            output_format: OutputFormat::Text,
        }
    }

    #[tokio::test]
    async fn test_execute_echo() {
        // `echo` receives the prompt as an argument and prints it back.
        let executor = CliExecutor::new(text_backend("echo", &[], PromptMode::Arg));
        let mut streamed: Vec<u8> = Vec::new();

        let result = executor
            .execute("hello world", &mut streamed, None, true)
            .await
            .unwrap();

        assert!(result.success);
        assert!(!result.timed_out);
        assert!(result.output.contains("hello world"));
    }

    #[tokio::test]
    async fn test_execute_stdin() {
        // `cat` copies stdin to stdout, exercising the stdin prompt mode.
        let executor = CliExecutor::new(text_backend("cat", &[], PromptMode::Stdin));

        let result = executor.execute_capture("stdin test").await.unwrap();

        assert!(result.success);
        assert!(result.output.contains("stdin test"));
    }

    #[tokio::test]
    async fn test_execute_failure() {
        // `false` always exits with code 1.
        let executor = CliExecutor::new(text_backend("false", &[], PromptMode::Arg));

        let result = executor.execute_capture("").await.unwrap();

        assert!(!result.success);
        assert!(!result.timed_out);
        assert_eq!(result.exit_code, Some(1));
    }

    #[tokio::test]
    async fn test_execute_timeout() {
        // `sleep 10` outlives the timeout. Stdin mode is used so the prompt is
        // not appended as an extra argument to `sleep`.
        let executor = CliExecutor::new(text_backend("sleep", &["10"], PromptMode::Stdin));

        // A 100ms limit must trip long before the sleep finishes.
        let limit = Some(Duration::from_millis(100));
        let result = executor
            .execute_capture_with_timeout("", limit)
            .await
            .unwrap();

        assert!(result.timed_out, "Expected execution to time out");
        assert!(!result.success, "Timed out execution should not be successful");
    }

    #[tokio::test]
    async fn test_execute_no_timeout_when_fast() {
        // `echo` finishes immediately, well inside a generous limit.
        let executor = CliExecutor::new(text_backend("echo", &[], PromptMode::Arg));

        let limit = Some(Duration::from_secs(10));
        let result = executor
            .execute_capture_with_timeout("fast", limit)
            .await
            .unwrap();

        assert!(!result.timed_out, "Fast command should not time out");
        assert!(result.success);
        assert!(result.output.contains("fast"));
    }
}