Skip to main content

ralph_workflow/executor/
real.rs

1//! Real process executor implementation.
2//!
3//! This module provides the production implementation that spawns actual processes
4//! using `std::process::Command`.
5
6use super::{AgentChildHandle, AgentSpawnConfig, ProcessExecutor, ProcessOutput, RealAgentChild};
7use std::io;
8use std::path::Path;
9
10#[cfg(unix)]
11fn set_nonblocking_fd(fd: std::os::unix::io::RawFd) -> io::Result<()> {
12    // Make the file descriptor non-blocking so readers can poll/cancel without
13    // getting stuck in a blocking read().
14    //
15    // Safety: fcntl is called with a valid fd owned by this process.
16    unsafe {
17        let flags = libc::fcntl(fd, libc::F_GETFL);
18        if flags < 0 {
19            return Err(io::Error::last_os_error());
20        }
21        if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
22            return Err(io::Error::last_os_error());
23        }
24    }
25    Ok(())
26}
27
28#[cfg(unix)]
29fn ensure_nonblocking_or_terminate(
30    child: &mut std::process::Child,
31    stdout_fd: std::os::unix::io::RawFd,
32    stderr_fd: std::os::unix::io::RawFd,
33) -> io::Result<()> {
34    fn terminate_child_best_effort(child: &mut std::process::Child) {
35        use std::time::{Duration, Instant};
36
37        let pid = child.id().min(i32::MAX as u32).cast_signed();
38
39        // Prefer killing the process group first (agent is in its own pgid).
40        unsafe {
41            let _ = libc::kill(-pid, libc::SIGTERM);
42            let _ = libc::kill(pid, libc::SIGTERM);
43        }
44
45        let term_deadline = Instant::now() + Duration::from_millis(250);
46        while Instant::now() < term_deadline {
47            match child.try_wait() {
48                Ok(Some(_)) | Err(_) => return,
49                Ok(None) => std::thread::sleep(Duration::from_millis(10)),
50            }
51        }
52
53        unsafe {
54            let _ = libc::kill(-pid, libc::SIGKILL);
55            let _ = libc::kill(pid, libc::SIGKILL);
56        }
57
58        let kill_deadline = Instant::now() + Duration::from_millis(500);
59        while Instant::now() < kill_deadline {
60            match child.try_wait() {
61                Ok(Some(_)) | Err(_) => return,
62                Ok(None) => std::thread::sleep(Duration::from_millis(10)),
63            }
64        }
65    }
66
67    if let Err(e) = set_nonblocking_fd(stdout_fd) {
68        terminate_child_best_effort(child);
69        return Err(e);
70    }
71
72    if let Err(e) = set_nonblocking_fd(stderr_fd) {
73        terminate_child_best_effort(child);
74        return Err(e);
75    }
76
77    Ok(())
78}
79
/// Production process executor backed by `std::process::Command`.
///
/// Every call spawns a genuine OS process. The type holds no state, so it is
/// free to construct, clone, or obtain via `Default`.
#[derive(Clone, Debug, Default)]
pub struct RealProcessExecutor;
85
86impl RealProcessExecutor {
87    /// Create a new `RealProcessExecutor`.
88    #[must_use]
89    pub const fn new() -> Self {
90        Self
91    }
92}
93
94impl ProcessExecutor for RealProcessExecutor {
95    fn execute(
96        &self,
97        command: &str,
98        args: &[&str],
99        env: &[(String, String)],
100        workdir: Option<&Path>,
101    ) -> io::Result<ProcessOutput> {
102        let mut cmd = std::process::Command::new(command);
103        cmd.args(args);
104
105        for (key, value) in env {
106            cmd.env(key, value);
107        }
108
109        if let Some(dir) = workdir {
110            cmd.current_dir(dir);
111        }
112
113        let output = cmd.output()?;
114
115        Ok(ProcessOutput {
116            status: output.status,
117            stdout: String::from_utf8_lossy(&output.stdout).to_string(),
118            stderr: String::from_utf8_lossy(&output.stderr).to_string(),
119        })
120    }
121
122    fn spawn(
123        &self,
124        command: &str,
125        args: &[&str],
126        env: &[(String, String)],
127        workdir: Option<&Path>,
128    ) -> io::Result<std::process::Child> {
129        let mut cmd = std::process::Command::new(command);
130        cmd.args(args);
131
132        for (key, value) in env {
133            cmd.env(key, value);
134        }
135
136        if let Some(dir) = workdir {
137            cmd.current_dir(dir);
138        }
139
140        cmd.stdin(std::process::Stdio::piped())
141            .stdout(std::process::Stdio::piped())
142            .stderr(std::process::Stdio::piped())
143            .spawn()
144    }
145
146    fn spawn_agent(&self, config: &AgentSpawnConfig) -> io::Result<AgentChildHandle> {
147        let mut cmd = std::process::Command::new(&config.command);
148        cmd.args(&config.args);
149
150        // Set environment variables
151        for (key, value) in &config.env {
152            cmd.env(key, value);
153        }
154
155        // Add the prompt as the final argument
156        cmd.arg(&config.prompt);
157
158        // Set buffering variables for real-time streaming
159        cmd.env("PYTHONUNBUFFERED", "1");
160        cmd.env("NODE_ENV", "production");
161
162        // Put the agent in its own process group so idle-timeout enforcement can
163        // terminate the whole subtree (and not just the direct child PID).
164        #[cfg(unix)]
165        unsafe {
166            use std::os::unix::process::CommandExt;
167            cmd.pre_exec(|| {
168                if libc::setpgid(0, 0) != 0 {
169                    return Err(io::Error::last_os_error());
170                }
171                Ok(())
172            });
173        }
174
175        // Spawn the process with piped stdout/stderr
176        let mut child = cmd
177            .stdin(std::process::Stdio::null())
178            .stdout(std::process::Stdio::piped())
179            .stderr(std::process::Stdio::piped())
180            .spawn()?;
181
182        let stdout = child
183            .stdout
184            .take()
185            .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;
186        let stderr = child
187            .stderr
188            .take()
189            .ok_or_else(|| io::Error::other("Failed to capture stderr"))?;
190
191        // The stderr collector and stdout pump rely on non-blocking reads so they can
192        // be cancelled promptly (idle timeout, early failures).
193        #[cfg(unix)]
194        {
195            use std::os::unix::io::AsRawFd;
196            ensure_nonblocking_or_terminate(&mut child, stdout.as_raw_fd(), stderr.as_raw_fd())?;
197        }
198
199        Ok(AgentChildHandle {
200            stdout: Box::new(stdout),
201            stderr: Box::new(stderr),
202            inner: Box::new(RealAgentChild(child)),
203        })
204    }
205}
206
#[cfg(test)]
mod tests {
    use super::*;

