1use anyhow::{Result, bail};
19use std::io::Read;
20use std::path::{Path, PathBuf};
21use std::process::{Command, ExitStatus, Output, Stdio};
22use std::sync::atomic::{AtomicBool, Ordering};
23use std::sync::{Arc, Mutex};
24use std::thread;
25use std::time::{Duration, Instant};
26use thiserror::Error;
27
28use crate::constants::{buffers, timeouts};
29
30#[cfg(unix)]
31use std::os::unix::process::CommandExt;
32
/// A command expressed as a plain argument vector.
///
/// `argv[0]` names the program and the remaining elements are handed to it verbatim via
/// `Command::args`; no shell is involved in building the command.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum SafeCommand {
    /// Program followed by its arguments.
    Argv { argv: Vec<String> },
}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq)]
41pub(crate) enum TimeoutClass {
42 Probe,
43 Git,
44 GitHubCli,
45 PluginHook,
46 CiGate,
47}
48
49impl TimeoutClass {
50 pub(crate) fn timeout(self) -> Duration {
51 match self {
52 Self::Probe => timeouts::MANAGED_SUBPROCESS_PROBE_TIMEOUT,
53 Self::Git => timeouts::MANAGED_SUBPROCESS_GIT_TIMEOUT,
54 Self::GitHubCli => timeouts::MANAGED_SUBPROCESS_GH_TIMEOUT,
55 Self::PluginHook => timeouts::MANAGED_SUBPROCESS_PLUGIN_TIMEOUT,
56 Self::CiGate => timeouts::MANAGED_SUBPROCESS_CI_TIMEOUT,
57 }
58 }
59
60 fn capture_limits(self) -> CaptureLimits {
61 match self {
62 Self::CiGate => CaptureLimits {
63 stdout_max_bytes: buffers::MANAGED_SUBPROCESS_CI_CAPTURE_MAX_BYTES,
64 stderr_max_bytes: buffers::MANAGED_SUBPROCESS_CI_CAPTURE_MAX_BYTES,
65 },
66 _ => CaptureLimits {
67 stdout_max_bytes: buffers::MANAGED_SUBPROCESS_CAPTURE_MAX_BYTES,
68 stderr_max_bytes: buffers::MANAGED_SUBPROCESS_CAPTURE_MAX_BYTES,
69 },
70 }
71 }
72}
73
/// Per-stream byte budgets for captured subprocess output.
///
/// Each stream keeps at most this many of its most recent bytes.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct CaptureLimits {
    /// Maximum number of stdout bytes retained.
    pub stdout_max_bytes: usize,
    /// Maximum number of stderr bytes retained.
    pub stderr_max_bytes: usize,
}
80
81#[derive(Debug)]
83pub(crate) struct ManagedCommand {
84 command: Command,
85 description: String,
86 timeout_class: TimeoutClass,
87 timeout_override: Option<Duration>,
88 capture_limits: CaptureLimits,
89 cancellation: Option<Arc<AtomicBool>>,
90}
91
92impl ManagedCommand {
93 pub(crate) fn new(
94 command: Command,
95 description: impl Into<String>,
96 timeout_class: TimeoutClass,
97 ) -> Self {
98 Self {
99 command,
100 description: description.into(),
101 timeout_class,
102 timeout_override: None,
103 capture_limits: timeout_class.capture_limits(),
104 cancellation: None,
105 }
106 }
107
108 #[cfg(test)]
109 pub(crate) fn with_timeout(mut self, timeout: Duration) -> Self {
110 self.timeout_override = Some(timeout);
111 self
112 }
113
114 #[allow(dead_code)]
115 pub(crate) fn with_capture_limits(mut self, capture_limits: CaptureLimits) -> Self {
116 self.capture_limits = capture_limits;
117 self
118 }
119
120 #[allow(dead_code)]
121 pub(crate) fn with_cancellation(mut self, cancellation: Arc<AtomicBool>) -> Self {
122 self.cancellation = Some(cancellation);
123 self
124 }
125}
126
/// Result of a managed subprocess that ran to completion.
#[derive(Clone, Debug)]
pub(crate) struct ManagedOutput {
    /// Exit status reported by the OS.
    pub status: ExitStatus,
    /// Retained tail of stdout.
    pub stdout: Vec<u8>,
    /// Retained tail of stderr.
    pub stderr: Vec<u8>,
    /// `true` if older stdout bytes were dropped to honour the capture limit.
    pub stdout_truncated: bool,
    /// `true` if older stderr bytes were dropped to honour the capture limit.
    pub stderr_truncated: bool,
}

