Skip to main content

ralph_workflow/executor/
real.rs

1//! Real process executor implementation.
2//!
3//! This module provides the production implementation that spawns actual processes
4//! using `std::process::Command`.
5
6use super::{AgentChildHandle, AgentSpawnConfig, ProcessExecutor, ProcessOutput, RealAgentChild};
7use std::io;
8use std::path::Path;
9
10#[cfg(unix)]
11fn set_nonblocking_fd(fd: std::os::unix::io::RawFd) -> io::Result<()> {
12    // Make the file descriptor non-blocking so readers can poll/cancel without
13    // getting stuck in a blocking read().
14    //
15    // Safety: fcntl is called with a valid fd owned by this process.
16    unsafe {
17        let flags = libc::fcntl(fd, libc::F_GETFL);
18        if flags < 0 {
19            return Err(io::Error::last_os_error());
20        }
21        if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
22            return Err(io::Error::last_os_error());
23        }
24    }
25    Ok(())
26}
27
28#[cfg(unix)]
29fn ensure_nonblocking_or_terminate(
30    child: &mut std::process::Child,
31    stdout_fd: std::os::unix::io::RawFd,
32    stderr_fd: std::os::unix::io::RawFd,
33) -> io::Result<()> {
34    fn terminate_child_best_effort(child: &mut std::process::Child) {
35        use std::time::{Duration, Instant};
36
37        let pid = child.id() as i32;
38
39        // Prefer killing the process group first (agent is in its own pgid).
40        unsafe {
41            let _ = libc::kill(-pid, libc::SIGTERM);
42            let _ = libc::kill(pid, libc::SIGTERM);
43        }
44
45        let term_deadline = Instant::now() + Duration::from_millis(250);
46        while Instant::now() < term_deadline {
47            match child.try_wait() {
48                Ok(Some(_)) | Err(_) => return,
49                Ok(None) => std::thread::sleep(Duration::from_millis(10)),
50            }
51        }
52
53        unsafe {
54            let _ = libc::kill(-pid, libc::SIGKILL);
55            let _ = libc::kill(pid, libc::SIGKILL);
56        }
57
58        let kill_deadline = Instant::now() + Duration::from_millis(500);
59        while Instant::now() < kill_deadline {
60            match child.try_wait() {
61                Ok(Some(_)) | Err(_) => return,
62                Ok(None) => std::thread::sleep(Duration::from_millis(10)),
63            }
64        }
65    }
66
67    if let Err(e) = set_nonblocking_fd(stdout_fd) {
68        terminate_child_best_effort(child);
69        return Err(e);
70    }
71
72    if let Err(e) = set_nonblocking_fd(stderr_fd) {
73        terminate_child_best_effort(child);
74        return Err(e);
75    }
76
77    Ok(())
78}
79
/// Production process executor built on top of `std::process::Command`.
///
/// The type is a stateless unit struct; it spawns actual operating-system
/// processes rather than simulating them.
#[derive(Debug, Clone, Default)]
pub struct RealProcessExecutor;

