ralph_workflow/executor/
real.rs1use super::{AgentChildHandle, AgentSpawnConfig, ProcessExecutor, ProcessOutput, RealAgentChild};
7use std::io;
8use std::path::Path;
9
10#[cfg(unix)]
11fn set_nonblocking_fd(fd: std::os::unix::io::RawFd) -> io::Result<()> {
12 unsafe {
17 let flags = libc::fcntl(fd, libc::F_GETFL);
18 if flags < 0 {
19 return Err(io::Error::last_os_error());
20 }
21 if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
22 return Err(io::Error::last_os_error());
23 }
24 }
25 Ok(())
26}
27
28#[cfg(unix)]
29fn ensure_nonblocking_or_terminate(
30 child: &mut std::process::Child,
31 stdout_fd: std::os::unix::io::RawFd,
32 stderr_fd: std::os::unix::io::RawFd,
33) -> io::Result<()> {
34 fn terminate_child_best_effort(child: &mut std::process::Child) {
35 use std::time::{Duration, Instant};
36
37 let pid = child.id().min(i32::MAX as u32).cast_signed();
38
39 unsafe {
41 let _ = libc::kill(-pid, libc::SIGTERM);
42 let _ = libc::kill(pid, libc::SIGTERM);
43 }
44
45 let term_deadline = Instant::now() + Duration::from_millis(250);
46 while Instant::now() < term_deadline {
47 match child.try_wait() {
48 Ok(Some(_)) | Err(_) => return,
49 Ok(None) => std::thread::sleep(Duration::from_millis(10)),
50 }
51 }
52
53 unsafe {
54 let _ = libc::kill(-pid, libc::SIGKILL);
55 let _ = libc::kill(pid, libc::SIGKILL);
56 }
57
58 let kill_deadline = Instant::now() + Duration::from_millis(500);
59 while Instant::now() < kill_deadline {
60 match child.try_wait() {
61 Ok(Some(_)) | Err(_) => return,
62 Ok(None) => std::thread::sleep(Duration::from_millis(10)),
63 }
64 }
65 }
66
67 if let Err(e) = set_nonblocking_fd(stdout_fd) {
68 terminate_child_best_effort(child);
69 return Err(e);
70 }
71
72 if let Err(e) = set_nonblocking_fd(stderr_fd) {
73 terminate_child_best_effort(child);
74 return Err(e);
75 }
76
77 Ok(())
78}
79
/// [`ProcessExecutor`] that launches real operating-system processes through
/// [`std::process::Command`].
///
/// The type is stateless; every instance behaves identically.
#[derive(Debug, Clone, Default)]
pub struct RealProcessExecutor;

impl RealProcessExecutor {
    /// Creates an executor.
    ///
    /// Because the type carries no state, this is equivalent to
    /// `RealProcessExecutor::default()`, but it is also usable in `const`
    /// contexts.
    #[must_use]
    pub const fn new() -> Self {
        RealProcessExecutor
    }
}
93
94impl ProcessExecutor for RealProcessExecutor {
95 fn execute(
96 &self,
97 command: &str,
98 args: &[&str],
99 env: &[(String, String)],
100 workdir: Option<&Path>,
101 ) -> io::Result<ProcessOutput> {
102 let mut cmd = std::process::Command::new(command);
103 cmd.args(args);
104
105 for (key, value) in env {
106 cmd.env(key, value);
107 }
108
109 if let Some(dir) = workdir {
110 cmd.current_dir(dir);
111 }
112
113 let output = cmd.output()?;
114
115 Ok(ProcessOutput {
116 status: output.status,
117 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
118 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
119 })
120 }
121
122 fn spawn(
123 &self,
124 command: &str,
125 args: &[&str],
126 env: &[(String, String)],
127 workdir: Option<&Path>,
128 ) -> io::Result<std::process::Child> {
129 let mut cmd = std::process::Command::new(command);
130 cmd.args(args);
131
132 for (key, value) in env {
133 cmd.env(key, value);
134 }
135
136 if let Some(dir) = workdir {
137 cmd.current_dir(dir);
138 }
139
140 cmd.stdin(std::process::Stdio::piped())
141 .stdout(std::process::Stdio::piped())
142 .stderr(std::process::Stdio::piped())
143 .spawn()
144 }
145
146 fn spawn_agent(&self, config: &AgentSpawnConfig) -> io::Result<AgentChildHandle> {
147 let mut cmd = std::process::Command::new(&config.command);
148 cmd.args(&config.args);
149
150 for (key, value) in &config.env {
152 cmd.env(key, value);
153 }
154
155 cmd.arg(&config.prompt);
157
158 cmd.env("PYTHONUNBUFFERED", "1");
160 cmd.env("NODE_ENV", "production");
161
162 #[cfg(unix)]
165 unsafe {
166 use std::os::unix::process::CommandExt;
167 cmd.pre_exec(|| {
168 if libc::setpgid(0, 0) != 0 {
169 return Err(io::Error::last_os_error());
170 }
171 Ok(())
172 });
173 }
174
175 let mut child = cmd
177 .stdin(std::process::Stdio::null())
178 .stdout(std::process::Stdio::piped())
179 .stderr(std::process::Stdio::piped())
180 .spawn()?;
181
182 let stdout = child
183 .stdout
184 .take()
185 .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;
186 let stderr = child
187 .stderr
188 .take()
189 .ok_or_else(|| io::Error::other("Failed to capture stderr"))?;
190
191 #[cfg(unix)]
194 {
195 use std::os::unix::io::AsRawFd;
196 ensure_nonblocking_or_terminate(&mut child, stdout.as_raw_fd(), stderr.as_raw_fd())?;
197 }
198
199 Ok(AgentChildHandle {
200 stdout: Box::new(stdout),
201 stderr: Box::new(stderr),
202 inner: Box::new(RealAgentChild(child)),
203 })
204 }
205}
206
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    #[cfg(unix)]
    fn ensure_nonblocking_or_terminate_kills_child_on_failure() {
        use std::process::{Child, Command, Stdio};
        use std::time::{Duration, Instant};

        /// Returns `true` once `child` is no longer reported as running
        /// (exited, or `try_wait` errored), polling every 10 ms until `timeout`.
        fn has_stopped_within(child: &mut Child, timeout: Duration) -> bool {
            let give_up_at = Instant::now() + timeout;
            while Instant::now() < give_up_at {
                if !matches!(child.try_wait(), Ok(None)) {
                    return true;
                }
                std::thread::sleep(Duration::from_millis(10));
            }
            false
        }

        let mut child = Command::new("sleep")
            .arg("60")
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .expect("spawn sleep");

        // -1 is never an open descriptor, so `fcntl` fails and the cleanup path
        // must run.
        let outcome = ensure_nonblocking_or_terminate(&mut child, -1, -1);
        assert!(outcome.is_err(), "expected nonblocking setup to fail");

        let stopped = has_stopped_within(&mut child, Duration::from_secs(2));
        if !stopped {
            // Don't leave a 60-second `sleep` behind when the assertion fails.
            let _ = child.kill();
            let _ = child.wait();
        }

        assert!(
            stopped,
            "expected child to be terminated when nonblocking setup fails"
        );
    }
}
249}