ralph_workflow/executor/
real.rs1use super::{AgentChildHandle, AgentSpawnConfig, ProcessExecutor, ProcessOutput, RealAgentChild};
7use std::io;
8use std::path::Path;
9
10#[cfg(unix)]
11fn set_nonblocking_fd(fd: std::os::unix::io::RawFd) -> io::Result<()> {
12 unsafe {
17 let flags = libc::fcntl(fd, libc::F_GETFL);
18 if flags < 0 {
19 return Err(io::Error::last_os_error());
20 }
21 if libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) < 0 {
22 return Err(io::Error::last_os_error());
23 }
24 }
25 Ok(())
26}
27
28#[cfg(unix)]
29fn ensure_nonblocking_or_terminate(
30 child: &mut std::process::Child,
31 stdout_fd: std::os::unix::io::RawFd,
32 stderr_fd: std::os::unix::io::RawFd,
33) -> io::Result<()> {
34 fn terminate_child_best_effort(child: &mut std::process::Child) {
35 use std::time::{Duration, Instant};
36
37 let pid = child.id() as i32;
38
39 unsafe {
41 let _ = libc::kill(-pid, libc::SIGTERM);
42 let _ = libc::kill(pid, libc::SIGTERM);
43 }
44
45 let term_deadline = Instant::now() + Duration::from_millis(250);
46 while Instant::now() < term_deadline {
47 match child.try_wait() {
48 Ok(Some(_)) | Err(_) => return,
49 Ok(None) => std::thread::sleep(Duration::from_millis(10)),
50 }
51 }
52
53 unsafe {
54 let _ = libc::kill(-pid, libc::SIGKILL);
55 let _ = libc::kill(pid, libc::SIGKILL);
56 }
57
58 let kill_deadline = Instant::now() + Duration::from_millis(500);
59 while Instant::now() < kill_deadline {
60 match child.try_wait() {
61 Ok(Some(_)) | Err(_) => return,
62 Ok(None) => std::thread::sleep(Duration::from_millis(10)),
63 }
64 }
65 }
66
67 if let Err(e) = set_nonblocking_fd(stdout_fd) {
68 terminate_child_best_effort(child);
69 return Err(e);
70 }
71
72 if let Err(e) = set_nonblocking_fd(stderr_fd) {
73 terminate_child_best_effort(child);
74 return Err(e);
75 }
76
77 Ok(())
78}
79
/// Stateless unit type that carries the `ProcessExecutor` implementation defined in this file.
#[derive(Debug, Clone, Default)]
pub struct RealProcessExecutor;

impl RealProcessExecutor {
    /// Creates a new executor; equivalent to `RealProcessExecutor::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
92
93impl ProcessExecutor for RealProcessExecutor {
94 fn execute(
95 &self,
96 command: &str,
97 args: &[&str],
98 env: &[(String, String)],
99 workdir: Option<&Path>,
100 ) -> io::Result<ProcessOutput> {
101 let mut cmd = std::process::Command::new(command);
102 cmd.args(args);
103
104 for (key, value) in env {
105 cmd.env(key, value);
106 }
107
108 if let Some(dir) = workdir {
109 cmd.current_dir(dir);
110 }
111
112 let output = cmd.output()?;
113
114 Ok(ProcessOutput {
115 status: output.status,
116 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
117 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
118 })
119 }
120
121 fn spawn(
122 &self,
123 command: &str,
124 args: &[&str],
125 env: &[(String, String)],
126 workdir: Option<&Path>,
127 ) -> io::Result<std::process::Child> {
128 let mut cmd = std::process::Command::new(command);
129 cmd.args(args);
130
131 for (key, value) in env {
132 cmd.env(key, value);
133 }
134
135 if let Some(dir) = workdir {
136 cmd.current_dir(dir);
137 }
138
139 cmd.stdin(std::process::Stdio::piped())
140 .stdout(std::process::Stdio::piped())
141 .stderr(std::process::Stdio::piped())
142 .spawn()
143 }
144
145 fn spawn_agent(&self, config: &AgentSpawnConfig) -> io::Result<AgentChildHandle> {
146 let mut cmd = std::process::Command::new(&config.command);
147 cmd.args(&config.args);
148
149 for (key, value) in &config.env {
151 cmd.env(key, value);
152 }
153
154 cmd.arg(&config.prompt);
156
157 cmd.env("PYTHONUNBUFFERED", "1");
159 cmd.env("NODE_ENV", "production");
160
161 #[cfg(unix)]
164 unsafe {
165 use std::os::unix::process::CommandExt;
166 cmd.pre_exec(|| {
167 if libc::setpgid(0, 0) != 0 {
168 return Err(io::Error::last_os_error());
169 }
170 Ok(())
171 });
172 }
173
174 let mut child = cmd
176 .stdin(std::process::Stdio::null())
177 .stdout(std::process::Stdio::piped())
178 .stderr(std::process::Stdio::piped())
179 .spawn()?;
180
181 let stdout = child
182 .stdout
183 .take()
184 .ok_or_else(|| io::Error::other("Failed to capture stdout"))?;
185 let stderr = child
186 .stderr
187 .take()
188 .ok_or_else(|| io::Error::other("Failed to capture stderr"))?;
189
190 #[cfg(unix)]
193 {
194 use std::os::unix::io::AsRawFd;
195 ensure_nonblocking_or_terminate(&mut child, stdout.as_raw_fd(), stderr.as_raw_fd())?;
196 }
197
198 Ok(AgentChildHandle {
199 stdout: Box::new(stdout),
200 stderr: Box::new(stderr),
201 inner: Box::new(RealAgentChild(child)),
202 })
203 }
204}
205
#[cfg(test)]
mod tests {
    use super::*;

    /// When the descriptors cannot be made non-blocking, the helper must report the
    /// error and the child must be gone shortly afterwards.
    #[test]
    #[cfg(unix)]
    fn ensure_nonblocking_or_terminate_kills_child_on_failure() {
        use std::process::{Command, Stdio};
        use std::time::{Duration, Instant};

        let mut child = Command::new("sleep")
            .arg("60")
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()
            .expect("spawn sleep");

        // -1 is not an open descriptor, so the very first fcntl call fails.
        let outcome = ensure_nonblocking_or_terminate(&mut child, -1, -1);
        assert!(outcome.is_err(), "expected nonblocking setup to fail");

        // Allow up to two seconds for the child to be observed as exited; a `try_wait`
        // error is also treated as "no longer running".
        let deadline = Instant::now() + Duration::from_secs(2);
        let exited = loop {
            if Instant::now() >= deadline {
                break false;
            }
            match child.try_wait() {
                Ok(None) => std::thread::sleep(Duration::from_millis(10)),
                Ok(Some(_)) | Err(_) => break true,
            }
        };

        if !exited {
            // Do not leave a 60-second `sleep` behind when the assertion below fails.
            let _ = child.kill();
            let _ = child.wait();
        }

        assert!(
            exited,
            "expected child to be terminated when nonblocking setup fails"
        );
    }
}