Skip to main content

ralph/runutil/
shell.rs

1//! Managed subprocess execution for non-runner commands.
2//!
3//! Responsibilities:
4//! - Provide one argv-first subprocess service for CI, plugins, git/gh, doctor, and probes.
5//! - Enforce timeout classes, bounded stdout/stderr capture, and signal escalation.
6//! - Offer lightweight cancellation-aware waiting and retry-friendly command metadata.
7//!
8//! Not handled here:
9//! - Runner streaming/json protocol execution (see `crate::runner::execution`).
10//! - Domain-specific error classification for git/gh/CI/plugin failures.
11//!
12//! Invariants/assumptions:
13//! - Direct argv execution is the only supported CI gate execution path.
14//! - Managed subprocesses run with piped stdout/stderr and bounded in-memory capture.
15//! - On unix, managed subprocesses execute in isolated process groups so SIGINT/SIGKILL can
16//!   target the full subtree rather than only the immediate child.
17
18use anyhow::{Result, bail};
19use std::io::Read;
20use std::path::{Path, PathBuf};
21use std::process::{Command, ExitStatus, Output, Stdio};
22use std::sync::atomic::{AtomicBool, Ordering};
23use std::sync::{Arc, Mutex};
24use std::thread;
25use std::time::{Duration, Instant};
26use thiserror::Error;
27
28use crate::constants::{buffers, timeouts};
29
30#[cfg(unix)]
31use std::os::unix::process::CommandExt;
32
/// Structured command execution without implicit shell interpolation.
///
/// The single variant carries an argument vector; `build_argv_command` treats the first
/// element as the program and the remaining elements as its arguments.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum SafeCommand {
    /// Program followed by its arguments.
    Argv { argv: Vec<String> },
}
38
39/// Timeout category for managed subprocesses.
40#[derive(Debug, Clone, Copy, PartialEq, Eq)]
41pub(crate) enum TimeoutClass {
42    Probe,
43    Git,
44    GitHubCli,
45    PluginHook,
46    CiGate,
47}
48
49impl TimeoutClass {
50    pub(crate) fn timeout(self) -> Duration {
51        match self {
52            Self::Probe => timeouts::MANAGED_SUBPROCESS_PROBE_TIMEOUT,
53            Self::Git => timeouts::MANAGED_SUBPROCESS_GIT_TIMEOUT,
54            Self::GitHubCli => timeouts::MANAGED_SUBPROCESS_GH_TIMEOUT,
55            Self::PluginHook => timeouts::MANAGED_SUBPROCESS_PLUGIN_TIMEOUT,
56            Self::CiGate => timeouts::MANAGED_SUBPROCESS_CI_TIMEOUT,
57        }
58    }
59
60    fn capture_limits(self) -> CaptureLimits {
61        match self {
62            Self::CiGate => CaptureLimits {
63                stdout_max_bytes: buffers::MANAGED_SUBPROCESS_CI_CAPTURE_MAX_BYTES,
64                stderr_max_bytes: buffers::MANAGED_SUBPROCESS_CI_CAPTURE_MAX_BYTES,
65            },
66            _ => CaptureLimits {
67                stdout_max_bytes: buffers::MANAGED_SUBPROCESS_CAPTURE_MAX_BYTES,
68                stderr_max_bytes: buffers::MANAGED_SUBPROCESS_CAPTURE_MAX_BYTES,
69            },
70        }
71    }
72}
73
/// Output capture bounds for a managed subprocess.
///
/// Each limit is handed to a `BoundedCapture`, which retains only the most recent bytes up to
/// that limit. A limit of zero retains nothing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct CaptureLimits {
    /// Maximum number of stdout bytes kept in memory.
    pub stdout_max_bytes: usize,
    /// Maximum number of stderr bytes kept in memory.
    pub stderr_max_bytes: usize,
}
80
81/// Configured managed subprocess invocation.
82#[derive(Debug)]
83pub(crate) struct ManagedCommand {
84    command: Command,
85    description: String,
86    timeout_class: TimeoutClass,
87    timeout_override: Option<Duration>,
88    capture_limits: CaptureLimits,
89    cancellation: Option<Arc<AtomicBool>>,
90}
91
92impl ManagedCommand {
93    pub(crate) fn new(
94        command: Command,
95        description: impl Into<String>,
96        timeout_class: TimeoutClass,
97    ) -> Self {
98        Self {
99            command,
100            description: description.into(),
101            timeout_class,
102            timeout_override: None,
103            capture_limits: timeout_class.capture_limits(),
104            cancellation: None,
105        }
106    }
107
108    #[cfg(test)]
109    pub(crate) fn with_timeout(mut self, timeout: Duration) -> Self {
110        self.timeout_override = Some(timeout);
111        self
112    }
113
114    #[allow(dead_code)]
115    pub(crate) fn with_capture_limits(mut self, capture_limits: CaptureLimits) -> Self {
116        self.capture_limits = capture_limits;
117        self
118    }
119
120    #[allow(dead_code)]
121    pub(crate) fn with_cancellation(mut self, cancellation: Arc<AtomicBool>) -> Self {
122        self.cancellation = Some(cancellation);
123        self
124    }
125}
126
/// Managed subprocess outcome with bounded capture metadata.
#[derive(Debug, Clone)]
pub(crate) struct ManagedOutput {
    /// Exit status reported for the child.
    pub status: ExitStatus,
    /// Retained stdout bytes (at most the configured limit).
    pub stdout: Vec<u8>,
    /// Retained stderr bytes (at most the configured limit).
    pub stderr: Vec<u8>,
    /// Whether stdout bytes were discarded to honour the limit.
    pub stdout_truncated: bool,
    /// Whether stderr bytes were discarded to honour the limit.
    pub stderr_truncated: bool,
}

