Skip to main content

ralph_workflow/executor/
real.rs

1//! Real process executor implementation.
2//!
3//! This module provides the production implementation that spawns actual processes
4//! using `std::process::Command`.
5
6use super::{
7    AgentChildHandle, AgentSpawnConfig, ProcessExecutor, ProcessOutput, RealAgentChild,
8    SpawnedProcess,
9};
10use std::io;
11use std::path::Path;
12
13#[cfg(unix)]
14fn set_nonblocking_fd(fd: std::os::unix::io::RawFd) -> io::Result<()> {
15    // Make the file descriptor non-blocking so readers can poll/cancel without
16    // getting stuck in a blocking read().
17    //
18    // Safety: fcntl is called with a valid fd owned by this process.
19    unsafe {
20        let flags = libc::fcntl(fd, libc::F_GETFL);
21        if flags < 0 {
22            return Err(io::Error::last_os_error());
23        }
24        if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
25            return Err(io::Error::last_os_error());
26        }
27    }
28    Ok(())
29}
30
31#[cfg(unix)]
32fn terminate_child_best_effort(child: &mut std::process::Child) {
33    let pid = child.id().min(i32::MAX as u32).cast_signed();
34
35    unsafe {
36        let _ = libc::kill(-pid, libc::SIGTERM);
37        let _ = libc::kill(pid, libc::SIGTERM);
38    }
39
40    wait_for_termination_or_send_sigkill(child, pid);
41}
42
/// Performs one non-blocking check of `child`.
///
/// Returns `true` when polling should stop: the child has exited, or its status
/// could not be queried. Returns `false` after sleeping for 10 ms when the
/// child is still running.
#[cfg(unix)]
fn poll_child_once(child: &mut std::process::Child) -> bool {
    let still_running = matches!(child.try_wait(), Ok(None));
    if still_running {
        // Back off briefly so callers looping on this do not spin.
        std::thread::sleep(std::time::Duration::from_millis(10));
        return false;
    }
    true
}
54
55#[cfg(unix)]
56fn wait_until_deadline(child: &mut std::process::Child, deadline: std::time::Instant) {
57    use std::time::Instant;
58    while Instant::now() < deadline {
59        if poll_child_once(child) {
60            return;
61        }
62    }
63}
64
65#[cfg(unix)]
66fn wait_for_termination_or_send_sigkill(child: &mut std::process::Child, pid: i32) {
67    let (term_deadline, kill_deadline) = compute_termination_deadlines();
68    wait_until_deadline(child, term_deadline);
69    send_sigkill(pid);
70    wait_until_deadline(child, kill_deadline);
71}
72
/// Returns the two deadlines used while shutting a child down.
///
/// The first element is 250 ms from now; the second is 500 ms after the first
/// (750 ms from now in total).
#[cfg(unix)]
fn compute_termination_deadlines() -> (std::time::Instant, std::time::Instant) {
    use std::time::{Duration, Instant};

    let grace_ends = Instant::now() + Duration::from_millis(250);
    (grace_ends, grace_ends + Duration::from_millis(500))
}
81
82#[cfg(unix)]
83fn send_sigkill(pid: i32) {
84    unsafe {
85        let _ = libc::kill(-pid, libc::SIGKILL);
86        let _ = libc::kill(pid, libc::SIGKILL);
87    }
88}
89
90#[cfg(unix)]
91fn ensure_nonblocking_or_terminate(
92    child: &mut std::process::Child,
93    stdout_fd: std::os::unix::io::RawFd,
94    stderr_fd: std::os::unix::io::RawFd,
95) -> io::Result<()> {
96    if let Err(e) = set_nonblocking_fd(stdout_fd) {
97        terminate_child_best_effort(child);
98        return Err(e);
99    }
100
101    if let Err(e) = set_nonblocking_fd(stderr_fd) {
102        terminate_child_best_effort(child);
103        return Err(e);
104    }
105
106    Ok(())
107}
108
109fn wrap_process_output(output: std::process::Output) -> ProcessOutput {
110    ProcessOutput {
111        status: output.status,
112        stdout: String::from_utf8_lossy(&output.stdout).to_string(),
113        stderr: String::from_utf8_lossy(&output.stderr).to_string(),
114    }
115}
116
/// Production process executor backed by `std::process::Command`.
///
/// The type carries no state; every value is interchangeable with every other.
#[derive(Debug, Clone, Default)]
pub struct RealProcessExecutor;

