ralph_workflow/executor/executor_trait.rs
1//! `ProcessExecutor` trait definition.
2//!
3//! This module defines the trait abstraction for process execution,
4//! enabling dependency injection for testing.
5
6use super::{AgentChildHandle, AgentSpawnConfig, ProcessOutput, RealAgentChild};
7use std::io;
8use std::path::Path;
9
10/// Trait for executing external processes.
11///
12/// This trait abstracts process execution to allow dependency injection.
13/// Production code uses `RealProcessExecutor` which calls actual commands.
14/// Test code can use `MockProcessExecutor` to control process behavior.
15///
16/// Only external process execution is abstracted. Internal code logic is never mocked.
17pub trait ProcessExecutor: Send + Sync + std::fmt::Debug {
18 /// Execute a command with given arguments and return its output.
19 ///
20 /// # Arguments
21 ///
22 /// * `command` - The program to execute
23 /// * `args` - Command-line arguments to pass to the program
24 /// * `env` - Environment variables to set for the process (optional)
25 /// * `workdir` - Working directory for the process (optional)
26 ///
27 /// # Returns
28 ///
29 /// Returns a `ProcessOutput` containing exit status, stdout, and stderr.
30 ///
31 /// # Errors
32 ///
33 /// Returns an error if command cannot be spawned or if output capture fails.
34 fn execute(
35 &self,
36 command: &str,
37 args: &[&str],
38 env: &[(String, String)],
39 workdir: Option<&Path>,
40 ) -> io::Result<ProcessOutput>;
41
42 /// Spawn a process with stdin input and return the child handle.
43 ///
44 /// This method is used when you need to write to the process's stdin
45 /// or stream its output in real-time. Unlike `execute()`, this returns
46 /// a `Child` handle for direct interaction.
47 ///
48 /// # Arguments
49 ///
50 /// * `command` - The program to execute
51 /// * `args` - Command-line arguments to pass to the program
52 /// * `env` - Environment variables to set for the process (optional)
53 /// * `workdir` - Working directory for the process (optional)
54 ///
55 /// # Returns
56 ///
57 /// Returns a `Child` handle that can be used to interact with the process.
58 ///
59 /// # Errors
60 ///
61 /// Returns an error if command cannot be spawned.
62 fn spawn(
63 &self,
64 command: &str,
65 args: &[&str],
66 env: &[(String, String)],
67 workdir: Option<&Path>,
68 ) -> io::Result<std::process::Child> {
69 let mut cmd = std::process::Command::new(command);
70 cmd.args(args);
71
72 for (key, value) in env {
73 cmd.env(key, value);
74 }
75
76 if let Some(dir) = workdir {
77 cmd.current_dir(dir);
78 }
79
80 cmd.stdin(std::process::Stdio::piped())
81 .stdout(std::process::Stdio::piped())
82 .stderr(std::process::Stdio::piped())
83 .spawn()
84 }
85
86 /// Spawn an agent process with streaming output support.
87 ///
88 /// This method is specifically designed for spawning AI agent subprocesses
89 /// that need to output streaming JSON in real-time. Unlike `spawn()`, this
90 /// returns a handle with boxed stdout for trait object compatibility.
91 ///
92 /// # Arguments
93 ///
94 /// * `config` - Agent spawn configuration including command, args, env, prompt, etc.
95 ///
96 /// # Returns
97 ///
98 /// Returns an `AgentChildHandle` with stdout, stderr, and the child process.
99 ///
100 /// # Errors
101 ///
102 /// Returns an error if the agent cannot be spawned.
103 ///
104 /// # Default Implementation
105 ///
106 /// The default implementation uses the `spawn()` method with additional
107 /// configuration for agent-specific needs. Mock implementations should
108 /// override this to return mock results without spawning real processes.
109 fn spawn_agent(&self, config: &AgentSpawnConfig) -> io::Result<AgentChildHandle> {
110 let mut cmd = std::process::Command::new(&config.command);
111 cmd.args(&config.args);
112
113 // Set environment variables
114 for (key, value) in &config.env {
115 cmd.env(key, value);
116 }
117
118 // Add the prompt as the final argument
119 cmd.arg(&config.prompt);
120
121 // Set buffering variables for real-time streaming
122 cmd.env("PYTHONUNBUFFERED", "1");
123 cmd.env("NODE_ENV", "production");
124
125 // Spawn the process with piped stdout/stderr
126 let mut child = cmd
127 .stdin(std::process::Stdio::null())
128 .stdout(std::process::Stdio::piped())
129 .stderr(std::process::Stdio::piped())
130 .spawn()?;
131
132 let stdout = child
133 .stdout
134 .take()
135 .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;
136 let stderr = child
137 .stderr
138 .take()
139 .ok_or_else(|| io::Error::other("Failed to capture stderr"))?;
140
141 Ok(AgentChildHandle {
142 stdout: Box::new(stdout),
143 stderr: Box::new(stderr),
144 inner: Box::new(RealAgentChild(child)),
145 })
146 }
147
148 /// Check if a command exists and can be executed.
149 ///
150 /// This is a convenience method that executes a command with a
151 /// `--version` or similar flag to check if it's available.
152 ///
153 /// # Arguments
154 ///
155 /// * `command` - The program to check
156 ///
157 /// # Returns
158 ///
159 /// Returns `true` if command exists, `false` otherwise.
160 fn command_exists(&self, command: &str) -> bool {
161 match self.execute(command, &[], &[], None) {
162 Ok(output) => output.status.success(),
163 Err(_) => false,
164 }
165 }
166
167 /// Returns true if the process identified by `parent_pid` has at least one
168 /// live child process.
169 ///
170 /// Used by the idle-timeout monitor to avoid false-positive kills when the
171 /// agent has spawned a subprocess (e.g. `cargo test`, `npm install`) that is
172 /// still running even though the agent produces no stdout/stderr output.
173 ///
174 /// Default implementation: invokes `pgrep -P <pid>` on Unix platforms and
175 /// falls back to parsing `ps` output (PID/PPID pairs) if `pgrep` is unavailable.
176 /// Returns `false` (conservative no-op) on non-Unix.
177 ///
178 /// Any execution error is treated as "no children" to avoid blocking the
179 /// timeout system. If both `pgrep` and the `ps` fallback are unavailable or
180 /// fail unexpectedly, a one-time warning is emitted to stderr so operators can
181 /// diagnose reduced protection against false-positive idle kills.
182 fn has_active_child_processes(&self, parent_pid: u32) -> bool {
183 #[cfg(unix)]
184 {
185 use std::sync::OnceLock;
186
187 fn parse_ps_pid_ppid_pairs(stdout: &str, parent_pid: u32) -> Option<bool> {
188 let mut saw_parseable_line = false;
189 for line in stdout.lines() {
190 let mut parts = line.split_whitespace();
191 let Some(child_pid_text) = parts.next() else {
192 continue;
193 };
194 let Some(parent_pid_text) = parts.next() else {
195 continue;
196 };
197
198 let Ok(_pid) = child_pid_text.parse::<u32>() else {
199 continue;
200 };
201 let Ok(ppid) = parent_pid_text.parse::<u32>() else {
202 continue;
203 };
204
205 saw_parseable_line = true;
206 if ppid == parent_pid {
207 return Some(true);
208 }
209 }
210
211 if saw_parseable_line {
212 Some(false)
213 } else {
214 None
215 }
216 }
217
218 fn warn_child_process_detection_degraded() {
219 static WARNED: OnceLock<()> = OnceLock::new();
220 if WARNED.set(()).is_ok() {
221 eprintln!(
222 "Warning: child-process detection degraded (pgrep/ps unavailable or failing); idle-timeout false-positive prevention may be reduced"
223 );
224 }
225 }
226
227 let pid_str = parent_pid.to_string();
228
229 // Primary: `pgrep -P <pid>`
230 if let Ok(out) = self.execute("pgrep", &["-P", &pid_str], &[], None) {
231 let stdout = out.stdout.trim();
232
233 // pgrep exit codes: 0 = match, 1 = no match, 2 = error.
234 if !stdout.is_empty() {
235 return true;
236 }
237
238 if out.status.code() == Some(1) {
239 return false;
240 }
241 }
242
243 // Fallback: parse `ps` output. Avoid GNU-only flags like `--ppid`.
244 //
245 // - BSD/macOS: `ps -ax -o pid= -o ppid=`
246 // - GNU procps: `ps -e -o pid= -o ppid=`
247 let ps_attempts: [&[&str]; 2] = [
248 &["-ax", "-o", "pid=", "-o", "ppid="],
249 &["-e", "-o", "pid=", "-o", "ppid="],
250 ];
251
252 for args in ps_attempts {
253 if let Ok(out) = self.execute("ps", args, &[], None) {
254 if out.status.success() {
255 if let Some(has_children) = parse_ps_pid_ppid_pairs(&out.stdout, parent_pid)
256 {
257 return has_children;
258 }
259 }
260 }
261 }
262
263 warn_child_process_detection_degraded();
264 false
265 }
266 #[cfg(not(unix))]
267 {
268 let _ = parent_pid;
269 false
270 }
271 }
272}
273
#[cfg(all(test, unix))]
mod tests {
    //! Tests for the default `ProcessExecutor::has_active_child_processes`.
    //!
    //! The module is gated on Unix as a whole rather than item by item. The
    //! logic under test only exists on Unix, and per-item gating left the shared
    //! imports unused (and warned about) on other targets.