impl ManagedOutput {
    /// Converts into a plain [`Output`], dropping the truncation flags.
    pub(crate) fn into_output(self) -> Output {
        let Self {
            status,
            stdout,
            stderr,
            ..
        } = self;
        Output {
            status,
            stdout,
            stderr,
        }
    }
}
146
/// Failure while executing a managed subprocess.
///
/// The `stdout_tail` / `stderr_tail` fields carry the number of captured bytes that were
/// retained when the subprocess was interrupted, not the bytes themselves.
#[derive(Debug, Error)]
pub(crate) enum ManagedProcessError {
    /// The command could not be spawned.
    #[error("failed to spawn managed subprocess '{description}': {source}")]
    Spawn {
        description: String,
        #[source]
        source: std::io::Error,
    },
    /// The subprocess exceeded its timeout and was interrupted.
    #[error(
        "managed subprocess '{description}' timed out after {timeout:?} (stdout_tail={stdout_tail} bytes, stderr_tail={stderr_tail} bytes)"
    )]
    TimedOut {
        description: String,
        timeout: Duration,
        stdout_tail: usize,
        stderr_tail: usize,
    },
    /// The cancellation flag was set and the subprocess was interrupted.
    #[error(
        "managed subprocess '{description}' was cancelled (stdout_tail={stdout_tail} bytes, stderr_tail={stderr_tail} bytes)"
    )]
    Cancelled {
        description: String,
        stdout_tail: usize,
        stderr_tail: usize,
    },
    /// Waiting on the child failed, or one of its output pipes was unavailable after spawn.
    #[error("failed while waiting for managed subprocess '{description}': {source}")]
    Wait {
        description: String,
        #[source]
        source: std::io::Error,
    },
}
180
/// Why `wait_for_child` interrupted a subprocess before it exited on its own.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TerminationReason {
    /// The elapsed time exceeded the configured timeout.
    Timeout,
    /// The cancellation flag was observed set.
    Cancelled,
}
186
/// In-memory buffer that retains only the most recent `max_bytes` bytes pushed into it.
#[derive(Debug)]
struct BoundedCapture {
    /// Retained tail of everything pushed so far; never longer than `max_bytes`.
    bytes: Vec<u8>,
    /// Upper bound on `bytes.len()`.
    max_bytes: usize,
    /// Set once any pushed byte has been discarded to honour `max_bytes`.
    truncated: bool,
}