impl RealProcessExecutor {
    /// Returns a `RealProcessExecutor`.
    ///
    /// Usable in `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
130
131impl ProcessExecutor for RealProcessExecutor {
132    fn execute(
133        &self,
134        command: &str,
135        args: &[&str],
136        env: &[(String, String)],
137        workdir: Option<&Path>,
138    ) -> io::Result<ProcessOutput> {
139        let output = build_and_run_command(command, args, env, workdir)?;
140        Ok(wrap_process_output(output))
141    }
142
143    fn spawn(
144        &self,
145        command: &str,
146        args: &[&str],
147        env: &[(String, String)],
148        workdir: Option<&Path>,
149    ) -> io::Result<SpawnedProcess> {
150        let mut cmd = std::process::Command::new(command);
151        cmd.args(args);
152        env.iter().for_each(|(k, v)| {
153            cmd.env(k, v);
154        });
155        if let Some(dir) = workdir {
156            cmd.current_dir(dir);
157        }
158        let mut child = cmd
159            .stdin(std::process::Stdio::piped())
160            .stdout(std::process::Stdio::piped())
161            .stderr(std::process::Stdio::piped())
162            .spawn()?;
163        let stdin = child.stdin.take();
164        Ok(SpawnedProcess {
165            stdin,
166            inner: child,
167        })
168    }
169
170    fn spawn_agent(&self, config: &AgentSpawnConfig) -> io::Result<AgentChildHandle> {
171        let mut cmd = build_agent_command(config);
172        let child = spawn_agent_child(&mut cmd)?;
173        finalize_agent_child(child)
174    }
175
176    #[cfg(unix)]
177    fn kill_process_group(&self, pgid: u32) -> io::Result<()> {
178        // Send SIGKILL to the entire process group using negative PID.
179        // This kills all processes in the group, which is what we want for zombie cleanup.
180        // Reject pgid == 0 since kill(0, SIGKILL) sends to the caller's process group,
181        // which is almost certainly not the intended target for zombie reaping.
182        let pgid_t = libc::pid_t::try_from(pgid)
183            .map_err(|_| io::Error::other(format!("pgid {pgid} does not fit in pid_t")))?;
184        if pgid_t == 0 {
185            return Err(io::Error::other(
186                "pgid must not be 0: kill(0, SIGKILL) sends to the caller's process group",
187            ));
188        }
189        let result = unsafe { libc::kill(-pgid_t, libc::SIGKILL) };
190        if result == 0 {
191            Ok(())
192        } else {
193            Err(io::Error::last_os_error())
194        }
195    }
196}
197
/// Moves the stdout and stderr handles out of `child`.
///
/// stdout is taken first; if it is missing the function returns before
/// touching stderr.
///
/// # Errors
///
/// Returns an error when stdout or stderr is not present on `child`, for
/// example because it was not piped or was already taken.
fn take_child_streams(
    child: &mut std::process::Child,
) -> io::Result<(std::process::ChildStdout, std::process::ChildStderr)> {
    let Some(out_pipe) = child.stdout.take() else {
        return Err(io::Error::other("Failed to capture stdout"));
    };
    let Some(err_pipe) = child.stderr.take() else {
        return Err(io::Error::other("Failed to capture stderr"));
    };
    Ok((out_pipe, err_pipe))
}
211
212fn finalize_agent_child(mut child: std::process::Child) -> io::Result<AgentChildHandle> {
213    let (stdout, stderr) = take_child_streams(&mut child)?;
214
215    #[cfg(unix)]
216    {
217        use std::os::unix::io::AsRawFd;
218        ensure_nonblocking_or_terminate(&mut child, stdout.as_raw_fd(), stderr.as_raw_fd())?;
219    }
220
221    #[cfg(not(unix))]
222    let _ = (&child, &stdout, &stderr);
223
224    Ok(AgentChildHandle {
225        stdout: Box::new(stdout),
226        stderr: Box::new(stderr),
227        inner: Box::new(RealAgentChild(child)),
228    })
229}
230
231fn build_agent_command(config: &AgentSpawnConfig) -> std::process::Command {
232    let mut cmd = std::process::Command::new(&config.command);
233    cmd.args(&config.args);
234    config.env.iter().for_each(|(k, v)| {
235        cmd.env(k, v);
236    });
237    cmd.arg(&config.prompt);
238    cmd.env("PYTHONUNBUFFERED", "1");
239    cmd.env("NODE_ENV", "production");
240
241    #[cfg(unix)]
242    unsafe {
243        use std::os::unix::process::CommandExt;
244        cmd.pre_exec(|| {
245            if libc::setpgid(0, 0) != 0 {
246                return Err(io::Error::last_os_error());
247            }
248            Ok(())
249        });
250    }
251
252    cmd.stdin(std::process::Stdio::null());
253    cmd.stdout(std::process::Stdio::piped());
254    cmd.stderr(std::process::Stdio::piped());
255    cmd
256}
257
/// Spawns the process described by `cmd` and returns its handle.
///
/// # Errors
///
/// Returns the I/O error reported by `Command::spawn`.
fn spawn_agent_child(cmd: &mut std::process::Command) -> io::Result<std::process::Child> {
    std::process::Command::spawn(cmd)
}
261
/// Runs `command` with `args` to completion and returns its raw output.
///
/// Each `(name, value)` pair in `env` is added to the child's environment, and
/// `workdir`, when present, is used as the child's working directory.
///
/// # Errors
///
/// Returns the I/O error reported by `Command::output`.
fn build_and_run_command(
    command: &str,
    args: &[&str],
    env: &[(String, String)],
    workdir: Option<&Path>,
) -> io::Result<std::process::Output> {
    let mut runner = std::process::Command::new(command);
    runner.args(args);

    for (name, value) in env {
        runner.env(name, value);
    }

    match workdir {
        Some(dir) => runner.current_dir(dir).output(),
        None => runner.output(),
    }
}
278
#[cfg(test)]
mod tests {
    use super::*;

    /// When the descriptors cannot be made non-blocking, the helper must return
    /// the error and take the child process down.
    #[test]
    #[cfg(unix)]
    fn ensure_nonblocking_or_terminate_kills_child_on_failure() {
        use std::process::{Command, Stdio};
        use std::time::{Duration, Instant};

        let mut sleeper = Command::new("sleep")
            .arg("60")
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .expect("spawn sleep");

        // -1 is not a valid descriptor, so the non-blocking setup fails.
        let outcome = ensure_nonblocking_or_terminate(&mut sleeper, -1, -1);
        assert!(outcome.is_err(), "expected nonblocking setup to fail");

        // Give the child up to two seconds to be observed as gone.
        let give_up_at = Instant::now() + Duration::from_secs(2);
        let exited = loop {
            if Instant::now() >= give_up_at {
                break false;
            }
            if !matches!(sleeper.try_wait(), Ok(None)) {
                break true;
            }
            std::thread::sleep(Duration::from_millis(10));
        };

        // Clean up before asserting so a failing run does not leave `sleep` behind.
        if !exited {
            let _ = sleeper.kill();
            let _ = sleeper.wait();
        }

        assert!(
            exited,
            "expected child to be terminated when nonblocking setup fails"
        );
    }
}