ralph_workflow/executor/
real.rs1use super::{
7 AgentChildHandle, AgentSpawnConfig, ProcessExecutor, ProcessOutput, RealAgentChild,
8 SpawnedProcess,
9};
10use std::io;
11use std::path::Path;
12
13#[cfg(unix)]
14fn set_nonblocking_fd(fd: std::os::unix::io::RawFd) -> io::Result<()> {
15 unsafe {
20 let flags = libc::fcntl(fd, libc::F_GETFL);
21 if flags < 0 {
22 return Err(io::Error::last_os_error());
23 }
24 if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
25 return Err(io::Error::last_os_error());
26 }
27 }
28 Ok(())
29}
30
31#[cfg(unix)]
32fn terminate_child_best_effort(child: &mut std::process::Child) {
33 let pid = child.id().min(i32::MAX as u32).cast_signed();
34
35 unsafe {
36 let _ = libc::kill(-pid, libc::SIGTERM);
37 let _ = libc::kill(pid, libc::SIGTERM);
38 }
39
40 wait_for_termination_or_send_sigkill(child, pid);
41}
42
/// Checks `child` once without blocking.
///
/// Returns `true` when the child has exited, or when its status can no longer be
/// queried. Otherwise it sleeps 10 ms and returns `false`, so callers can loop on
/// it without spinning.
#[cfg(unix)]
fn poll_child_once(child: &mut std::process::Child) -> bool {
    let still_running = matches!(child.try_wait(), Ok(None));
    if still_running {
        std::thread::sleep(std::time::Duration::from_millis(10));
    }
    !still_running
}
54
55#[cfg(unix)]
56fn wait_until_deadline(child: &mut std::process::Child, deadline: std::time::Instant) {
57 use std::time::Instant;
58 while Instant::now() < deadline {
59 if poll_child_once(child) {
60 return;
61 }
62 }
63}
64
65#[cfg(unix)]
66fn wait_for_termination_or_send_sigkill(child: &mut std::process::Child, pid: i32) {
67 let (term_deadline, kill_deadline) = compute_termination_deadlines();
68 wait_until_deadline(child, term_deadline);
69 send_sigkill(pid);
70 wait_until_deadline(child, kill_deadline);
71}
72
/// Returns `(term_deadline, kill_deadline)` measured from now.
///
/// The SIGTERM grace period ends 250 ms from now. The SIGKILL grace period ends
/// 500 ms after that.
#[cfg(unix)]
fn compute_termination_deadlines() -> (std::time::Instant, std::time::Instant) {
    use std::time::{Duration, Instant};

    const SIGTERM_GRACE: Duration = Duration::from_millis(250);
    const SIGKILL_GRACE: Duration = Duration::from_millis(500);

    let term_deadline = Instant::now() + SIGTERM_GRACE;
    (term_deadline, term_deadline + SIGKILL_GRACE)
}
81
82#[cfg(unix)]
83fn send_sigkill(pid: i32) {
84 unsafe {
85 let _ = libc::kill(-pid, libc::SIGKILL);
86 let _ = libc::kill(pid, libc::SIGKILL);
87 }
88}
89
90#[cfg(unix)]
91fn ensure_nonblocking_or_terminate(
92 child: &mut std::process::Child,
93 stdout_fd: std::os::unix::io::RawFd,
94 stderr_fd: std::os::unix::io::RawFd,
95) -> io::Result<()> {
96 if let Err(e) = set_nonblocking_fd(stdout_fd) {
97 terminate_child_best_effort(child);
98 return Err(e);
99 }
100
101 if let Err(e) = set_nonblocking_fd(stderr_fd) {
102 terminate_child_best_effort(child);
103 return Err(e);
104 }
105
106 Ok(())
107}
108
109fn wrap_process_output(output: std::process::Output) -> ProcessOutput {
110 ProcessOutput {
111 status: output.status,
112 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
113 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
114 }
115}
116
/// Process executor backed by real OS processes via `std::process::Command`.
///
/// Carries no state; every call builds and spawns a fresh command.
#[derive(Debug, Clone, Default)]
pub struct RealProcessExecutor;

