ralph_adapters/
cli_executor.rs1use crate::cli_backend::CliBackend;
7#[cfg(test)]
8use crate::cli_backend::{OutputFormat, PromptMode};
9use nix::sys::signal::{Signal, kill};
10use nix::unistd::Pid;
11use std::io::Write;
12use std::process::Stdio;
13use std::time::Duration;
14use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
15use tokio::process::Command;
16use tracing::{debug, warn};
17
/// Outcome of a single CLI invocation performed by `CliExecutor::execute`.
///
/// The type is plain data, so it derives `Clone`, `PartialEq` and `Eq` in
/// addition to `Debug`; this lets callers store, duplicate and compare results
/// (for example in tests) without hand-written boilerplate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionResult {
    /// Captured output: the child's stdout lines followed by its stderr lines,
    /// each stderr line prefixed with `[stderr] `. Every line ends with `\n`.
    /// Empty when the execution timed out.
    pub output: String,
    /// `true` only when the child reported a successful exit status and the
    /// execution did not time out.
    pub success: bool,
    /// The child's exit code as reported by `ExitStatus::code`; `None` when
    /// the status carries no code.
    pub exit_code: Option<i32>,
    /// `true` when the timeout elapsed before the child's output was fully read.
    pub timed_out: bool,
}
30
31#[derive(Debug)]
33pub struct CliExecutor {
34 backend: CliBackend,
35}
36
37impl CliExecutor {
38 pub fn new(backend: CliBackend) -> Self {
40 Self { backend }
41 }
42
43 pub async fn execute<W: Write + Send>(
52 &self,
53 prompt: &str,
54 mut output_writer: W,
55 timeout: Option<Duration>,
56 verbose: bool,
57 ) -> std::io::Result<ExecutionResult> {
58 let (cmd, args, stdin_input, _temp_file) = self.backend.build_command(prompt, false);
61
62 let mut command = Command::new(&cmd);
63 command.args(&args);
64 command.stdout(Stdio::piped());
65 command.stderr(Stdio::piped());
66
67 let cwd = std::env::current_dir()?;
69 command.current_dir(&cwd);
70
71 debug!(
72 command = %cmd,
73 args = ?args,
74 cwd = ?cwd,
75 "Spawning CLI command"
76 );
77
78 if stdin_input.is_some() {
79 command.stdin(Stdio::piped());
80 }
81
82 let mut child = command.spawn()?;
83
84 if let Some(input) = stdin_input
86 && let Some(mut stdin) = child.stdin.take()
87 {
88 stdin.write_all(input.as_bytes()).await?;
89 drop(stdin); }
91
92 let mut timed_out = false;
93
94 let stdout_handle = child.stdout.take();
97 let stderr_handle = child.stderr.take();
98
99 let stream_result = async {
102 let stdout_future = async {
104 let mut lines_out = Vec::new();
105 if let Some(stdout) = stdout_handle {
106 let reader = BufReader::new(stdout);
107 let mut lines = reader.lines();
108 while let Some(line) = lines.next_line().await? {
109 lines_out.push(line);
110 }
111 }
112 Ok::<_, std::io::Error>(lines_out)
113 };
114
115 let stderr_future = async {
116 let mut lines_out = Vec::new();
117 if let Some(stderr) = stderr_handle {
118 let reader = BufReader::new(stderr);
119 let mut lines = reader.lines();
120 while let Some(line) = lines.next_line().await? {
121 lines_out.push(line);
122 }
123 }
124 Ok::<_, std::io::Error>(lines_out)
125 };
126
127 let (stdout_lines, stderr_lines) = tokio::try_join!(stdout_future, stderr_future)?;
129
130 for line in &stdout_lines {
132 writeln!(output_writer, "{line}")?;
133 }
134
135 if verbose {
137 for line in &stderr_lines {
138 writeln!(output_writer, "[stderr] {line}")?;
139 }
140 }
141
142 output_writer.flush()?;
143
144 let mut accumulated = String::new();
146 for line in stdout_lines {
147 accumulated.push_str(&line);
148 accumulated.push('\n');
149 }
150 for line in stderr_lines {
151 accumulated.push_str("[stderr] ");
152 accumulated.push_str(&line);
153 accumulated.push('\n');
154 }
155
156 Ok::<_, std::io::Error>(accumulated)
157 };
158
159 let accumulated_output = match timeout {
160 Some(duration) => {
161 debug!(timeout_secs = duration.as_secs(), "Executing with timeout");
162 match tokio::time::timeout(duration, stream_result).await {
163 Ok(result) => result?,
164 Err(_) => {
165 warn!(
167 timeout_secs = duration.as_secs(),
168 "Execution timeout reached, sending SIGTERM"
169 );
170 timed_out = true;
171 Self::terminate_child(&mut child)?;
172 String::new() }
174 }
175 }
176 None => stream_result.await?,
177 };
178
179 let status = child.wait().await?;
180
181 Ok(ExecutionResult {
182 output: accumulated_output,
183 success: status.success() && !timed_out,
184 exit_code: status.code(),
185 timed_out,
186 })
187 }
188
189 fn terminate_child(child: &mut tokio::process::Child) -> std::io::Result<()> {
191 if let Some(pid) = child.id() {
192 #[allow(clippy::cast_possible_wrap)]
193 let pid = Pid::from_raw(pid as i32);
194 debug!(%pid, "Sending SIGTERM to child process");
195 let _ = kill(pid, Signal::SIGTERM);
196 }
197 Ok(())
198 }
199
200 pub async fn execute_capture(&self, prompt: &str) -> std::io::Result<ExecutionResult> {
204 self.execute_capture_with_timeout(prompt, None).await
205 }
206
207 pub async fn execute_capture_with_timeout(
209 &self,
210 prompt: &str,
211 timeout: Option<Duration>,
212 ) -> std::io::Result<ExecutionResult> {
213 let sink = std::io::sink();
216 self.execute(prompt, sink, timeout, false).await
217 }
218}
219
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a plain-text backend for `command` with the given fixed
    /// arguments and prompt delivery mode, and no prompt flag.
    fn text_backend(command: &str, args: &[&str], prompt_mode: PromptMode) -> CliBackend {
        CliBackend {
            command: command.to_string(),
            args: args.iter().map(|arg| (*arg).to_string()).collect(),
            prompt_mode,
            prompt_flag: None,
            output_format: OutputFormat::Text,
        }
    }

    /// `echo` receives the prompt as an argument and prints it back.
    #[tokio::test]
    async fn test_execute_echo() {
        let executor = CliExecutor::new(text_backend("echo", &[], PromptMode::Arg));
        let mut output = Vec::new();

        let result = executor
            .execute("hello world", &mut output, None, true)
            .await
            .unwrap();

        assert!(result.success);
        assert!(!result.timed_out);
        assert!(result.output.contains("hello world"));
    }

    /// `cat` copies the prompt it receives on stdin to stdout.
    #[tokio::test]
    async fn test_execute_stdin() {
        let executor = CliExecutor::new(text_backend("cat", &[], PromptMode::Stdin));

        let result = executor.execute_capture("stdin test").await.unwrap();

        assert!(result.success);
        assert!(result.output.contains("stdin test"));
    }

    /// `false` exits with status 1, which must be reported as a failure.
    #[tokio::test]
    async fn test_execute_failure() {
        let executor = CliExecutor::new(text_backend("false", &[], PromptMode::Arg));

        let result = executor.execute_capture("").await.unwrap();

        assert!(!result.success);
        assert!(!result.timed_out);
        assert_eq!(result.exit_code, Some(1));
    }

    /// A command that outlives the timeout is flagged as timed out and
    /// unsuccessful.
    #[tokio::test]
    async fn test_execute_timeout() {
        let executor = CliExecutor::new(text_backend("sleep", &["10"], PromptMode::Stdin));
        let timeout = Some(Duration::from_millis(100));

        let result = executor
            .execute_capture_with_timeout("", timeout)
            .await
            .unwrap();

        assert!(result.timed_out, "Expected execution to time out");
        assert!(
            !result.success,
            "Timed out execution should not be successful"
        );
    }

    /// A command that finishes well inside the timeout is unaffected by it.
    #[tokio::test]
    async fn test_execute_no_timeout_when_fast() {
        let executor = CliExecutor::new(text_backend("echo", &[], PromptMode::Arg));
        let timeout = Some(Duration::from_secs(10));

        let result = executor
            .execute_capture_with_timeout("fast", timeout)
            .await
            .unwrap();

        assert!(!result.timed_out, "Fast command should not time out");
        assert!(result.success);
        assert!(result.output.contains("fast"));
    }
}