    use super::*;
    use std::collections::HashMap;
    use std::io;
    use std::os::unix::process::ExitStatusExt;
    use std::sync::Mutex;

    /// `(command, args)` pair identifying one scripted `execute` call.
    type CallKey = (String, Vec<String>);

    /// BSD/macOS-style `ps` invocation, tried first by the fallback.
    const PS_BSD_ARGS: &[&str] = &["-ax", "-o", "pid=", "-o", "ppid="];
    /// GNU procps-style `ps` invocation, tried second by the fallback.
    const PS_GNU_ARGS: &[&str] = &["-e", "-o", "pid=", "-o", "ppid="];

    /// Build the lookup key for a `command` invoked with `args`.
    fn call_key(command: &str, args: &[&str]) -> CallKey {
        (
            command.to_string(),
            args.iter().map(ToString::to_string).collect(),
        )
    }

    /// Build the lookup key for `pgrep -P <pid>`.
    fn pgrep_key(pid: u32) -> CallKey {
        call_key("pgrep", &["-P", pid.to_string().as_str()])
    }

    /// Build a `ProcessOutput` from a raw Unix wait status and stdout text.
    fn output(raw_wait_status: i32, stdout: &str) -> ProcessOutput {
        ProcessOutput {
            status: std::process::ExitStatus::from_raw(raw_wait_status),
            stdout: stdout.to_string(),
            stderr: String::new(),
        }
    }