impl BoundedCapture {
    /// Creates an empty capture that keeps at most `max_bytes` bytes.
    fn new(max_bytes: usize) -> Self {
        Self {
            bytes: Vec::new(),
            max_bytes,
            truncated: false,
        }
    }

    /// Appends `chunk`, discarding the oldest bytes when the bound would be exceeded.
    ///
    /// `truncated` is set only when bytes are actually dropped: a chunk that exactly fills an
    /// empty buffer is stored in full and is not reported as truncated. With `max_bytes == 0`
    /// every non-empty chunk is dropped entirely.
    fn push(&mut self, chunk: &[u8]) {
        if chunk.is_empty() {
            return;
        }

        // Only the trailing `max_bytes` of the chunk can ever survive.
        let kept = &chunk[chunk.len().saturating_sub(self.max_bytes)..];
        let dropped_from_chunk = chunk.len() - kept.len();

        // `bytes.len() <= max_bytes` and `kept.len() <= max_bytes`, so the overflow never
        // exceeds the number of bytes currently buffered.
        let overflow = (self.bytes.len() + kept.len()).saturating_sub(self.max_bytes);
        if overflow > 0 {
            self.bytes.drain(..overflow);
        }
        if dropped_from_chunk > 0 || overflow > 0 {
            self.truncated = true;
        }
        self.bytes.extend_from_slice(kept);
    }
}
228
229pub(crate) fn execute_safe_command(safe_command: &SafeCommand, cwd: &Path) -> Result<Output> {
230    let (mut command, description) = match safe_command {
231        SafeCommand::Argv { argv } => {
232            let command = build_argv_command(argv)?;
233            (command, argv.join(" "))
234        }
235    };
236
237    command
238        .current_dir(cwd)
239        .stdin(Stdio::null())
240        .stdout(Stdio::piped())
241        .stderr(Stdio::piped());
242
243    execute_managed_command(ManagedCommand::new(
244        command,
245        description,
246        TimeoutClass::CiGate,
247    ))
248    .map(ManagedOutput::into_output)
249    .map_err(Into::into)
250}
251
252pub(crate) fn execute_managed_command(
253    mut managed: ManagedCommand,
254) -> std::result::Result<ManagedOutput, ManagedProcessError> {
255    managed
256        .command
257        .stdin(Stdio::null())
258        .stdout(Stdio::piped())
259        .stderr(Stdio::piped());
260
261    #[cfg(unix)]
262    // SAFETY: `pre_exec` only calls `setpgid(0, 0)` to isolate the child process group.
263    unsafe {
264        managed.command.pre_exec(|| {
265            let _ = libc::setpgid(0, 0);
266            Ok(())
267        });
268    }
269
270    let description = managed.description.clone();
271    let timeout = managed
272        .timeout_override
273        .unwrap_or_else(|| managed.timeout_class.timeout());
274    let mut child = managed
275        .command
276        .spawn()
277        .map_err(|source| ManagedProcessError::Spawn {
278            description: description.clone(),
279            source,
280        })?;
281
282    let stdout = child
283        .stdout
284        .take()
285        .ok_or_else(|| ManagedProcessError::Wait {
286            description: description.clone(),
287            source: std::io::Error::other("child stdout pipe was not available"),
288        })?;
289    let stderr = child
290        .stderr
291        .take()
292        .ok_or_else(|| ManagedProcessError::Wait {
293            description: description.clone(),
294            source: std::io::Error::other("child stderr pipe was not available"),
295        })?;
296
297    let stdout_capture = Arc::new(Mutex::new(BoundedCapture::new(
298        managed.capture_limits.stdout_max_bytes,
299    )));
300    let stderr_capture = Arc::new(Mutex::new(BoundedCapture::new(
301        managed.capture_limits.stderr_max_bytes,
302    )));
303    let stdout_handle = spawn_capture_thread(stdout, Arc::clone(&stdout_capture));
304    let stderr_handle = spawn_capture_thread(stderr, Arc::clone(&stderr_capture));
305
306    let termination = wait_for_child(
307        &mut child,
308        timeout,
309        managed.cancellation.as_ref().map(Arc::clone),
310    )
311    .map_err(|source| ManagedProcessError::Wait {
312        description: description.clone(),
313        source,
314    })?;
315
316    join_capture_thread(stdout_handle);
317    join_capture_thread(stderr_handle);
318
319    let stdout_capture = unwrap_capture(stdout_capture);
320    let stderr_capture = unwrap_capture(stderr_capture);
321
322    if let Some(reason) = termination {
323        return Err(match reason {
324            TerminationReason::Timeout => ManagedProcessError::TimedOut {
325                description,
326                timeout,
327                stdout_tail: stdout_capture.bytes.len(),
328                stderr_tail: stderr_capture.bytes.len(),
329            },
330            TerminationReason::Cancelled => ManagedProcessError::Cancelled {
331                description,
332                stdout_tail: stdout_capture.bytes.len(),
333                stderr_tail: stderr_capture.bytes.len(),
334            },
335        });
336    }
337
338    let status = child.wait().map_err(|source| ManagedProcessError::Wait {
339        description,
340        source,
341    })?;
342
343    Ok(ManagedOutput {
344        status,
345        stdout: stdout_capture.bytes,
346        stderr: stderr_capture.bytes,
347        stdout_truncated: stdout_capture.truncated,
348        stderr_truncated: stderr_capture.truncated,
349    })
350}
351
352pub(crate) fn sleep_with_cancellation(
353    duration: Duration,
354    cancellation: Option<&AtomicBool>,
355) -> std::result::Result<(), RetryWaitError> {
356    let deadline = Instant::now() + duration;
357    while Instant::now() < deadline {
358        if cancellation.is_some_and(|flag| flag.load(Ordering::SeqCst)) {
359            return Err(RetryWaitError::Cancelled);
360        }
361        let remaining = deadline.saturating_duration_since(Instant::now());
362        thread::sleep(remaining.min(timeouts::MANAGED_RETRY_POLL_INTERVAL));
363    }
364    Ok(())
365}
366
/// Failure returned by `sleep_with_cancellation`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
pub(crate) enum RetryWaitError {
    /// The cancellation flag was set before the wait completed.
    #[error("retry wait cancelled")]
    Cancelled,
}
372
373fn build_argv_command(argv: &[String]) -> Result<Command> {
374    let (program, args) = argv
375        .split_first()
376        .ok_or_else(|| anyhow::anyhow!("argv command must include at least one element"))?;
377    if program.trim().is_empty() {
378        bail!("argv command program must be non-empty.");
379    }
380
381    let mut command = Command::new(PathBuf::from(program));
382    command.args(args);
383    Ok(command)
384}
385
386fn spawn_capture_thread(
387    mut reader: impl Read + Send + 'static,
388    capture: Arc<Mutex<BoundedCapture>>,
389) -> thread::JoinHandle<()> {
390    thread::spawn(move || {
391        let mut buf = [0_u8; 8192];
392        loop {
393            match reader.read(&mut buf) {
394                Ok(0) => break,
395                Ok(n) => {
396                    let mut guard = capture
397                        .lock()
398                        .unwrap_or_else(|poisoned| poisoned.into_inner());
399                    guard.push(&buf[..n]);
400                }
401                Err(err) => {
402                    log::debug!(
403                        "managed subprocess reader exiting after read error: {}",
404                        err
405                    );
406                    break;
407                }
408            }
409        }
410    })
411}
412
413fn join_capture_thread(handle: thread::JoinHandle<()>) {
414    if let Err(err) = handle.join() {
415        log::debug!("managed subprocess capture thread panicked: {:?}", err);
416    }
417}
418
419fn unwrap_capture(capture: Arc<Mutex<BoundedCapture>>) -> BoundedCapture {
420    match Arc::try_unwrap(capture) {
421        Ok(mutex) => mutex
422            .into_inner()
423            .unwrap_or_else(|poisoned| poisoned.into_inner()),
424        Err(shared) => {
425            let mut guard = shared
426                .lock()
427                .unwrap_or_else(|poisoned| poisoned.into_inner());
428            BoundedCapture {
429                bytes: std::mem::take(&mut guard.bytes),
430                max_bytes: guard.max_bytes,
431                truncated: guard.truncated,
432            }
433        }
434    }
435}
436
437fn wait_for_child(
438    child: &mut std::process::Child,
439    timeout: Duration,
440    cancellation: Option<Arc<AtomicBool>>,
441) -> std::io::Result<Option<TerminationReason>> {
442    let start = Instant::now();
443    let mut termination: Option<(TerminationReason, Instant)> = None;
444
445    loop {
446        if termination.is_none() {
447            if cancellation
448                .as_ref()
449                .is_some_and(|flag| flag.load(Ordering::SeqCst))
450            {
451                signal_child(child, false);
452                termination = Some((TerminationReason::Cancelled, Instant::now()));
453            } else if start.elapsed() > timeout {
454                signal_child(child, false);
455                termination = Some((TerminationReason::Timeout, Instant::now()));
456            }
457        }
458
459        if let Some((_, interrupted_at)) = termination
460            && interrupted_at.elapsed() > timeouts::MANAGED_SUBPROCESS_INTERRUPT_GRACE
461        {
462            signal_child(child, true);
463            reap_killed_child(child)?;
464        }
465
466        if let Some(status) = child.try_wait()? {
467            if status.success() {
468                return Ok(None);
469            }
470            return Ok(termination.map(|(reason, _)| reason));
471        }
472
473        thread::sleep(timeouts::MANAGED_SUBPROCESS_POLL_INTERVAL);
474    }
475}
476
477fn signal_child(child: &mut std::process::Child, hard_kill: bool) {
478    #[cfg(unix)]
479    {
480        let pid = child.id() as i32;
481        let signal = if hard_kill {
482            libc::SIGKILL
483        } else {
484            libc::SIGINT
485        };
486        // SAFETY: pid is the live child PID returned by std::process::Child. Negative pid
487        // targets the child's process group, which we created in `pre_exec`.
488        unsafe {
489            let _ = libc::kill(-pid, signal);
490        }
491    }
492
493    #[cfg(not(unix))]
494    {
495        let _ = child.kill();
496    }
497}
498
499fn reap_killed_child(child: &mut std::process::Child) -> std::io::Result<()> {
500    let start = Instant::now();
501    while start.elapsed() <= timeouts::MANAGED_SUBPROCESS_REAP_TIMEOUT {
502        if child.try_wait()?.is_some() {
503            return Ok(());
504        }
505        thread::sleep(timeouts::MANAGED_SUBPROCESS_POLL_INTERVAL);
506    }
507    Ok(())
508}
509
#[cfg(test)]
mod tests {
    use super::*;