    #[cfg(unix)]
    use std::process::{Child, Command, Stdio};
    #[cfg(unix)]
    use std::time::{Duration, Instant};

    /// Polls `child` every 10ms until it is no longer running or `timeout`
    /// elapses. A `try_wait` error is treated as "no longer running".
    #[cfg(unix)]
    fn wait_until_exited(child: &mut Child, timeout: Duration) -> bool {
        let deadline = Instant::now() + timeout;
        while Instant::now() < deadline {
            if !matches!(child.try_wait(), Ok(None)) {
                return true;
            }
            std::thread::sleep(Duration::from_millis(10));
        }
        false
    }

    /// Spawns a long-running `sleep 60` with the given stdout/stderr wiring.
    #[cfg(unix)]
    fn spawn_sleeper(stdout: Stdio, stderr: Stdio) -> Child {
        Command::new("sleep")
            .arg("60")
            .stdin(Stdio::null())
            .stdout(stdout)
            .stderr(stderr)
            .spawn()
            .expect("spawn sleep")
    }

    #[test]
    #[cfg(unix)]
    fn ensure_nonblocking_or_terminate_kills_child_on_failure() {
        let mut child = spawn_sleeper(Stdio::null(), Stdio::null());

        // -1 is never an open descriptor, so the first fcntl call fails.
        let result = ensure_nonblocking_or_terminate(&mut child, -1, -1);
        let exited = wait_until_exited(&mut child, Duration::from_secs(2));

        // Ensure we don't leave a live subprocess behind even if an assertion fails.
        if !exited {
            let _ = child.kill();
            let _ = child.wait();
        }

        assert!(result.is_err(), "expected nonblocking setup to fail");
        assert!(
            exited,
            "expected child to be terminated when nonblocking setup fails"
        );
    }

    #[test]
    #[cfg(unix)]
    fn ensure_nonblocking_or_terminate_sets_o_nonblocking_and_keeps_child() {
        use std::os::unix::io::{AsRawFd, RawFd};

        let mut child = spawn_sleeper(Stdio::piped(), Stdio::piped());
        let stdout_fd = child.stdout.as_ref().expect("piped stdout").as_raw_fd();
        let stderr_fd = child.stderr.as_ref().expect("piped stderr").as_raw_fd();

        let result = ensure_nonblocking_or_terminate(&mut child, stdout_fd, stderr_fd);

        let is_nonblocking = |fd: RawFd| {
            // SAFETY: F_GETFL only reads the descriptor's status flags and is
            // passed no pointers; both fds are pipes still owned by `child`.
            let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
            flags >= 0 && (flags & libc::O_NONBLOCK) != 0
        };
        let both_nonblocking = is_nonblocking(stdout_fd) && is_nonblocking(stderr_fd);
        let still_running = matches!(child.try_wait(), Ok(None));

        let _ = child.kill();
        let _ = child.wait();

        assert!(result.is_ok(), "expected nonblocking setup to succeed: {result:?}");
        assert!(both_nonblocking, "expected O_NONBLOCK on both pipes");
        assert!(still_running, "child must keep running when setup succeeds");
    }

    #[test]
    #[cfg(unix)]
    fn set_nonblocking_fd_reports_bad_descriptor() {
        let err = set_nonblocking_fd(-1).expect_err("fd -1 is never open");
        assert_eq!(err.raw_os_error(), Some(libc::EBADF));
    }
}