    /// Output of a process that exited with code 0 and printed `stdout`.
    fn ok_output(stdout: &str) -> ProcessOutput {
        output(0, stdout)
    }

    /// Output of a process that exited with `code` and printed nothing.
    fn exit_code_output(code: i32) -> ProcessOutput {
        // A normal exit stores the exit code in bits 8..16 of the wait status.
        output(code << 8, "")
    }

    /// Scripted outcome for one `execute` call.
    #[derive(Debug, Clone)]
    enum TestResult {
        Ok(ProcessOutput),
        Err {
            kind: io::ErrorKind,
            message: String,
        },
    }

    impl TestResult {
        /// A "program not found" spawn error.
        fn not_found(message: &str) -> Self {
            Self::Err {
                kind: io::ErrorKind::NotFound,
                message: message.to_string(),
            }
        }

        fn to_io_result(&self) -> io::Result<ProcessOutput> {
            match self {
                Self::Ok(out) => Ok(out.clone()),
                Self::Err { kind, message } => Err(io::Error::new(*kind, message.clone())),
            }
        }
    }

    /// `ProcessExecutor` whose `execute` replays scripted results and records
    /// every call. Calls without a scripted result fail with an `io::Error`.
    #[derive(Debug)]
    struct TestExecutor {
        calls: Mutex<Vec<CallKey>>,
        results: HashMap<CallKey, TestResult>,
    }

    impl TestExecutor {
        fn new(results: HashMap<CallKey, TestResult>) -> Self {
            Self {
                calls: Mutex::new(Vec::new()),
                results,
            }
        }

        /// All recorded calls whose program name is `command`, in call order.
        fn calls_for(&self, command: &str) -> Vec<CallKey> {
            self.calls
                .lock()
                .unwrap()
                .iter()
                .filter(|(c, _)| c == command)
                .cloned()
                .collect()
        }
    }

    impl ProcessExecutor for TestExecutor {
        fn execute(
            &self,
            command: &str,
            args: &[&str],
            _env: &[(String, String)],
            _workdir: Option<&std::path::Path>,
        ) -> io::Result<ProcessOutput> {
            let key = call_key(command, args);
            self.calls.lock().unwrap().push(key.clone());
            self.results.get(&key).map_or_else(
                || Err(io::Error::other("unexpected execute")),
                TestResult::to_io_result,
            )
        }
    }

