Skip to main content

ralph_workflow/executor/
real.rs

//! Real process executor implementation.
//!
//! This module provides the production implementation of `ProcessExecutor`,
//! which spawns actual OS processes using `std::process::Command`.
5
6use super::{
7    AgentChildHandle, AgentSpawnConfig, ProcessExecutor, ProcessOutput, RealAgentChild,
8    SpawnedProcess,
9};
10use std::io;
11use std::path::Path;
12
13#[cfg(unix)]
14fn set_nonblocking_fd(fd: std::os::unix::io::RawFd) -> io::Result<()> {
15    // Make the file descriptor non-blocking so readers can poll/cancel without
16    // getting stuck in a blocking read().
17    //
18    // Safety: fcntl is called with a valid fd owned by this process.
19    unsafe {
20        let flags = libc::fcntl(fd, libc::F_GETFL);
21        if flags < 0 {
22            return Err(io::Error::last_os_error());
23        }
24        if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
25            return Err(io::Error::last_os_error());
26        }
27    }
28    Ok(())
29}
30
31#[cfg(unix)]
32fn terminate_child_best_effort(child: &mut std::process::Child) {
33    let pid = child.id().min(i32::MAX as u32).cast_signed();
34
35    unsafe {
36        let _ = libc::kill(-pid, libc::SIGTERM);
37        let _ = libc::kill(pid, libc::SIGTERM);
38    }
39
40    wait_for_termination_or_send_sigkill(child, pid);
41}
42
/// Checks once whether `child` has finished. If it is still running, this
/// naps for 10 ms before returning, so callers can loop on it without spinning.
///
/// Returns `true` when the child has exited (reaped by `try_wait`) or its
/// status can no longer be queried, and `false` while it is still running.
#[cfg(unix)]
fn poll_child_once(child: &mut std::process::Child) -> bool {
    let still_running = matches!(child.try_wait(), Ok(None));
    if still_running {
        std::thread::sleep(std::time::Duration::from_millis(10));
    }
    !still_running
}
54
55#[cfg(unix)]
56fn wait_until_deadline(child: &mut std::process::Child, deadline: std::time::Instant) {
57    use std::time::Instant;
58    while Instant::now() < deadline {
59        if poll_child_once(child) {
60            return;
61        }
62    }
63}
64
65#[cfg(unix)]
66fn wait_for_termination_or_send_sigkill(child: &mut std::process::Child, pid: i32) {
67    let (term_deadline, kill_deadline) = compute_termination_deadlines();
68    wait_until_deadline(child, term_deadline);
69    send_sigkill(pid);
70    wait_until_deadline(child, kill_deadline);
71}
72
/// Returns `(term_deadline, kill_deadline)`.
///
/// - `term_deadline` is how long a child is given to react to `SIGTERM`
///   (250 ms from now).
/// - `kill_deadline` is how long it is then awaited after `SIGKILL`
///   (a further 500 ms).
#[cfg(unix)]
fn compute_termination_deadlines() -> (std::time::Instant, std::time::Instant) {
    const TERM_GRACE: std::time::Duration = std::time::Duration::from_millis(250);
    const KILL_GRACE: std::time::Duration = std::time::Duration::from_millis(500);

    let term_deadline = std::time::Instant::now() + TERM_GRACE;
    (term_deadline, term_deadline + KILL_GRACE)
}
81
82#[cfg(unix)]
83fn send_sigkill(pid: i32) {
84    unsafe {
85        let _ = libc::kill(-pid, libc::SIGKILL);
86        let _ = libc::kill(pid, libc::SIGKILL);
87    }
88}
89
90#[cfg(unix)]
91fn ensure_nonblocking_or_terminate(
92    child: &mut std::process::Child,
93    stdout_fd: std::os::unix::io::RawFd,
94    stderr_fd: std::os::unix::io::RawFd,
95) -> io::Result<()> {
96    if let Err(e) = set_nonblocking_fd(stdout_fd) {
97        terminate_child_best_effort(child);
98        return Err(e);
99    }
100
101    if let Err(e) = set_nonblocking_fd(stderr_fd) {
102        terminate_child_best_effort(child);
103        return Err(e);
104    }
105
106    Ok(())
107}
108
109fn wrap_process_output(output: std::process::Output) -> ProcessOutput {
110    ProcessOutput {
111        status: output.status,
112        stdout: String::from_utf8_lossy(&output.stdout).to_string(),
113        stderr: String::from_utf8_lossy(&output.stderr).to_string(),
114    }
115}
116
/// Production [`ProcessExecutor`] backed by `std::process::Command`.
///
/// This type holds no state. Each call builds and spawns a fresh OS process.
#[derive(Debug, Clone, Default)]
pub struct RealProcessExecutor;