    /// Only the trailing bytes within the capture limit are kept, and truncation is flagged.
    #[cfg(unix)]
    #[test]
    fn managed_command_bounds_captured_output() {
        let limits = CaptureLimits {
            stdout_max_bytes: 4,
            stderr_max_bytes: 4,
        };
        let mut shell = Command::new("/bin/sh");
        shell.args(["-c", "printf '1234567890abcdef'"]);
        let managed =
            ManagedCommand::new(shell, "capture", TimeoutClass::Probe).with_capture_limits(limits);

        let output = execute_managed_command(managed).expect("managed command should succeed");

        assert_eq!(String::from_utf8_lossy(&output.stdout), "cdef");
        assert!(output.stdout_truncated);
    }

    /// A flag that is already set aborts the wait instead of sleeping the full duration.
    #[cfg(unix)]
    #[test]
    fn sleep_with_cancellation_stops_early() {
        let flag = AtomicBool::new(true);
        let outcome = sleep_with_cancellation(Duration::from_secs(1), Some(&flag));
        assert!(matches!(outcome, Err(RetryWaitError::Cancelled)));
    }

    /// A child that outlives the overridden timeout is reported as timed out.
    #[cfg(unix)]
    #[test]
    fn managed_command_times_out() {
        let mut shell = Command::new("/bin/sh");
        shell.args(["-c", "sleep 5"]);
        let managed = ManagedCommand::new(shell, "timeout", TimeoutClass::Probe)
            .with_timeout(Duration::from_millis(100));

        let failure = execute_managed_command(managed).expect_err("managed command should time out");

        assert!(matches!(failure, ManagedProcessError::TimedOut { .. }));
    }
}