impl ManagedOutput {
    /// Converts into a plain `std::process::Output`, discarding the truncation flags.
    pub(crate) fn into_output(self) -> Output {
        let Self {
            status,
            stdout,
            stderr,
            ..
        } = self;
        Output {
            status,
            stdout,
            stderr,
        }
    }
}
146
147#[derive(Debug, Error)]
149pub(crate) enum ManagedProcessError {
150 #[error("failed to spawn managed subprocess '{description}': {source}")]
151 Spawn {
152 description: String,
153 #[source]
154 source: std::io::Error,
155 },
156 #[error(
157 "managed subprocess '{description}' timed out after {timeout:?} (stdout_tail={stdout_tail} bytes, stderr_tail={stderr_tail} bytes)"
158 )]
159 TimedOut {
160 description: String,
161 timeout: Duration,
162 stdout_tail: usize,
163 stderr_tail: usize,
164 },
165 #[error(
166 "managed subprocess '{description}' was cancelled (stdout_tail={stdout_tail} bytes, stderr_tail={stderr_tail} bytes)"
167 )]
168 Cancelled {
169 description: String,
170 stdout_tail: usize,
171 stderr_tail: usize,
172 },
173 #[error("failed while waiting for managed subprocess '{description}': {source}")]
174 Wait {
175 description: String,
176 #[source]
177 source: std::io::Error,
178 },
179}
180
/// Why the supervisor interrupted a running child.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TerminationReason {
    /// The wall-clock timeout elapsed.
    Timeout,
    /// The caller's cancellation flag was set.
    Cancelled,
}
186
/// Tail buffer that keeps only the most recent `max_bytes` bytes of a stream.
#[derive(Debug)]
struct BoundedCapture {
    /// Retained tail of the stream; never longer than `max_bytes`.
    bytes: Vec<u8>,
    /// Maximum number of bytes retained.
    max_bytes: usize,
    /// Set once at least one byte has been discarded to honour `max_bytes`.
    truncated: bool,
}

impl BoundedCapture {
    /// Creates an empty buffer that retains at most `max_bytes` bytes.
    fn new(max_bytes: usize) -> Self {
        Self {
            bytes: Vec::new(),
            max_bytes,
            truncated: false,
        }
    }