impl RealProcessExecutor {
    /// Returns a new `RealProcessExecutor`. This is equivalent to `Default::default()`.
    #[must_use]
    pub const fn new() -> Self {
        RealProcessExecutor
    }
}
130
131impl ProcessExecutor for RealProcessExecutor {
132    fn execute(
133        &self,
134        command: &str,
135        args: &[&str],
136        env: &[(String, String)],
137        workdir: Option<&Path>,
138    ) -> io::Result<ProcessOutput> {
139        let output = build_and_run_command(command, args, env, workdir)?;
140        Ok(wrap_process_output(output))
141    }
142
143    fn spawn(
144        &self,
145        command: &str,
146        args: &[&str],
147        env: &[(String, String)],
148        workdir: Option<&Path>,
149    ) -> io::Result<SpawnedProcess> {
150        let mut cmd = std::process::Command::new(command);
151        cmd.args(args);
152        env.iter().for_each(|(k, v)| {
153            cmd.env(k, v);
154        });
155        if let Some(dir) = workdir {
156            cmd.current_dir(dir);
157        }
158        let mut child = cmd
159            .stdin(std::process::Stdio::piped())
160            .stdout(std::process::Stdio::piped())
161            .stderr(std::process::Stdio::piped())
162            .spawn()?;
163        let stdin = child.stdin.take();
164        Ok(SpawnedProcess {
165            stdin,
166            inner: child,
167        })
168    }
169
170    fn spawn_agent(&self, config: &AgentSpawnConfig) -> io::Result<AgentChildHandle> {
171        let mut cmd = build_agent_command(config);
172        let child = spawn_agent_child(&mut cmd)?;
173        finalize_agent_child(child)
174    }
175}
176
/// Moves the stdout and stderr pipes out of `child`.
///
/// # Errors
///
/// Returns an `io::Error` of kind `Other` if either stream was not configured
/// as piped (or was already taken). Stdout is checked first.
fn take_child_streams(
    child: &mut std::process::Child,
) -> io::Result<(std::process::ChildStdout, std::process::ChildStderr)> {
    let Some(stdout) = child.stdout.take() else {
        return Err(io::Error::other("Failed to capture stdout"));
    };
    let Some(stderr) = child.stderr.take() else {
        return Err(io::Error::other("Failed to capture stderr"));
    };
    Ok((stdout, stderr))
}
190
191fn finalize_agent_child(mut child: std::process::Child) -> io::Result<AgentChildHandle> {
192    let (stdout, stderr) = take_child_streams(&mut child)?;
193
194    #[cfg(unix)]
195    {
196        use std::os::unix::io::AsRawFd;
197        ensure_nonblocking_or_terminate(&mut child, stdout.as_raw_fd(), stderr.as_raw_fd())?;
198    }
199
200    #[cfg(not(unix))]
201    let _ = (&child, &stdout, &stderr);
202
203    Ok(AgentChildHandle {
204        stdout: Box::new(stdout),
205        stderr: Box::new(stderr),
206        inner: Box::new(RealAgentChild(child)),
207    })
208}
209
210fn build_agent_command(config: &AgentSpawnConfig) -> std::process::Command {
211    let mut cmd = std::process::Command::new(&config.command);
212    cmd.args(&config.args);
213    config.env.iter().for_each(|(k, v)| {
214        cmd.env(k, v);
215    });
216    cmd.arg(&config.prompt);
217    cmd.env("PYTHONUNBUFFERED", "1");
218    cmd.env("NODE_ENV", "production");
219
220    #[cfg(unix)]
221    unsafe {
222        use std::os::unix::process::CommandExt;
223        cmd.pre_exec(|| {
224            if libc::setpgid(0, 0) != 0 {
225                return Err(io::Error::last_os_error());
226            }
227            Ok(())
228        });
229    }
230
231    cmd.stdin(std::process::Stdio::null());
232    cmd.stdout(std::process::Stdio::piped());
233    cmd.stderr(std::process::Stdio::piped());
234    cmd
235}
236
/// Spawns the prepared agent command and returns the raw child process.
///
/// # Errors
///
/// Propagates any error from `Command::spawn`.
fn spawn_agent_child(cmd: &mut std::process::Command) -> io::Result<std::process::Child> {
    std::process::Command::spawn(cmd)
}
240
/// Runs `command` with `args`, the extra `env` variables and an optional
/// working directory. Waits for it to finish and captures stdout and stderr.
///
/// # Errors
///
/// Returns any error from spawning or waiting on the process. A non-zero exit
/// status is not an error; it is reported in the returned `Output`.
fn build_and_run_command(
    command: &str,
    args: &[&str],
    env: &[(String, String)],
    workdir: Option<&Path>,
) -> io::Result<std::process::Output> {
    let mut cmd = std::process::Command::new(command);
    cmd.args(args);
    for (key, value) in env {
        cmd.env(key, value);
    }
    match workdir {
        Some(dir) => cmd.current_dir(dir).output(),
        None => cmd.output(),
    }
}
257
#[cfg(test)]
mod tests {
    use super::*;

    /// Polls `child` until it exits or `timeout` elapses. Returns whether it exited.
    #[cfg(unix)]
    fn exited_within(child: &mut std::process::Child, timeout: std::time::Duration) -> bool {
        let give_up_at = std::time::Instant::now() + timeout;
        while std::time::Instant::now() < give_up_at {
            if !matches!(child.try_wait(), Ok(None)) {
                return true;
            }
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
        false
    }

    #[test]
    #[cfg(unix)]
    fn ensure_nonblocking_or_terminate_kills_child_on_failure() {
        use std::process::{Command, Stdio};
        use std::time::Duration;

        let mut sleeper = Command::new("sleep")
            .arg("60")
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .expect("spawn sleep");

        // -1 is never a valid descriptor, so the first fcntl call must fail.
        let outcome = ensure_nonblocking_or_terminate(&mut sleeper, -1, -1);
        assert!(outcome.is_err(), "expected nonblocking setup to fail");

        let exited = exited_within(&mut sleeper, Duration::from_secs(2));
        if !exited {
            // Avoid leaking a 60 s sleeper when the assertion below fails.
            let _ = sleeper.kill();
            let _ = sleeper.wait();
        }

        assert!(
            exited,
            "expected child to be terminated when nonblocking setup fails"
        );
    }
}
299}