ralph_adapters/
cli_executor.rs1use crate::cli_backend::CliBackend;
7#[cfg(test)]
8use crate::cli_backend::{OutputFormat, PromptMode};
9#[cfg(unix)]
10use nix::sys::signal::{Signal, kill};
11#[cfg(unix)]
12use nix::unistd::Pid;
13use std::io::Write;
14use std::process::Stdio;
15use std::time::Duration;
16use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
17use tokio::process::Command;
18use tracing::{debug, warn};
19
/// Outcome of running a single CLI command through [`CliExecutor`].
///
/// The type is plain data, so it can be cloned and compared for equality
/// (handy for caching results and for assertions in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionResult {
    /// Captured output: every stdout line, followed by every stderr line
    /// prefixed with `[stderr] `. Empty when the execution timed out.
    pub output: String,
    /// `true` only when the process reported a successful exit status and the
    /// execution did not time out.
    pub success: bool,
    /// Exit code reported by the process, or `None` when the platform did not
    /// provide one (on Unix this is the case for a process ended by a signal).
    pub exit_code: Option<i32>,
    /// `true` when the timeout elapsed before the process finished.
    pub timed_out: bool,
}
32
/// Runs prompts through an external command-line tool.
///
/// The command line (program, arguments and optional stdin payload) is built
/// by the wrapped [`CliBackend`]; this type spawns the process and collects
/// its output.
#[derive(Debug)]
pub struct CliExecutor {
    /// Backend used to build the command line for each prompt.
    backend: CliBackend,
}
38
39impl CliExecutor {
40 pub fn new(backend: CliBackend) -> Self {
42 Self { backend }
43 }
44
45 pub async fn execute<W: Write + Send>(
54 &self,
55 prompt: &str,
56 mut output_writer: W,
57 timeout: Option<Duration>,
58 verbose: bool,
59 ) -> std::io::Result<ExecutionResult> {
60 let (cmd, args, stdin_input, _temp_file) = self.backend.build_command(prompt, false);
63
64 let mut command = Command::new(&cmd);
65 command.args(&args);
66 command.stdout(Stdio::piped());
67 command.stderr(Stdio::piped());
68
69 let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
72 command.current_dir(&cwd);
73
74 debug!(
75 command = %cmd,
76 args = ?args,
77 cwd = ?cwd,
78 "Spawning CLI command"
79 );
80
81 if stdin_input.is_some() {
82 command.stdin(Stdio::piped());
83 }
84
85 let mut child = command.spawn()?;
86
87 if let Some(input) = stdin_input
89 && let Some(mut stdin) = child.stdin.take()
90 {
91 stdin.write_all(input.as_bytes()).await?;
92 drop(stdin); }
94
95 let mut timed_out = false;
96
97 let stdout_handle = child.stdout.take();
100 let stderr_handle = child.stderr.take();
101
102 let stream_result = async {
105 let stdout_future = async {
107 let mut lines_out = Vec::new();
108 if let Some(stdout) = stdout_handle {
109 let reader = BufReader::new(stdout);
110 let mut lines = reader.lines();
111 while let Some(line) = lines.next_line().await? {
112 lines_out.push(line);
113 }
114 }
115 Ok::<_, std::io::Error>(lines_out)
116 };
117
118 let stderr_future = async {
119 let mut lines_out = Vec::new();
120 if let Some(stderr) = stderr_handle {
121 let reader = BufReader::new(stderr);
122 let mut lines = reader.lines();
123 while let Some(line) = lines.next_line().await? {
124 lines_out.push(line);
125 }
126 }
127 Ok::<_, std::io::Error>(lines_out)
128 };
129
130 let (stdout_lines, stderr_lines) = tokio::try_join!(stdout_future, stderr_future)?;
132
133 for line in &stdout_lines {
135 writeln!(output_writer, "{line}")?;
136 }
137
138 if verbose {
140 for line in &stderr_lines {
141 writeln!(output_writer, "[stderr] {line}")?;
142 }
143 }
144
145 output_writer.flush()?;
146
147 let mut accumulated = String::new();
149 for line in stdout_lines {
150 accumulated.push_str(&line);
151 accumulated.push('\n');
152 }
153 for line in stderr_lines {
154 accumulated.push_str("[stderr] ");
155 accumulated.push_str(&line);
156 accumulated.push('\n');
157 }
158
159 Ok::<_, std::io::Error>(accumulated)
160 };
161
162 let accumulated_output = match timeout {
163 Some(duration) => {
164 debug!(timeout_secs = duration.as_secs(), "Executing with timeout");
165 match tokio::time::timeout(duration, stream_result).await {
166 Ok(result) => result?,
167 Err(_) => {
168 warn!(
170 timeout_secs = duration.as_secs(),
171 "Execution timeout reached, sending SIGTERM"
172 );
173 timed_out = true;
174 Self::terminate_child(&mut child)?;
175 String::new() }
177 }
178 }
179 None => stream_result.await?,
180 };
181
182 let status = child.wait().await?;
183
184 Ok(ExecutionResult {
185 output: accumulated_output,
186 success: status.success() && !timed_out,
187 exit_code: status.code(),
188 timed_out,
189 })
190 }
191
192 fn terminate_child(child: &mut tokio::process::Child) -> std::io::Result<()> {
194 #[cfg(not(unix))]
195 {
196 child.start_kill()
199 }
200
201 #[cfg(unix)]
202 if let Some(pid) = child.id() {
203 #[allow(clippy::cast_possible_wrap)]
204 let pid = Pid::from_raw(pid as i32);
205 debug!(%pid, "Sending SIGTERM to child process");
206 let _ = kill(pid, Signal::SIGTERM);
207 Ok(())
208 } else {
209 Ok(())
210 }
211 }
212
213 pub async fn execute_capture(&self, prompt: &str) -> std::io::Result<ExecutionResult> {
217 self.execute_capture_with_timeout(prompt, None).await
218 }
219
220 pub async fn execute_capture_with_timeout(
222 &self,
223 prompt: &str,
224 timeout: Option<Duration>,
225 ) -> std::io::Result<ExecutionResult> {
226 let sink = std::io::sink();
229 self.execute(prompt, sink, timeout, false).await
230 }
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a text-output backend for `command` with no prompt flag.
    fn text_backend(command: &str, args: &[&str], prompt_mode: PromptMode) -> CliBackend {
        CliBackend {
            command: command.to_string(),
            args: args.iter().map(|arg| (*arg).to_string()).collect(),
            prompt_mode,
            prompt_flag: None,
            output_format: OutputFormat::Text,
        }
    }

    /// `echo` receives the prompt as an argument and prints it back.
    #[tokio::test]
    async fn test_execute_echo() {
        let executor = CliExecutor::new(text_backend("echo", &[], PromptMode::Arg));
        let mut written = Vec::new();

        let outcome = executor
            .execute("hello world", &mut written, None, true)
            .await
            .unwrap();

        assert!(outcome.success);
        assert!(!outcome.timed_out);
        assert!(outcome.output.contains("hello world"));
    }

    /// `cat` echoes whatever arrives on stdin, proving the prompt was piped in.
    #[tokio::test]
    async fn test_execute_stdin() {
        let executor = CliExecutor::new(text_backend("cat", &[], PromptMode::Stdin));

        let outcome = executor.execute_capture("stdin test").await.unwrap();

        assert!(outcome.success);
        assert!(outcome.output.contains("stdin test"));
    }

    /// `false` exits with status 1, which must surface as a failed result.
    #[tokio::test]
    async fn test_execute_failure() {
        let executor = CliExecutor::new(text_backend("false", &[], PromptMode::Arg));

        let outcome = executor.execute_capture("").await.unwrap();

        assert!(!outcome.success);
        assert!(!outcome.timed_out);
        assert_eq!(outcome.exit_code, Some(1));
    }

    /// A ten-second `sleep` must be cut short by a 100 ms timeout.
    #[tokio::test]
    async fn test_execute_timeout() {
        let executor = CliExecutor::new(text_backend("sleep", &["10"], PromptMode::Stdin));
        let limit = Some(Duration::from_millis(100));

        let outcome = executor
            .execute_capture_with_timeout("", limit)
            .await
            .unwrap();

        assert!(outcome.timed_out, "Expected execution to time out");
        assert!(
            !outcome.success,
            "Timed out execution should not be successful"
        );
    }

    /// A generous timeout must not affect a command that finishes quickly.
    #[tokio::test]
    async fn test_execute_no_timeout_when_fast() {
        let executor = CliExecutor::new(text_backend("echo", &[], PromptMode::Arg));
        let limit = Some(Duration::from_secs(10));

        let outcome = executor
            .execute_capture_with_timeout("fast", limit)
            .await
            .unwrap();

        assert!(!outcome.timed_out, "Fast command should not time out");
        assert!(outcome.success);
        assert!(outcome.output.contains("fast"));
    }
}