Skip to main content

ralph_workflow/executor/
executor_trait.rs

1//! `ProcessExecutor` trait definition.
2//!
3//! This module defines the trait abstraction for process execution,
4//! enabling dependency injection for testing.
5
6use super::{AgentChildHandle, AgentSpawnConfig, ProcessOutput, RealAgentChild};
7use std::io;
8use std::path::Path;
9
10/// Trait for executing external processes.
11///
12/// This trait abstracts process execution to allow dependency injection.
13/// Production code uses `RealProcessExecutor` which calls actual commands.
14/// Test code can use `MockProcessExecutor` to control process behavior.
15///
16/// Only external process execution is abstracted. Internal code logic is never mocked.
17pub trait ProcessExecutor: Send + Sync + std::fmt::Debug {
18    /// Execute a command with given arguments and return its output.
19    ///
20    /// # Arguments
21    ///
22    /// * `command` - The program to execute
23    /// * `args` - Command-line arguments to pass to the program
24    /// * `env` - Environment variables to set for the process (optional)
25    /// * `workdir` - Working directory for the process (optional)
26    ///
27    /// # Returns
28    ///
29    /// Returns a `ProcessOutput` containing exit status, stdout, and stderr.
30    ///
31    /// # Errors
32    ///
33    /// Returns an error if command cannot be spawned or if output capture fails.
34    fn execute(
35        &self,
36        command: &str,
37        args: &[&str],
38        env: &[(String, String)],
39        workdir: Option<&Path>,
40    ) -> io::Result<ProcessOutput>;
41
42    /// Spawn a process with stdin input and return the child handle.
43    ///
44    /// This method is used when you need to write to the process's stdin
45    /// or stream its output in real-time. Unlike `execute()`, this returns
46    /// a `Child` handle for direct interaction.
47    ///
48    /// # Arguments
49    ///
50    /// * `command` - The program to execute
51    /// * `args` - Command-line arguments to pass to the program
52    /// * `env` - Environment variables to set for the process (optional)
53    /// * `workdir` - Working directory for the process (optional)
54    ///
55    /// # Returns
56    ///
57    /// Returns a `Child` handle that can be used to interact with the process.
58    ///
59    /// # Errors
60    ///
61    /// Returns an error if command cannot be spawned.
62    fn spawn(
63        &self,
64        command: &str,
65        args: &[&str],
66        env: &[(String, String)],
67        workdir: Option<&Path>,
68    ) -> io::Result<std::process::Child> {
69        let mut cmd = std::process::Command::new(command);
70        cmd.args(args);
71
72        for (key, value) in env {
73            cmd.env(key, value);
74        }
75
76        if let Some(dir) = workdir {
77            cmd.current_dir(dir);
78        }
79
80        cmd.stdin(std::process::Stdio::piped())
81            .stdout(std::process::Stdio::piped())
82            .stderr(std::process::Stdio::piped())
83            .spawn()
84    }
85
86    /// Spawn an agent process with streaming output support.
87    ///
88    /// This method is specifically designed for spawning AI agent subprocesses
89    /// that need to output streaming JSON in real-time. Unlike `spawn()`, this
90    /// returns a handle with boxed stdout for trait object compatibility.
91    ///
92    /// # Arguments
93    ///
94    /// * `config` - Agent spawn configuration including command, args, env, prompt, etc.
95    ///
96    /// # Returns
97    ///
98    /// Returns an `AgentChildHandle` with stdout, stderr, and the child process.
99    ///
100    /// # Errors
101    ///
102    /// Returns an error if the agent cannot be spawned.
103    ///
104    /// # Default Implementation
105    ///
106    /// The default implementation uses the `spawn()` method with additional
107    /// configuration for agent-specific needs. Mock implementations should
108    /// override this to return mock results without spawning real processes.
109    fn spawn_agent(&self, config: &AgentSpawnConfig) -> io::Result<AgentChildHandle> {
110        let mut cmd = std::process::Command::new(&config.command);
111        cmd.args(&config.args);
112
113        // Set environment variables
114        for (key, value) in &config.env {
115            cmd.env(key, value);
116        }
117
118        // Add the prompt as the final argument
119        cmd.arg(&config.prompt);
120
121        // Set buffering variables for real-time streaming
122        cmd.env("PYTHONUNBUFFERED", "1");
123        cmd.env("NODE_ENV", "production");
124
125        // Spawn the process with piped stdout/stderr
126        let mut child = cmd
127            .stdin(std::process::Stdio::null())
128            .stdout(std::process::Stdio::piped())
129            .stderr(std::process::Stdio::piped())
130            .spawn()?;
131
132        let stdout = child
133            .stdout
134            .take()
135            .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;
136        let stderr = child
137            .stderr
138            .take()
139            .ok_or_else(|| io::Error::other("Failed to capture stderr"))?;
140
141        Ok(AgentChildHandle {
142            stdout: Box::new(stdout),
143            stderr: Box::new(stderr),
144            inner: Box::new(RealAgentChild(child)),
145        })
146    }
147
148    /// Check if a command exists and can be executed.
149    ///
150    /// This is a convenience method that executes a command with a
151    /// `--version` or similar flag to check if it's available.
152    ///
153    /// # Arguments
154    ///
155    /// * `command` - The program to check
156    ///
157    /// # Returns
158    ///
159    /// Returns `true` if command exists, `false` otherwise.
160    fn command_exists(&self, command: &str) -> bool {
161        match self.execute(command, &[], &[], None) {
162            Ok(output) => output.status.success(),
163            Err(_) => false,
164        }
165    }
166
167    /// Returns true if the process identified by `parent_pid` has at least one
168    /// live child process.
169    ///
170    /// Used by the idle-timeout monitor to avoid false-positive kills when the
171    /// agent has spawned a subprocess (e.g. `cargo test`, `npm install`) that is
172    /// still running even though the agent produces no stdout/stderr output.
173    ///
174    /// Default implementation: invokes `pgrep -P <pid>` on Unix platforms and
175    /// falls back to parsing `ps` output (PID/PPID pairs) if `pgrep` is unavailable.
176    /// Returns `false` (conservative no-op) on non-Unix.
177    ///
178    /// Any execution error is treated as "no children" to avoid blocking the
179    /// timeout system. If both `pgrep` and the `ps` fallback are unavailable or
180    /// fail unexpectedly, a one-time warning is emitted to stderr so operators can
181    /// diagnose reduced protection against false-positive idle kills.
182    fn has_active_child_processes(&self, parent_pid: u32) -> bool {
183        #[cfg(unix)]
184        {
185            use std::sync::OnceLock;
186
187            fn parse_ps_pid_ppid_pairs(stdout: &str, parent_pid: u32) -> Option<bool> {
188                let mut saw_parseable_line = false;
189                for line in stdout.lines() {
190                    let mut parts = line.split_whitespace();
191                    let Some(child_pid_text) = parts.next() else {
192                        continue;
193                    };
194                    let Some(parent_pid_text) = parts.next() else {
195                        continue;
196                    };
197
198                    let Ok(_pid) = child_pid_text.parse::<u32>() else {
199                        continue;
200                    };
201                    let Ok(ppid) = parent_pid_text.parse::<u32>() else {
202                        continue;
203                    };
204
205                    saw_parseable_line = true;
206                    if ppid == parent_pid {
207                        return Some(true);
208                    }
209                }
210
211                if saw_parseable_line {
212                    Some(false)
213                } else {
214                    None
215                }
216            }
217
218            fn warn_child_process_detection_degraded() {
219                static WARNED: OnceLock<()> = OnceLock::new();
220                if WARNED.set(()).is_ok() {
221                    eprintln!(
222                        "Warning: child-process detection degraded (pgrep/ps unavailable or failing); idle-timeout false-positive prevention may be reduced"
223                    );
224                }
225            }
226
227            let pid_str = parent_pid.to_string();
228
229            // Primary: `pgrep -P <pid>`
230            if let Ok(out) = self.execute("pgrep", &["-P", &pid_str], &[], None) {
231                let stdout = out.stdout.trim();
232
233                // pgrep exit codes: 0 = match, 1 = no match, 2 = error.
234                if !stdout.is_empty() {
235                    return true;
236                }
237
238                if out.status.code() == Some(1) {
239                    return false;
240                }
241            }
242
243            // Fallback: parse `ps` output. Avoid GNU-only flags like `--ppid`.
244            //
245            // - BSD/macOS: `ps -ax -o pid= -o ppid=`
246            // - GNU procps: `ps -e -o pid= -o ppid=`
247            let ps_attempts: [&[&str]; 2] = [
248                &["-ax", "-o", "pid=", "-o", "ppid="],
249                &["-e", "-o", "pid=", "-o", "ppid="],
250            ];
251
252            for args in ps_attempts {
253                if let Ok(out) = self.execute("ps", args, &[], None) {
254                    if out.status.success() {
255                        if let Some(has_children) = parse_ps_pid_ppid_pairs(&out.stdout, parent_pid)
256                        {
257                            return has_children;
258                        }
259                    }
260                }
261            }
262
263            warn_child_process_detection_degraded();
264            false
265        }
266        #[cfg(not(unix))]
267        {
268            let _ = parent_pid;
269            false
270        }
271    }
272}
273
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::io;
    use std::sync::Mutex;

    /// Lookup key used to script `TestExecutor`: (program, argv).
    #[cfg(unix)]
    type CallKey = (String, Vec<String>);

    /// BSD/macOS `ps` arguments tried first by the fallback.
    #[cfg(unix)]
    const PS_BSD_ARGS: [&str; 5] = ["-ax", "-o", "pid=", "-o", "ppid="];

    /// GNU procps `ps` arguments tried second by the fallback.
    #[cfg(unix)]
    const PS_GNU_ARGS: [&str; 5] = ["-e", "-o", "pid=", "-o", "ppid="];

    /// Build a `CallKey` from borrowed program/argument strings.
    #[cfg(unix)]
    fn call_key(command: &str, args: &[&str]) -> CallKey {
        (
            command.to_string(),
            args.iter().map(ToString::to_string).collect(),
        )
    }

    /// Build a `ProcessOutput` whose status reports `code` as the exit code.
    #[cfg(unix)]
    fn output_with_code(code: i32, stdout: &str) -> ProcessOutput {
        use std::os::unix::process::ExitStatusExt;

        // `from_raw` takes a raw wait(2) status; a normal exit stores the
        // exit code in bits 8..16.
        ProcessOutput {
            status: std::process::ExitStatus::from_raw(code << 8),
            stdout: stdout.to_string(),
            stderr: String::new(),
        }
    }

    /// Build a successful (exit code 0) `ProcessOutput`.
    #[cfg(unix)]
    fn ok_output(stdout: &str) -> ProcessOutput {
        output_with_code(0, stdout)
    }

    /// A scripted result for one (program, argv) invocation.
    #[cfg(unix)]
    #[derive(Debug, Clone)]
    enum TestResult {
        Ok(ProcessOutput),
        Err {
            kind: io::ErrorKind,
            message: String,
        },
    }

    #[cfg(unix)]
    impl TestResult {
        /// Shorthand for "the program is not installed".
        fn not_found(message: &str) -> Self {
            Self::Err {
                kind: io::ErrorKind::NotFound,
                message: message.to_string(),
            }
        }

        fn to_io_result(&self) -> io::Result<ProcessOutput> {
            match self {
                Self::Ok(out) => Ok(out.clone()),
                Self::Err { kind, message } => Err(io::Error::new(*kind, message.clone())),
            }
        }
    }

    /// Executor that records every call and answers from a fixed script.
    /// Unscripted invocations fail with an `io::Error`.
    #[cfg(unix)]
    #[derive(Debug)]
    struct TestExecutor {
        calls: Mutex<Vec<CallKey>>,
        results: HashMap<CallKey, TestResult>,
    }

    #[cfg(unix)]
    impl TestExecutor {
        fn new(results: HashMap<CallKey, TestResult>) -> Self {
            Self {
                calls: Mutex::new(Vec::new()),
                results,
            }
        }

        /// All recorded invocations of `command`, in call order.
        fn calls_for(&self, command: &str) -> Vec<CallKey> {
            self.calls
                .lock()
                .unwrap()
                .iter()
                .filter(|(c, _)| c == command)
                .cloned()
                .collect()
        }
    }

    #[cfg(unix)]
    impl ProcessExecutor for TestExecutor {
        fn execute(
            &self,
            command: &str,
            args: &[&str],
            _env: &[(String, String)],
            _workdir: Option<&std::path::Path>,
        ) -> std::io::Result<ProcessOutput> {
            let key = call_key(command, args);
            self.calls.lock().unwrap().push(key.clone());
            self.results.get(&key).map_or_else(
                || Err(std::io::Error::other("unexpected execute")),
                TestResult::to_io_result,
            )
        }
    }

    #[test]
    #[cfg(unix)]
    fn has_active_child_processes_falls_back_to_ps_when_pgrep_missing() {
        let pid: u32 = 4242;
        let pid_str = pid.to_string();

        let results = HashMap::from([
            (
                call_key("pgrep", &["-P", &pid_str]),
                TestResult::not_found("pgrep missing"),
            ),
            (
                call_key("ps", &PS_BSD_ARGS),
                TestResult::Ok(ok_output("12345 4242\n")),
            ),
        ]);

        let exec = TestExecutor::new(results);
        assert!(
            exec.has_active_child_processes(pid),
            "ps fallback should detect children when pgrep is unavailable"
        );
        assert!(
            !exec.calls_for("ps").is_empty(),
            "ps should be invoked as a fallback"
        );
    }

    #[test]
    #[cfg(unix)]
    fn has_active_child_processes_pgrep_match_skips_ps() {
        let pid: u32 = 7;
        let results = HashMap::from([(
            call_key("pgrep", &["-P", "7"]),
            TestResult::Ok(ok_output("99\n")),
        )]);

        let exec = TestExecutor::new(results);
        assert!(exec.has_active_child_processes(pid));
        assert!(exec.calls_for("ps").is_empty(), "pgrep hit must short-circuit");
    }

    #[test]
    #[cfg(unix)]
    fn has_active_child_processes_pgrep_no_match_is_authoritative() {
        let pid: u32 = 7;
        let results = HashMap::from([(
            call_key("pgrep", &["-P", "7"]),
            TestResult::Ok(output_with_code(1, "")),
        )]);

        let exec = TestExecutor::new(results);
        assert!(!exec.has_active_child_processes(pid));
        assert!(
            exec.calls_for("ps").is_empty(),
            "pgrep exit code 1 means no children; ps must not be consulted"
        );
    }

    #[test]
    #[cfg(unix)]
    fn has_active_child_processes_unparseable_bsd_ps_tries_gnu_variant() {
        let pid: u32 = 4242;
        let results = HashMap::from([
            (
                call_key("pgrep", &["-P", "4242"]),
                TestResult::Ok(output_with_code(2, "")),
            ),
            (
                call_key("ps", &PS_BSD_ARGS),
                TestResult::Ok(ok_output("  PID  PPID\n")),
            ),
            (
                call_key("ps", &PS_GNU_ARGS),
                TestResult::Ok(ok_output("    1     0\n   77  4242\n")),
            ),
        ]);

        let exec = TestExecutor::new(results);
        assert!(exec.has_active_child_processes(pid));
        assert_eq!(exec.calls_for("ps").len(), 2, "both ps variants should run");
    }

    #[test]
    #[cfg(unix)]
    fn has_active_child_processes_parseable_ps_without_match_is_false() {
        let pid: u32 = 4242;
        let results = HashMap::from([
            (
                call_key("pgrep", &["-P", "4242"]),
                TestResult::not_found("pgrep missing"),
            ),
            (
                call_key("ps", &PS_BSD_ARGS),
                TestResult::Ok(ok_output("1 0\n2 1\n")),
            ),
        ]);

        let exec = TestExecutor::new(results);
        assert!(!exec.has_active_child_processes(pid));
        assert_eq!(
            exec.calls_for("ps").len(),
            1,
            "a parseable answer from the first ps variant is final"
        );
    }

    #[test]
    #[cfg(unix)]
    fn has_active_child_processes_all_probes_failing_is_false() {
        let exec = TestExecutor::new(HashMap::new());
        assert!(!exec.has_active_child_processes(4242));
        assert_eq!(exec.calls_for("pgrep").len(), 1);
        assert_eq!(exec.calls_for("ps").len(), 2);
    }
}