ralph_adapters/
cli_executor.rs1use crate::cli_backend::CliBackend;
7#[cfg(test)]
8use crate::cli_backend::{OutputFormat, PromptMode};
9use nix::sys::signal::{kill, Signal};
10use nix::unistd::Pid;
11use std::io::Write;
12use std::process::Stdio;
13use std::time::Duration;
14use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
15use tokio::process::Command;
16use tracing::{debug, warn};
17
18#[derive(Debug)]
20pub struct ExecutionResult {
21 pub output: String,
23 pub success: bool,
25 pub exit_code: Option<i32>,
27 pub timed_out: bool,
29}
30
31#[derive(Debug)]
33pub struct CliExecutor {
34 backend: CliBackend,
35}
36
37impl CliExecutor {
38 pub fn new(backend: CliBackend) -> Self {
40 Self { backend }
41 }
42
43 pub async fn execute<W: Write + Send>(
52 &self,
53 prompt: &str,
54 mut output_writer: W,
55 timeout: Option<Duration>,
56 verbose: bool,
57 ) -> std::io::Result<ExecutionResult> {
58 let (cmd, args, stdin_input, _temp_file) = self.backend.build_command(prompt, false);
61
62 let mut command = Command::new(&cmd);
63 command.args(&args);
64 command.stdout(Stdio::piped());
65 command.stderr(Stdio::piped());
66
67 let cwd = std::env::current_dir()?;
69 command.current_dir(&cwd);
70
71 debug!(
72 command = %cmd,
73 args = ?args,
74 cwd = ?cwd,
75 "Spawning CLI command"
76 );
77
78 if stdin_input.is_some() {
79 command.stdin(Stdio::piped());
80 }
81
82 let mut child = command.spawn()?;
83
84 if let Some(input) = stdin_input {
86 if let Some(mut stdin) = child.stdin.take() {
87 stdin.write_all(input.as_bytes()).await?;
88 drop(stdin); }
90 }
91
92 let mut timed_out = false;
93
94 let stdout_handle = child.stdout.take();
97 let stderr_handle = child.stderr.take();
98
99 let stream_result = async {
102 let stdout_future = async {
104 let mut lines_out = Vec::new();
105 if let Some(stdout) = stdout_handle {
106 let reader = BufReader::new(stdout);
107 let mut lines = reader.lines();
108 while let Some(line) = lines.next_line().await? {
109 lines_out.push(line);
110 }
111 }
112 Ok::<_, std::io::Error>(lines_out)
113 };
114
115 let stderr_future = async {
116 let mut lines_out = Vec::new();
117 if let Some(stderr) = stderr_handle {
118 let reader = BufReader::new(stderr);
119 let mut lines = reader.lines();
120 while let Some(line) = lines.next_line().await? {
121 lines_out.push(line);
122 }
123 }
124 Ok::<_, std::io::Error>(lines_out)
125 };
126
127 let (stdout_lines, stderr_lines) = tokio::try_join!(stdout_future, stderr_future)?;
129
130 for line in &stdout_lines {
132 writeln!(output_writer, "{line}")?;
133 }
134
135 if verbose {
137 for line in &stderr_lines {
138 writeln!(output_writer, "[stderr] {line}")?;
139 }
140 }
141
142 output_writer.flush()?;
143
144 let mut accumulated = String::new();
146 for line in stdout_lines {
147 accumulated.push_str(&line);
148 accumulated.push('\n');
149 }
150 for line in stderr_lines {
151 accumulated.push_str("[stderr] ");
152 accumulated.push_str(&line);
153 accumulated.push('\n');
154 }
155
156 Ok::<_, std::io::Error>(accumulated)
157 };
158
159 let accumulated_output = match timeout {
160 Some(duration) => {
161 debug!(timeout_secs = duration.as_secs(), "Executing with timeout");
162 match tokio::time::timeout(duration, stream_result).await {
163 Ok(result) => result?,
164 Err(_) => {
165 warn!(
167 timeout_secs = duration.as_secs(),
168 "Execution timeout reached, sending SIGTERM"
169 );
170 timed_out = true;
171 Self::terminate_child(&mut child)?;
172 String::new() }
174 }
175 }
176 None => {
177 stream_result.await?
178 }
179 };
180
181 let status = child.wait().await?;
182
183 Ok(ExecutionResult {
184 output: accumulated_output,
185 success: status.success() && !timed_out,
186 exit_code: status.code(),
187 timed_out,
188 })
189 }
190
191 fn terminate_child(child: &mut tokio::process::Child) -> std::io::Result<()> {
193 if let Some(pid) = child.id() {
194 #[allow(clippy::cast_possible_wrap)]
195 let pid = Pid::from_raw(pid as i32);
196 debug!(%pid, "Sending SIGTERM to child process");
197 let _ = kill(pid, Signal::SIGTERM);
198 }
199 Ok(())
200 }
201
202 pub async fn execute_capture(&self, prompt: &str) -> std::io::Result<ExecutionResult> {
206 self.execute_capture_with_timeout(prompt, None).await
207 }
208
209 pub async fn execute_capture_with_timeout(
211 &self,
212 prompt: &str,
213 timeout: Option<Duration>,
214 ) -> std::io::Result<ExecutionResult> {
215 let sink = std::io::sink();
218 self.execute(prompt, sink, timeout, false).await
219 }
220}
221
222#[cfg(test)]
223mod tests {
224 use super::*;
225
226 #[tokio::test]
227 async fn test_execute_echo() {
228 let backend = CliBackend {
230 command: "echo".to_string(),
231 args: vec![],
232 prompt_mode: PromptMode::Arg,
233 prompt_flag: None,
234 output_format: OutputFormat::Text,
235 };
236
237 let executor = CliExecutor::new(backend);
238 let mut output = Vec::new();
239
240 let result = executor.execute("hello world", &mut output, None, true).await.unwrap();
241
242 assert!(result.success);
243 assert!(!result.timed_out);
244 assert!(result.output.contains("hello world"));
245 }
246
247 #[tokio::test]
248 async fn test_execute_stdin() {
249 let backend = CliBackend {
251 command: "cat".to_string(),
252 args: vec![],
253 prompt_mode: PromptMode::Stdin,
254 prompt_flag: None,
255 output_format: OutputFormat::Text,
256 };
257
258 let executor = CliExecutor::new(backend);
259 let result = executor.execute_capture("stdin test").await.unwrap();
260
261 assert!(result.success);
262 assert!(result.output.contains("stdin test"));
263 }
264
265 #[tokio::test]
266 async fn test_execute_failure() {
267 let backend = CliBackend {
268 command: "false".to_string(), args: vec![],
270 prompt_mode: PromptMode::Arg,
271 prompt_flag: None,
272 output_format: OutputFormat::Text,
273 };
274
275 let executor = CliExecutor::new(backend);
276 let result = executor.execute_capture("").await.unwrap();
277
278 assert!(!result.success);
279 assert!(!result.timed_out);
280 assert_eq!(result.exit_code, Some(1));
281 }
282
283 #[tokio::test]
284 async fn test_execute_timeout() {
285 let backend = CliBackend {
289 command: "sleep".to_string(),
290 args: vec!["10".to_string()], prompt_mode: PromptMode::Stdin, prompt_flag: None,
293 output_format: OutputFormat::Text,
294 };
295
296 let executor = CliExecutor::new(backend);
297
298 let timeout = Some(Duration::from_millis(100));
300 let result = executor.execute_capture_with_timeout("", timeout).await.unwrap();
301
302 assert!(result.timed_out, "Expected execution to time out");
303 assert!(!result.success, "Timed out execution should not be successful");
304 }
305
306 #[tokio::test]
307 async fn test_execute_no_timeout_when_fast() {
308 let backend = CliBackend {
310 command: "echo".to_string(),
311 args: vec![],
312 prompt_mode: PromptMode::Arg,
313 prompt_flag: None,
314 output_format: OutputFormat::Text,
315 };
316
317 let executor = CliExecutor::new(backend);
318
319 let timeout = Some(Duration::from_secs(10));
321 let result = executor.execute_capture_with_timeout("fast", timeout).await.unwrap();
322
323 assert!(!result.timed_out, "Fast command should not time out");
324 assert!(result.success);
325 assert!(result.output.contains("fast"));
326 }
327}