    /// Appends `chunk`, evicting the oldest bytes so that at most `max_bytes` remain.
    ///
    /// `truncated` is set only when bytes are actually discarded. Output that exactly fills
    /// the limit is kept whole and is not flagged.
    fn push(&mut self, chunk: &[u8]) {
        if chunk.is_empty() {
            return;
        }

        let combined_len = self.bytes.len() + chunk.len();
        if combined_len <= self.max_bytes {
            self.bytes.extend_from_slice(chunk);
            return;
        }

        // Something must be dropped from here on.
        self.truncated = true;
        if chunk.len() >= self.max_bytes {
            // The chunk alone fills the budget (this also covers `max_bytes == 0`): keep only
            // its tail and discard everything older.
            self.bytes.clear();
            self.bytes
                .extend_from_slice(&chunk[chunk.len() - self.max_bytes..]);
        } else {
            // `combined_len - max_bytes < bytes.len()` here because `chunk.len() < max_bytes`.
            self.bytes.drain(..combined_len - self.max_bytes);
            self.bytes.extend_from_slice(chunk);
        }
    }
}
228
229pub(crate) fn execute_safe_command(safe_command: &SafeCommand, cwd: &Path) -> Result<Output> {
230 let (mut command, description) = match safe_command {
231 SafeCommand::Argv { argv } => {
232 let command = build_argv_command(argv)?;
233 (command, argv.join(" "))
234 }
235 };
236
237 command
238 .current_dir(cwd)
239 .stdin(Stdio::null())
240 .stdout(Stdio::piped())
241 .stderr(Stdio::piped());
242
243 execute_managed_command(ManagedCommand::new(
244 command,
245 description,
246 TimeoutClass::CiGate,
247 ))
248 .map(ManagedOutput::into_output)
249 .map_err(Into::into)
250}
251
252pub(crate) fn execute_managed_command(
253 mut managed: ManagedCommand,
254) -> std::result::Result<ManagedOutput, ManagedProcessError> {
255 managed
256 .command
257 .stdin(Stdio::null())
258 .stdout(Stdio::piped())
259 .stderr(Stdio::piped());
260
261 #[cfg(unix)]
262 unsafe {
264 managed.command.pre_exec(|| {
265 let _ = libc::setpgid(0, 0);
266 Ok(())
267 });
268 }
269
270 let description = managed.description.clone();
271 let timeout = managed
272 .timeout_override
273 .unwrap_or_else(|| managed.timeout_class.timeout());
274 let mut child = managed
275 .command
276 .spawn()
277 .map_err(|source| ManagedProcessError::Spawn {
278 description: description.clone(),
279 source,
280 })?;
281
282 let stdout = child
283 .stdout
284 .take()
285 .ok_or_else(|| ManagedProcessError::Wait {
286 description: description.clone(),
287 source: std::io::Error::other("child stdout pipe was not available"),
288 })?;
289 let stderr = child
290 .stderr
291 .take()
292 .ok_or_else(|| ManagedProcessError::Wait {
293 description: description.clone(),
294 source: std::io::Error::other("child stderr pipe was not available"),
295 })?;
296
297 let stdout_capture = Arc::new(Mutex::new(BoundedCapture::new(
298 managed.capture_limits.stdout_max_bytes,
299 )));
300 let stderr_capture = Arc::new(Mutex::new(BoundedCapture::new(
301 managed.capture_limits.stderr_max_bytes,
302 )));
303 let stdout_handle = spawn_capture_thread(stdout, Arc::clone(&stdout_capture));
304 let stderr_handle = spawn_capture_thread(stderr, Arc::clone(&stderr_capture));
305
306 let termination = wait_for_child(
307 &mut child,
308 timeout,
309 managed.cancellation.as_ref().map(Arc::clone),
310 )
311 .map_err(|source| ManagedProcessError::Wait {
312 description: description.clone(),
313 source,
314 })?;
315
316 join_capture_thread(stdout_handle);
317 join_capture_thread(stderr_handle);
318
319 let stdout_capture = unwrap_capture(stdout_capture);
320 let stderr_capture = unwrap_capture(stderr_capture);
321
322 if let Some(reason) = termination {
323 return Err(match reason {
324 TerminationReason::Timeout => ManagedProcessError::TimedOut {
325 description,
326 timeout,
327 stdout_tail: stdout_capture.bytes.len(),
328 stderr_tail: stderr_capture.bytes.len(),
329 },
330 TerminationReason::Cancelled => ManagedProcessError::Cancelled {
331 description,
332 stdout_tail: stdout_capture.bytes.len(),
333 stderr_tail: stderr_capture.bytes.len(),
334 },
335 });
336 }
337
338 let status = child.wait().map_err(|source| ManagedProcessError::Wait {
339 description,
340 source,
341 })?;
342
343 Ok(ManagedOutput {
344 status,
345 stdout: stdout_capture.bytes,
346 stderr: stderr_capture.bytes,
347 stdout_truncated: stdout_capture.truncated,
348 stderr_truncated: stderr_capture.truncated,
349 })
350}
351
352pub(crate) fn sleep_with_cancellation(
353 duration: Duration,
354 cancellation: Option<&AtomicBool>,
355) -> std::result::Result<(), RetryWaitError> {
356 let deadline = Instant::now() + duration;
357 while Instant::now() < deadline {
358 if cancellation.is_some_and(|flag| flag.load(Ordering::SeqCst)) {
359 return Err(RetryWaitError::Cancelled);
360 }
361 let remaining = deadline.saturating_duration_since(Instant::now());
362 thread::sleep(remaining.min(timeouts::MANAGED_RETRY_POLL_INTERVAL));
363 }
364 Ok(())
365}
366
367#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
368pub(crate) enum RetryWaitError {
369 #[error("retry wait cancelled")]
370 Cancelled,
371}
372
373fn build_argv_command(argv: &[String]) -> Result<Command> {
374 let (program, args) = argv
375 .split_first()
376 .ok_or_else(|| anyhow::anyhow!("argv command must include at least one element"))?;
377 if program.trim().is_empty() {
378 bail!("argv command program must be non-empty.");
379 }
380
381 let mut command = Command::new(PathBuf::from(program));
382 command.args(args);
383 Ok(command)
384}
385
386fn spawn_capture_thread(
387 mut reader: impl Read + Send + 'static,
388 capture: Arc<Mutex<BoundedCapture>>,
389) -> thread::JoinHandle<()> {
390 thread::spawn(move || {
391 let mut buf = [0_u8; 8192];
392 loop {
393 match reader.read(&mut buf) {
394 Ok(0) => break,
395 Ok(n) => {
396 let mut guard = capture
397 .lock()
398 .unwrap_or_else(|poisoned| poisoned.into_inner());
399 guard.push(&buf[..n]);
400 }
401 Err(err) => {
402 log::debug!(
403 "managed subprocess reader exiting after read error: {}",
404 err
405 );
406 break;
407 }
408 }
409 }
410 })
411}
412
413fn join_capture_thread(handle: thread::JoinHandle<()>) {
414 if let Err(err) = handle.join() {
415 log::debug!("managed subprocess capture thread panicked: {:?}", err);
416 }
417}
418
419fn unwrap_capture(capture: Arc<Mutex<BoundedCapture>>) -> BoundedCapture {
420 match Arc::try_unwrap(capture) {
421 Ok(mutex) => mutex
422 .into_inner()
423 .unwrap_or_else(|poisoned| poisoned.into_inner()),
424 Err(shared) => {
425 let mut guard = shared
426 .lock()
427 .unwrap_or_else(|poisoned| poisoned.into_inner());
428 BoundedCapture {
429 bytes: std::mem::take(&mut guard.bytes),
430 max_bytes: guard.max_bytes,
431 truncated: guard.truncated,
432 }
433 }
434 }
435}
436
437fn wait_for_child(
438 child: &mut std::process::Child,
439 timeout: Duration,
440 cancellation: Option<Arc<AtomicBool>>,
441) -> std::io::Result<Option<TerminationReason>> {
442 let start = Instant::now();
443 let mut termination: Option<(TerminationReason, Instant)> = None;
444
445 loop {
446 if termination.is_none() {
447 if cancellation
448 .as_ref()
449 .is_some_and(|flag| flag.load(Ordering::SeqCst))
450 {
451 signal_child(child, false);
452 termination = Some((TerminationReason::Cancelled, Instant::now()));
453 } else if start.elapsed() > timeout {
454 signal_child(child, false);
455 termination = Some((TerminationReason::Timeout, Instant::now()));
456 }
457 }
458
459 if let Some((_, interrupted_at)) = termination
460 && interrupted_at.elapsed() > timeouts::MANAGED_SUBPROCESS_INTERRUPT_GRACE
461 {
462 signal_child(child, true);
463 reap_killed_child(child)?;
464 }
465
466 if let Some(status) = child.try_wait()? {
467 if status.success() {
468 return Ok(None);
469 }
470 return Ok(termination.map(|(reason, _)| reason));
471 }
472
473 thread::sleep(timeouts::MANAGED_SUBPROCESS_POLL_INTERVAL);
474 }
475}
476
477fn signal_child(child: &mut std::process::Child, hard_kill: bool) {
478 #[cfg(unix)]
479 {
480 let pid = child.id() as i32;
481 let signal = if hard_kill {
482 libc::SIGKILL
483 } else {
484 libc::SIGINT
485 };
486 unsafe {
489 let _ = libc::kill(-pid, signal);
490 }
491 }
492
493 #[cfg(not(unix))]
494 {
495 let _ = child.kill();
496 }
497}
498
499fn reap_killed_child(child: &mut std::process::Child) -> std::io::Result<()> {
500 let start = Instant::now();
501 while start.elapsed() <= timeouts::MANAGED_SUBPROCESS_REAP_TIMEOUT {
502 if child.try_wait()?.is_some() {
503 return Ok(());
504 }
505 thread::sleep(timeouts::MANAGED_SUBPROCESS_POLL_INTERVAL);
506 }
507 Ok(())
508}
509
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `/bin/sh -c <script>` command.
    #[cfg(unix)]
    fn shell(script: &str) -> Command {
        let mut command = Command::new("/bin/sh");
        command.arg("-c").arg(script);
        command
    }

    #[cfg(unix)]
    #[test]
    fn managed_command_bounds_captured_output() {
        let output = execute_managed_command(
            ManagedCommand::new(
                shell("printf '1234567890abcdef'"),
                "capture",
                TimeoutClass::Probe,
            )
            .with_capture_limits(CaptureLimits {
                stdout_max_bytes: 4,
                stderr_max_bytes: 4,
            }),
        )
        .expect("managed command should succeed");

        assert_eq!(String::from_utf8_lossy(&output.stdout), "cdef");
        assert!(output.stdout_truncated);
    }

    #[cfg(unix)]
    #[test]
    fn sleep_with_cancellation_stops_early() {
        let cancelled = AtomicBool::new(true);
        let result = sleep_with_cancellation(Duration::from_secs(1), Some(&cancelled));
        assert!(matches!(result, Err(RetryWaitError::Cancelled)));
    }

    #[cfg(unix)]
    #[test]
    fn managed_command_times_out() {
        let err = execute_managed_command(
            ManagedCommand::new(shell("sleep 5"), "timeout", TimeoutClass::Probe)
                .with_timeout(Duration::from_millis(100)),
        )
        .expect_err("managed command should time out");

        assert!(matches!(err, ManagedProcessError::TimedOut { .. }));
    }

    #[cfg(unix)]
    #[test]
    fn managed_command_honours_cancellation() {
        let err = execute_managed_command(
            ManagedCommand::new(shell("sleep 5"), "cancel", TimeoutClass::Probe)
                .with_cancellation(Arc::new(AtomicBool::new(true))),
        )
        .expect_err("managed command should be cancelled");

        assert!(matches!(err, ManagedProcessError::Cancelled { .. }));
    }

    #[test]
    fn build_argv_command_validates_program() {
        let empty: [String; 0] = [];
        assert!(build_argv_command(&empty).is_err());
        assert!(build_argv_command(&["   ".to_string()]).is_err());

        let command = build_argv_command(&["echo".to_string(), "hi".to_string()])
            .expect("valid argv should build");
        assert_eq!(command.get_program(), "echo");
        assert_eq!(command.get_args().collect::<Vec<_>>(), ["hi"]);
    }

    #[cfg(unix)]
    #[test]
    fn execute_safe_command_runs_in_cwd() {
        let output = execute_safe_command(
            &SafeCommand::Argv {
                argv: vec!["/bin/sh".into(), "-c".into(), "pwd".into()],
            },
            Path::new("/"),
        )
        .expect("safe command should succeed");

        assert!(output.status.success());
        assert_eq!(output.stdout, b"/\n");
    }
}