    #[test]
    fn has_active_child_processes_falls_back_to_ps_when_pgrep_missing() {
        let pid = 4242;
        let exec = TestExecutor::new(HashMap::from([
            (pgrep_key(pid), TestResult::not_found("pgrep missing")),
            (
                call_key("ps", PS_BSD_ARGS),
                TestResult::Ok(ok_output("12345 4242\n")),
            ),
        ]));

        assert!(
            exec.has_active_child_processes(pid),
            "ps fallback should detect children when pgrep is unavailable"
        );
        assert!(
            !exec.calls_for("ps").is_empty(),
            "ps should be invoked as a fallback"
        );
    }

    #[test]
    fn has_active_child_processes_trusts_pgrep_match() {
        let pid = 4242;
        let exec = TestExecutor::new(HashMap::from([(
            pgrep_key(pid),
            TestResult::Ok(ok_output("12345\n")),
        )]));

        assert!(exec.has_active_child_processes(pid));
        assert!(
            exec.calls_for("ps").is_empty(),
            "ps must not run when pgrep already answered"
        );
    }

    #[test]
    fn has_active_child_processes_trusts_pgrep_no_match() {
        let pid = 4242;
        let exec = TestExecutor::new(HashMap::from([(
            pgrep_key(pid),
            TestResult::Ok(exit_code_output(1)),
        )]));

        assert!(!exec.has_active_child_processes(pid));
        assert!(
            exec.calls_for("ps").is_empty(),
            "pgrep exit code 1 is a definitive 'no children'"
        );
    }

    #[test]
    fn has_active_child_processes_falls_back_to_ps_on_pgrep_error_status() {
        let pid = 4242;
        let exec = TestExecutor::new(HashMap::from([
            (pgrep_key(pid), TestResult::Ok(exit_code_output(2))),
            (
                call_key("ps", PS_BSD_ARGS),
                TestResult::Ok(ok_output("1 0\n4243 4242\n")),
            ),
        ]));

        assert!(exec.has_active_child_processes(pid));
        assert_eq!(exec.calls_for("ps").len(), 1);
    }

    #[test]
    fn has_active_child_processes_tries_gnu_ps_when_bsd_ps_fails() {
        let pid = 4242;
        let exec = TestExecutor::new(HashMap::from([
            (pgrep_key(pid), TestResult::not_found("pgrep missing")),
            (
                call_key("ps", PS_BSD_ARGS),
                TestResult::Ok(exit_code_output(1)),
            ),
            (
                call_key("ps", PS_GNU_ARGS),
                TestResult::Ok(ok_output("4243 4242\n")),
            ),
        ]));

        assert!(exec.has_active_child_processes(pid));
        assert_eq!(exec.calls_for("ps").len(), 2);
    }

    #[test]
    fn has_active_child_processes_skips_unparseable_ps_output() {
        let pid = 4242;
        let exec = TestExecutor::new(HashMap::from([
            (pgrep_key(pid), TestResult::not_found("pgrep missing")),
            (
                call_key("ps", PS_BSD_ARGS),
                TestResult::Ok(ok_output("PID PPID\n")),
            ),
            (
                call_key("ps", PS_GNU_ARGS),
                TestResult::Ok(ok_output("4243 4242\n")),
            ),
        ]));

        assert!(exec.has_active_child_processes(pid));
        assert_eq!(exec.calls_for("ps").len(), 2);
    }

    #[test]
    fn has_active_child_processes_reports_false_when_ps_shows_no_children() {
        let pid = 4242;
        let exec = TestExecutor::new(HashMap::from([
            (pgrep_key(pid), TestResult::not_found("pgrep missing")),
            (
                call_key("ps", PS_BSD_ARGS),
                TestResult::Ok(ok_output("1 0\n500 1\n")),
            ),
        ]));

        assert!(!exec.has_active_child_processes(pid));
        assert_eq!(
            exec.calls_for("ps").len(),
            1,
            "parseable ps output is a definitive answer"
        );
    }

    #[test]
    fn has_active_child_processes_is_false_when_detection_unavailable() {
        let pid = 4242;
        let exec = TestExecutor::new(HashMap::from([(
            pgrep_key(pid),
            TestResult::not_found("pgrep missing"),
        )]));

        assert!(!exec.has_active_child_processes(pid));
        assert_eq!(
            exec.calls_for("ps").len(),
            2,
            "both ps variants should be attempted before giving up"
        );
    }
}