ralph_workflow/executor/
real.rs1use super::{
7 AgentChildHandle, AgentSpawnConfig, ProcessExecutor, ProcessOutput, RealAgentChild,
8 SpawnedProcess,
9};
10use std::io;
11use std::path::Path;
12
13#[cfg(unix)]
14fn set_nonblocking_fd(fd: std::os::unix::io::RawFd) -> io::Result<()> {
15 unsafe {
20 let flags = libc::fcntl(fd, libc::F_GETFL);
21 if flags < 0 {
22 return Err(io::Error::last_os_error());
23 }
24 if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
25 return Err(io::Error::last_os_error());
26 }
27 }
28 Ok(())
29}
30
31#[cfg(unix)]
32fn terminate_child_best_effort(child: &mut std::process::Child) {
33 let pid = child.id().min(i32::MAX as u32).cast_signed();
34
35 unsafe {
36 let _ = libc::kill(-pid, libc::SIGTERM);
37 let _ = libc::kill(pid, libc::SIGTERM);
38 }
39
40 wait_for_termination_or_send_sigkill(child, pid);
41}
42
/// Checks once whether `child` is done.
///
/// Returns `true` when `try_wait` reports an exit status or fails. When the
/// child is still running, sleeps for 10 ms and returns `false`, so callers
/// can loop on this function without spinning.
#[cfg(unix)]
fn poll_child_once(child: &mut std::process::Child) -> bool {
    if let Ok(None) = child.try_wait() {
        // Still running: back off briefly before the caller polls again.
        std::thread::sleep(std::time::Duration::from_millis(10));
        return false;
    }
    true
}
54
55#[cfg(unix)]
56fn wait_until_deadline(child: &mut std::process::Child, deadline: std::time::Instant) {
57 use std::time::Instant;
58 while Instant::now() < deadline {
59 if poll_child_once(child) {
60 return;
61 }
62 }
63}
64
65#[cfg(unix)]
66fn wait_for_termination_or_send_sigkill(child: &mut std::process::Child, pid: i32) {
67 let (term_deadline, kill_deadline) = compute_termination_deadlines();
68 wait_until_deadline(child, term_deadline);
69 send_sigkill(pid);
70 wait_until_deadline(child, kill_deadline);
71}
72
/// Returns the `(term_deadline, kill_deadline)` pair used during shutdown.
///
/// The first deadline lies 250 ms after the current instant; the second lies
/// a further 500 ms after the first.
#[cfg(unix)]
fn compute_termination_deadlines() -> (std::time::Instant, std::time::Instant) {
    const TERM_GRACE: std::time::Duration = std::time::Duration::from_millis(250);
    const KILL_GRACE: std::time::Duration = std::time::Duration::from_millis(500);

    let term_deadline = std::time::Instant::now() + TERM_GRACE;
    (term_deadline, term_deadline + KILL_GRACE)
}
81
82#[cfg(unix)]
83fn send_sigkill(pid: i32) {
84 unsafe {
85 let _ = libc::kill(-pid, libc::SIGKILL);
86 let _ = libc::kill(pid, libc::SIGKILL);
87 }
88}
89
90#[cfg(unix)]
91fn ensure_nonblocking_or_terminate(
92 child: &mut std::process::Child,
93 stdout_fd: std::os::unix::io::RawFd,
94 stderr_fd: std::os::unix::io::RawFd,
95) -> io::Result<()> {
96 if let Err(e) = set_nonblocking_fd(stdout_fd) {
97 terminate_child_best_effort(child);
98 return Err(e);
99 }
100
101 if let Err(e) = set_nonblocking_fd(stderr_fd) {
102 terminate_child_best_effort(child);
103 return Err(e);
104 }
105
106 Ok(())
107}
108
109fn wrap_process_output(output: std::process::Output) -> ProcessOutput {
110 ProcessOutput {
111 status: output.status,
112 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
113 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
114 }
115}
116
/// Field-less executor type; it carries no state of its own.
///
/// This file implements `ProcessExecutor` for it on top of
/// `std::process::Command`.
#[derive(Debug, Clone, Default)]
pub struct RealProcessExecutor;