impl RealProcessExecutor {
    /// Creates an executor; identical to `RealProcessExecutor::default()`.
    #[must_use]
    pub const fn new() -> Self {
        RealProcessExecutor
    }
}
130
131impl ProcessExecutor for RealProcessExecutor {
132 fn execute(
133 &self,
134 command: &str,
135 args: &[&str],
136 env: &[(String, String)],
137 workdir: Option<&Path>,
138 ) -> io::Result<ProcessOutput> {
139 let output = build_and_run_command(command, args, env, workdir)?;
140 Ok(wrap_process_output(output))
141 }
142
143 fn spawn(
144 &self,
145 command: &str,
146 args: &[&str],
147 env: &[(String, String)],
148 workdir: Option<&Path>,
149 ) -> io::Result<SpawnedProcess> {
150 let mut cmd = std::process::Command::new(command);
151 cmd.args(args);
152 env.iter().for_each(|(k, v)| {
153 cmd.env(k, v);
154 });
155 if let Some(dir) = workdir {
156 cmd.current_dir(dir);
157 }
158 let mut child = cmd
159 .stdin(std::process::Stdio::piped())
160 .stdout(std::process::Stdio::piped())
161 .stderr(std::process::Stdio::piped())
162 .spawn()?;
163 let stdin = child.stdin.take();
164 Ok(SpawnedProcess {
165 stdin,
166 inner: child,
167 })
168 }
169
170 fn spawn_agent(&self, config: &AgentSpawnConfig) -> io::Result<AgentChildHandle> {
171 let mut cmd = build_agent_command(config);
172 let child = spawn_agent_child(&mut cmd)?;
173 finalize_agent_child(child)
174 }
175}
176
/// Moves the piped stdout and stderr handles out of `child`.
///
/// # Errors
///
/// Returns `"Failed to capture stdout"` if stdout was not piped. In that case stderr is
/// left in place on `child`. Otherwise returns `"Failed to capture stderr"` if stderr
/// was not piped.
fn take_child_streams(
    child: &mut std::process::Child,
) -> io::Result<(std::process::ChildStdout, std::process::ChildStderr)> {
    let Some(stdout) = child.stdout.take() else {
        return Err(io::Error::other("Failed to capture stdout"));
    };
    let Some(stderr) = child.stderr.take() else {
        return Err(io::Error::other("Failed to capture stderr"));
    };
    Ok((stdout, stderr))
}
190
191fn finalize_agent_child(mut child: std::process::Child) -> io::Result<AgentChildHandle> {
192 let (stdout, stderr) = take_child_streams(&mut child)?;
193
194 #[cfg(unix)]
195 {
196 use std::os::unix::io::AsRawFd;
197 ensure_nonblocking_or_terminate(&mut child, stdout.as_raw_fd(), stderr.as_raw_fd())?;
198 }
199
200 #[cfg(not(unix))]
201 let _ = (&child, &stdout, &stderr);
202
203 Ok(AgentChildHandle {
204 stdout: Box::new(stdout),
205 stderr: Box::new(stderr),
206 inner: Box::new(RealAgentChild(child)),
207 })
208}
209
210fn build_agent_command(config: &AgentSpawnConfig) -> std::process::Command {
211 let mut cmd = std::process::Command::new(&config.command);
212 cmd.args(&config.args);
213 config.env.iter().for_each(|(k, v)| {
214 cmd.env(k, v);
215 });
216 cmd.arg(&config.prompt);
217 cmd.env("PYTHONUNBUFFERED", "1");
218 cmd.env("NODE_ENV", "production");
219
220 #[cfg(unix)]
221 unsafe {
222 use std::os::unix::process::CommandExt;
223 cmd.pre_exec(|| {
224 if libc::setpgid(0, 0) != 0 {
225 return Err(io::Error::last_os_error());
226 }
227 Ok(())
228 });
229 }
230
231 cmd.stdin(std::process::Stdio::null());
232 cmd.stdout(std::process::Stdio::piped());
233 cmd.stderr(std::process::Stdio::piped());
234 cmd
235}
236
/// Spawns the already-configured agent command, returning the OS spawn error as-is.
fn spawn_agent_child(cmd: &mut std::process::Command) -> io::Result<std::process::Child> {
    std::process::Command::spawn(cmd)
}
240
/// Runs `command` with `args` and extra `env` vars, optionally inside `workdir`.
///
/// Blocks until the process exits and returns its raw status, stdout and stderr.
fn build_and_run_command(
    command: &str,
    args: &[&str],
    env: &[(String, String)],
    workdir: Option<&Path>,
) -> io::Result<std::process::Output> {
    let mut process = std::process::Command::new(command);
    process
        .args(args)
        .envs(env.iter().map(|(name, value)| (name, value)));
    if let Some(dir) = workdir {
        process.current_dir(dir);
    }
    process.output()
}
257
#[cfg(test)]
mod tests {
    use super::*;

    /// Polls `child` every 10 ms until it exits or `timeout` elapses.
    /// Returns whether it exited.
    #[cfg(unix)]
    fn exited_within(child: &mut std::process::Child, timeout: std::time::Duration) -> bool {
        let deadline = std::time::Instant::now() + timeout;
        while std::time::Instant::now() < deadline {
            if !matches!(child.try_wait(), Ok(None)) {
                return true;
            }
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
        false
    }

    #[test]
    #[cfg(unix)]
    fn ensure_nonblocking_or_terminate_kills_child_on_failure() {
        use std::process::{Command, Stdio};
        use std::time::Duration;

        let mut child = Command::new("sleep")
            .arg("60")
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .expect("spawn sleep");

        // -1 is never a valid descriptor, so the fcntl call is guaranteed to fail.
        let result = ensure_nonblocking_or_terminate(&mut child, -1, -1);
        assert!(result.is_err(), "expected nonblocking setup to fail");

        let exited = exited_within(&mut child, Duration::from_secs(2));
        if !exited {
            // Don't leak the sleeper if the assertion below is about to fail.
            let _ = child.kill();
            let _ = child.wait();
        }

        assert!(
            exited,
            "expected child to be terminated when nonblocking setup fails"
        );
    }
}
299}