impl RealProcessExecutor {
    /// Returns a new executor, identical to the one produced by `Default`.
    pub fn new() -> Self {
        Self::default()
    }
}
92
93impl ProcessExecutor for RealProcessExecutor {
94    fn execute(
95        &self,
96        command: &str,
97        args: &[&str],
98        env: &[(String, String)],
99        workdir: Option<&Path>,
100    ) -> io::Result<ProcessOutput> {
101        let mut cmd = std::process::Command::new(command);
102        cmd.args(args);
103
104        for (key, value) in env {
105            cmd.env(key, value);
106        }
107
108        if let Some(dir) = workdir {
109            cmd.current_dir(dir);
110        }
111
112        let output = cmd.output()?;
113
114        Ok(ProcessOutput {
115            status: output.status,
116            stdout: String::from_utf8_lossy(&output.stdout).to_string(),
117            stderr: String::from_utf8_lossy(&output.stderr).to_string(),
118        })
119    }
120
121    fn spawn(
122        &self,
123        command: &str,
124        args: &[&str],
125        env: &[(String, String)],
126        workdir: Option<&Path>,
127    ) -> io::Result<std::process::Child> {
128        let mut cmd = std::process::Command::new(command);
129        cmd.args(args);
130
131        for (key, value) in env {
132            cmd.env(key, value);
133        }
134
135        if let Some(dir) = workdir {
136            cmd.current_dir(dir);
137        }
138
139        cmd.stdin(std::process::Stdio::piped())
140            .stdout(std::process::Stdio::piped())
141            .stderr(std::process::Stdio::piped())
142            .spawn()
143    }
144
145    fn spawn_agent(&self, config: &AgentSpawnConfig) -> io::Result<AgentChildHandle> {
146        let mut cmd = std::process::Command::new(&config.command);
147        cmd.args(&config.args);
148
149        // Set environment variables
150        for (key, value) in &config.env {
151            cmd.env(key, value);
152        }
153
154        // Add the prompt as the final argument
155        cmd.arg(&config.prompt);
156
157        // Set buffering variables for real-time streaming
158        cmd.env("PYTHONUNBUFFERED", "1");
159        cmd.env("NODE_ENV", "production");
160
161        // Put the agent in its own process group so idle-timeout enforcement can
162        // terminate the whole subtree (and not just the direct child PID).
163        #[cfg(unix)]
164        unsafe {
165            use std::os::unix::process::CommandExt;
166            cmd.pre_exec(|| {
167                if libc::setpgid(0, 0) != 0 {
168                    return Err(io::Error::last_os_error());
169                }
170                Ok(())
171            });
172        }
173
174        // Spawn the process with piped stdout/stderr
175        let mut child = cmd
176            .stdin(std::process::Stdio::null())
177            .stdout(std::process::Stdio::piped())
178            .stderr(std::process::Stdio::piped())
179            .spawn()?;
180
181        let stdout = child
182            .stdout
183            .take()
184            .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;
185        let stderr = child
186            .stderr
187            .take()
188            .ok_or_else(|| io::Error::other("Failed to capture stderr"))?;
189
190        // The stderr collector and stdout pump rely on non-blocking reads so they can
191        // be cancelled promptly (idle timeout, early failures).
192        #[cfg(unix)]
193        {
194            use std::os::unix::io::AsRawFd;
195            ensure_nonblocking_or_terminate(&mut child, stdout.as_raw_fd(), stderr.as_raw_fd())?;
196        }
197
198        Ok(AgentChildHandle {
199            stdout: Box::new(stdout),
200            stderr: Box::new(stderr),
201            inner: Box::new(RealAgentChild(child)),
202        })
203    }
204}
205
#[cfg(test)]
mod tests {
    use super::*;

    /// Invalid descriptors (`-1`) make the non-blocking setup fail; the helper
    /// must then leave the spawned child terminated.
    #[test]
    #[cfg(unix)]
    fn ensure_nonblocking_or_terminate_kills_child_on_failure() {
        use std::process::{Command, Stdio};
        use std::time::{Duration, Instant};

        let mut sleeper = Command::new("sleep")
            .arg("60")
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .expect("spawn sleep");

        let outcome = ensure_nonblocking_or_terminate(&mut sleeper, -1, -1);
        assert!(outcome.is_err(), "expected nonblocking setup to fail");

        // Give the process up to two seconds to show up as exited.
        let give_up_at = Instant::now() + Duration::from_secs(2);
        let mut exited = false;
        while !exited && Instant::now() < give_up_at {
            // Anything other than "still running" (an exit status or a wait
            // error) counts as the child being gone.
            if matches!(sleeper.try_wait(), Ok(None)) {
                std::thread::sleep(Duration::from_millis(10));
            } else {
                exited = true;
            }
        }

        // Ensure we don't leave a live subprocess behind even if the assertion fails.
        if !exited {
            let _ = sleeper.kill();
            let _ = sleeper.wait();
        }

        assert!(
            exited,
            "expected child to be terminated when nonblocking setup fails"
        );
    }
}