impl RealProcessExecutor {
    /// Creates a new executor.
    ///
    /// The type has no fields, so this is equivalent to
    /// `RealProcessExecutor::default()` and is usable in `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
130
131impl ProcessExecutor for RealProcessExecutor {
132 fn execute(
133 &self,
134 command: &str,
135 args: &[&str],
136 env: &[(String, String)],
137 workdir: Option<&Path>,
138 ) -> io::Result<ProcessOutput> {
139 let output = build_and_run_command(command, args, env, workdir)?;
140 Ok(wrap_process_output(output))
141 }
142
143 fn spawn(
144 &self,
145 command: &str,
146 args: &[&str],
147 env: &[(String, String)],
148 workdir: Option<&Path>,
149 ) -> io::Result<SpawnedProcess> {
150 let mut cmd = std::process::Command::new(command);
151 cmd.args(args);
152 env.iter().for_each(|(k, v)| {
153 cmd.env(k, v);
154 });
155 if let Some(dir) = workdir {
156 cmd.current_dir(dir);
157 }
158 let mut child = cmd
159 .stdin(std::process::Stdio::piped())
160 .stdout(std::process::Stdio::piped())
161 .stderr(std::process::Stdio::piped())
162 .spawn()?;
163 let stdin = child.stdin.take();
164 Ok(SpawnedProcess {
165 stdin,
166 inner: child,
167 })
168 }
169
170 fn spawn_agent(&self, config: &AgentSpawnConfig) -> io::Result<AgentChildHandle> {
171 let mut cmd = build_agent_command(config);
172 let child = spawn_agent_child(&mut cmd)?;
173 finalize_agent_child(child)
174 }
175
176 #[cfg(unix)]
177 fn kill_process_group(&self, pgid: u32) -> io::Result<()> {
178 let pgid_t = libc::pid_t::try_from(pgid)
183 .map_err(|_| io::Error::other(format!("pgid {pgid} does not fit in pid_t")))?;
184 if pgid_t == 0 {
185 return Err(io::Error::other(
186 "pgid must not be 0: kill(0, SIGKILL) sends to the caller's process group",
187 ));
188 }
189 let result = unsafe { libc::kill(-pgid_t, libc::SIGKILL) };
190 if result == 0 {
191 Ok(())
192 } else {
193 Err(io::Error::last_os_error())
194 }
195 }
196}
197
/// Moves the piped stdout and stderr handles out of `child`.
///
/// stdout is taken first, so when only stderr is missing the stdout handle
/// has already been removed from `child` by the time the error is returned.
///
/// # Errors
///
/// Returns an error when the child has no stdout handle, or no stderr
/// handle, to take.
fn take_child_streams(
    child: &mut std::process::Child,
) -> io::Result<(std::process::ChildStdout, std::process::ChildStderr)> {
    let Some(stdout) = child.stdout.take() else {
        return Err(io::Error::other("Failed to capture stdout"));
    };
    let Some(stderr) = child.stderr.take() else {
        return Err(io::Error::other("Failed to capture stderr"));
    };
    Ok((stdout, stderr))
}
211
212fn finalize_agent_child(mut child: std::process::Child) -> io::Result<AgentChildHandle> {
213 let (stdout, stderr) = take_child_streams(&mut child)?;
214
215 #[cfg(unix)]
216 {
217 use std::os::unix::io::AsRawFd;
218 ensure_nonblocking_or_terminate(&mut child, stdout.as_raw_fd(), stderr.as_raw_fd())?;
219 }
220
221 #[cfg(not(unix))]
222 let _ = (&child, &stdout, &stderr);
223
224 Ok(AgentChildHandle {
225 stdout: Box::new(stdout),
226 stderr: Box::new(stderr),
227 inner: Box::new(RealAgentChild(child)),
228 })
229}
230
231fn build_agent_command(config: &AgentSpawnConfig) -> std::process::Command {
232 let mut cmd = std::process::Command::new(&config.command);
233 cmd.args(&config.args);
234 config.env.iter().for_each(|(k, v)| {
235 cmd.env(k, v);
236 });
237 cmd.arg(&config.prompt);
238 cmd.env("PYTHONUNBUFFERED", "1");
239 cmd.env("NODE_ENV", "production");
240
241 #[cfg(unix)]
242 unsafe {
243 use std::os::unix::process::CommandExt;
244 cmd.pre_exec(|| {
245 if libc::setpgid(0, 0) != 0 {
246 return Err(io::Error::last_os_error());
247 }
248 Ok(())
249 });
250 }
251
252 cmd.stdin(std::process::Stdio::null());
253 cmd.stdout(std::process::Stdio::piped());
254 cmd.stderr(std::process::Stdio::piped());
255 cmd
256}
257
/// Spawns the prepared agent command and returns the child handle.
///
/// # Errors
///
/// Returns the I/O error reported by `Command::spawn`, for example when the
/// program cannot be found.
fn spawn_agent_child(cmd: &mut std::process::Command) -> io::Result<std::process::Child> {
    std::process::Command::spawn(cmd)
}
261
/// Runs `command` with `args` to completion and collects its output.
///
/// Every `(key, value)` pair in `env` is set in the child's environment, in
/// slice order. When `workdir` is `Some`, it becomes the child's working
/// directory.
///
/// # Errors
///
/// Returns the I/O error reported by `Command::output`, for example when the
/// program cannot be found.
fn build_and_run_command(
    command: &str,
    args: &[&str],
    env: &[(String, String)],
    workdir: Option<&Path>,
) -> io::Result<std::process::Output> {
    let mut cmd = std::process::Command::new(command);
    cmd.args(args);

    for (key, value) in env {
        cmd.env(key, value);
    }

    match workdir {
        Some(dir) => {
            cmd.current_dir(dir);
        }
        None => {}
    }

    cmd.output()
}
278
#[cfg(test)]
mod tests {
    use super::*;

    /// Passing invalid descriptors (`-1`) makes the non-blocking setup fail,
    /// which must terminate the spawned child instead of leaving it running.
    #[test]
    #[cfg(unix)]
    fn ensure_nonblocking_or_terminate_kills_child_on_failure() {
        use std::process::{Command, Stdio};
        use std::time::{Duration, Instant};

        let mut child = Command::new("sleep")
            .arg("60")
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .expect("spawn sleep");

        let result = ensure_nonblocking_or_terminate(&mut child, -1, -1);
        assert!(result.is_err(), "expected nonblocking setup to fail");

        // Give the child up to two seconds to be observed as gone.
        let deadline = Instant::now() + Duration::from_secs(2);
        let exited = loop {
            if Instant::now() >= deadline {
                break false;
            }
            if !matches!(child.try_wait(), Ok(None)) {
                break true;
            }
            std::thread::sleep(Duration::from_millis(10));
        };

        // Clean up before asserting so a failing run does not leak `sleep`.
        if !exited {
            let _ = child.kill();
            let _ = child.wait();
        }

        assert!(
            exited,
            "expected child to be terminated when nonblocking setup fails"
        );
    }
}