Skip to main content

haz_exec/
std_impl.rs

1//! Production [`ProcessSpawner`] backend over [`tokio::process::Command`].
2//!
3//! Pipes are attached to stdout and stderr, stdin is closed, and
4//! the child is configured to receive `SIGKILL` on drop so a
5//! panicking executor task does not leak processes. On Unix the
6//! child is also placed in its own process group at spawn time so
7//! the cancellation flow (`EXEC-013`, `EXEC-014`) can signal the
8//! whole group with `kill(-pgid, sig)` and reach any subprocesses
9//! the task itself spawned. On Windows there is no equivalent
10//! process-group primitive: only [`Signal::Kill`] is implemented
11//! (via [`tokio::process::Child::start_kill`]), and SIGTERM /
12//! SIGINT surface [`std::io::ErrorKind::Unsupported`].
13
14use std::io::{Error as IoError, ErrorKind};
15use std::process::Stdio;
16
17use snafu::ResultExt;
18use tokio::process::{Child, ChildStderr, ChildStdout, Command};
19
20use crate::process::{
21    ExitStatus, Process, ProcessError, ProcessId, ProcessSpawner, Signal, SpawnFailedSnafu,
22    SpawnPlan, Spawned, WaitFailedSnafu,
23};
24
25/// Production [`ProcessSpawner`] over [`tokio::process::Command`].
26///
27/// The type holds no state and is freely cloneable; instantiate it
28/// with [`Self::new`] (or [`Default::default`]) and pass it as a
29/// generic parameter to the executor.
30#[derive(Debug, Default, Clone, Copy)]
31pub struct StdProcessSpawner;
32
33impl StdProcessSpawner {
34    /// Construct a new spawner.
35    #[must_use]
36    pub fn new() -> Self {
37        Self
38    }
39}
40
41impl ProcessSpawner for StdProcessSpawner {
42    type Process = StdProcess;
43
44    async fn spawn(&self, plan: &SpawnPlan) -> Result<Spawned<Self::Process>, ProcessError> {
45        if !plan.cwd.is_absolute() {
46            return Err(ProcessError::NonAbsoluteCwd {
47                cwd: plan.cwd.clone(),
48            });
49        }
50
51        let mut command = Command::new(&plan.program);
52        command.args(&plan.args);
53        command.current_dir(&plan.cwd);
54        command.env_clear();
55        for (key, value) in &plan.env {
56            command.env(key, value);
57        }
58        command.stdin(Stdio::null());
59        command.stdout(Stdio::piped());
60        command.stderr(Stdio::piped());
61        command.kill_on_drop(true);
62        #[cfg(unix)]
63        {
64            // Place the child in its own process group so the
65            // cancellation flow can `kill(-pgid, sig)` and reach
66            // any subprocesses the task itself spawned
67            // (`EXEC-013` step 2, `EXEC-014`).
68            command.process_group(0);
69        }
70
71        let mut child = command.spawn().with_context(|_| SpawnFailedSnafu {
72            program: plan.program.clone(),
73        })?;
74
75        let stdout = child
76            .stdout
77            .take()
78            .expect("stdout was configured as a pipe");
79        let stderr = child
80            .stderr
81            .take()
82            .expect("stderr was configured as a pipe");
83
84        Ok(Spawned {
85            process: StdProcess { child },
86            stdout,
87            stderr,
88        })
89    }
90}
91
92/// Production [`Process`] wrapping [`tokio::process::Child`].
93#[derive(Debug)]
94pub struct StdProcess {
95    child: Child,
96}
97
98impl Process for StdProcess {
99    type Stdout = ChildStdout;
100    type Stderr = ChildStderr;
101
102    fn id(&self) -> Option<ProcessId> {
103        self.child.id().map(ProcessId)
104    }
105
106    fn send_signal(&mut self, signal: Signal) -> Result<(), ProcessError> {
107        let pid = self.id();
108        #[cfg(unix)]
109        {
110            deliver_unix_signal(&mut self.child, signal, pid)
111        }
112        #[cfg(not(unix))]
113        {
114            deliver_non_unix_signal(&mut self.child, signal, pid)
115        }
116    }
117
118    async fn wait(&mut self) -> Result<ExitStatus, ProcessError> {
119        let pid = self.id();
120        self.child
121            .wait()
122            .await
123            .with_context(|_| WaitFailedSnafu { pid })
124    }
125}
126
127/// Deliver `signal` to the spawned child's process group on Unix.
128///
129/// All three [`Signal`] variants route through
130/// `nix::sys::signal::kill` against `-pgid` so any subprocesses the
131/// task itself forked also receive the signal (`EXEC-013` step 2,
132/// `EXEC-014`). The child was placed in its own process group at
133/// spawn time, so the group id equals the child's pid.
134#[cfg(unix)]
135fn deliver_unix_signal(
136    child: &mut Child,
137    signal: Signal,
138    pid: Option<ProcessId>,
139) -> Result<(), ProcessError> {
140    use nix::sys::signal::{Signal as NixSignal, kill as nix_kill};
141    use nix::unistd::Pid;
142
143    let Some(ProcessId(raw_pid)) = pid else {
144        return Err(ProcessError::SignalFailed {
145            signal,
146            pid: None,
147            source: IoError::new(ErrorKind::NotFound, "child has already been reaped"),
148        });
149    };
150
151    let signed_pid = i32::try_from(raw_pid).map_err(|_| ProcessError::SignalFailed {
152        signal,
153        pid,
154        source: IoError::other("pid exceeds i32 range"),
155    })?;
156    let group_target = Pid::from_raw(-signed_pid);
157
158    let nix_sig = match signal {
159        Signal::Terminate => NixSignal::SIGTERM,
160        Signal::Interrupt => NixSignal::SIGINT,
161        Signal::Kill => NixSignal::SIGKILL,
162    };
163
164    let send_result = nix_kill(group_target, nix_sig).map_err(|errno| ProcessError::SignalFailed {
165        signal,
166        pid,
167        source: IoError::from_raw_os_error(errno as i32),
168    });
169
170    // SIGKILL also needs the kill_on_drop bookkeeping in
171    // tokio so a later drop does not race against the kernel
172    // having already reaped the child.
173    if signal == Signal::Kill && send_result.is_ok() {
174        let _ = child.start_kill();
175    }
176
177    send_result
178}
179
180/// Deliver `signal` to the spawned child on non-Unix hosts.
181///
182/// Only [`Signal::Kill`] is supported; SIGTERM / SIGINT surface
183/// [`std::io::ErrorKind::Unsupported`]. The per-future grace
184/// dance in `run_fresh` swallows the SIGTERM error and proceeds
185/// straight to the grace timer (then SIGKILL), so Windows hosts
186/// still terminate cancelled children, just without any clean-up
187/// signal.
188#[cfg(not(unix))]
189fn deliver_non_unix_signal(
190    child: &mut Child,
191    signal: Signal,
192    pid: Option<ProcessId>,
193) -> Result<(), ProcessError> {
194    match signal {
195        Signal::Kill => child
196            .start_kill()
197            .map_err(|source| ProcessError::SignalFailed {
198                signal,
199                pid,
200                source,
201            }),
202        Signal::Interrupt | Signal::Terminate => Err(ProcessError::SignalFailed {
203            signal,
204            pid,
205            source: IoError::new(
206                ErrorKind::Unsupported,
207                "SIGTERM and SIGINT are Unix-only in haz",
208            ),
209        }),
210    }
211}
212
213#[cfg(all(test, unix))]
214mod unix_tests {
215    use std::ffi::OsString;
216    use std::os::unix::process::ExitStatusExt;
217
218    use tokio::io::AsyncReadExt;
219
220    use super::{StdProcessSpawner, *};
221
222    fn temp_cwd() -> std::path::PathBuf {
223        std::env::temp_dir()
224    }
225
226    fn plan(program: &str, args: &[&str]) -> SpawnPlan {
227        SpawnPlan {
228            program: OsString::from(program),
229            args: args.iter().map(OsString::from).collect(),
230            env: Vec::new(),
231            cwd: temp_cwd(),
232        }
233    }
234
235    #[tokio::test]
236    async fn spawn_echo_yields_stdout_and_zero_exit() {
237        let spawner = StdProcessSpawner::new();
238        let mut child = spawner
239            .spawn(&plan("/bin/echo", &["hello"]))
240            .await
241            .expect("echo should spawn");
242
243        let mut stdout_bytes = Vec::new();
244        child
245            .stdout
246            .read_to_end(&mut stdout_bytes)
247            .await
248            .expect("stdout drain should succeed");
249        let mut stderr_bytes = Vec::new();
250        child
251            .stderr
252            .read_to_end(&mut stderr_bytes)
253            .await
254            .expect("stderr drain should succeed");
255
256        let status = child.process.wait().await.expect("wait should succeed");
257
258        assert_eq!(stdout_bytes, b"hello\n");
259        assert!(stderr_bytes.is_empty());
260        assert!(status.success());
261        assert_eq!(status.code(), Some(0));
262    }
263
264    #[tokio::test]
265    async fn spawn_false_yields_nonzero_exit() {
266        let spawner = StdProcessSpawner::new();
267        let mut child = spawner
268            .spawn(&plan("/bin/sh", &["-c", "exit 7"]))
269            .await
270            .expect("sh should spawn");
271
272        let status = child.process.wait().await.expect("wait should succeed");
273
274        assert!(!status.success());
275        assert_eq!(status.code(), Some(7));
276    }
277
278    #[tokio::test]
279    async fn spawn_propagates_environment() {
280        let spawner = StdProcessSpawner::new();
281        let mut p = SpawnPlan {
282            program: OsString::from("/bin/sh"),
283            args: vec![
284                OsString::from("-c"),
285                OsString::from("printf '%s' \"$HAZ_TEST_VAR\""),
286            ],
287            env: vec![(OsString::from("HAZ_TEST_VAR"), OsString::from("propagated"))],
288            cwd: temp_cwd(),
289        };
290        // Mutate to ensure the vector is what gets propagated.
291        p.env.push((OsString::from("UNUSED"), OsString::from("ok")));
292
293        let mut child = spawner.spawn(&p).await.expect("sh should spawn");
294        let mut stdout = Vec::new();
295        child
296            .stdout
297            .read_to_end(&mut stdout)
298            .await
299            .expect("stdout drain should succeed");
300        let status = child.process.wait().await.expect("wait should succeed");
301
302        assert!(status.success());
303        assert_eq!(stdout, b"propagated");
304    }
305
306    #[tokio::test]
307    async fn spawn_runs_in_supplied_cwd() {
308        let temp = std::env::temp_dir();
309        let spawner = StdProcessSpawner::new();
310        let mut child = spawner
311            .spawn(&SpawnPlan {
312                program: OsString::from("/bin/sh"),
313                args: vec![OsString::from("-c"), OsString::from("pwd")],
314                env: Vec::new(),
315                cwd: temp.clone(),
316            })
317            .await
318            .expect("sh should spawn");
319        let mut stdout = Vec::new();
320        child
321            .stdout
322            .read_to_end(&mut stdout)
323            .await
324            .expect("stdout drain should succeed");
325        let status = child.process.wait().await.expect("wait should succeed");
326
327        assert!(status.success());
328        // /tmp may itself be a symlink (e.g. macOS), so we accept any
329        // cwd whose canonical form matches the supplied one.
330        let observed = std::path::PathBuf::from(
331            String::from_utf8(stdout)
332                .expect("pwd output should be UTF-8")
333                .trim_end()
334                .to_owned(),
335        );
336        let expected_canonical = std::fs::canonicalize(&temp).expect("temp should canonicalise");
337        let observed_canonical =
338            std::fs::canonicalize(&observed).expect("observed cwd should canonicalise");
339        assert_eq!(observed_canonical, expected_canonical);
340    }
341
342    #[tokio::test]
343    async fn spawn_rejects_relative_cwd() {
344        let spawner = StdProcessSpawner::new();
345        let p = SpawnPlan {
346            program: OsString::from("/bin/echo"),
347            args: vec![OsString::from("hi")],
348            env: Vec::new(),
349            cwd: std::path::PathBuf::from("relative/path"),
350        };
351        match spawner.spawn(&p).await {
352            Err(ProcessError::NonAbsoluteCwd { cwd }) => {
353                assert_eq!(cwd, std::path::PathBuf::from("relative/path"));
354            }
355            Err(other) => panic!("expected NonAbsoluteCwd, got {other:?}"),
356            Ok(_) => panic!("expected NonAbsoluteCwd, got success"),
357        }
358    }
359
360    #[tokio::test]
361    async fn spawn_missing_program_surfaces_spawn_failed() {
362        let spawner = StdProcessSpawner::new();
363        let p = SpawnPlan {
364            program: OsString::from("/nonexistent/haz-exec/spawn-target-please-dont-exist"),
365            args: Vec::new(),
366            env: Vec::new(),
367            cwd: temp_cwd(),
368        };
369        match spawner.spawn(&p).await {
370            Err(ProcessError::SpawnFailed { program, .. }) => {
371                assert!(program.to_string_lossy().contains("nonexistent"));
372            }
373            Err(other) => panic!("expected SpawnFailed, got {other:?}"),
374            Ok(_) => panic!("expected SpawnFailed, got success"),
375        }
376    }
377
378    #[tokio::test]
379    async fn send_signal_kill_terminates_long_running_child() {
380        let spawner = StdProcessSpawner::new();
381        let mut child = spawner
382            .spawn(&plan("/bin/sh", &["-c", "sleep 60"]))
383            .await
384            .expect("sh should spawn");
385
386        assert!(child.process.id().is_some());
387        child
388            .process
389            .send_signal(Signal::Kill)
390            .expect("kill should be queued");
391        let status = child.process.wait().await.expect("wait should succeed");
392
393        assert!(!status.success());
394        // SIGKILL == 9 on every Unix platform haz currently targets.
395        assert_eq!(status.signal(), Some(9));
396        assert!(
397            child.process.id().is_none(),
398            "id() should be None after wait"
399        );
400    }
401
402    #[tokio::test]
403    async fn send_signal_terminate_kills_child_that_respects_sigterm() {
404        let spawner = StdProcessSpawner::new();
405        // `sleep 60` exits on SIGTERM with signal-terminated status.
406        let mut child = spawner
407            .spawn(&plan("/bin/sh", &["-c", "sleep 60"]))
408            .await
409            .expect("sh should spawn");
410
411        child
412            .process
413            .send_signal(Signal::Terminate)
414            .expect("SIGTERM should be delivered");
415        let status = child.process.wait().await.expect("wait should succeed");
416
417        assert!(!status.success());
418        // SIGTERM == 15 on every Unix platform haz targets.
419        assert_eq!(status.signal(), Some(15));
420    }
421
422    #[tokio::test]
423    async fn send_signal_interrupt_kills_child_that_respects_sigint() {
424        let spawner = StdProcessSpawner::new();
425        let mut child = spawner
426            .spawn(&plan("/bin/sh", &["-c", "sleep 60"]))
427            .await
428            .expect("sh should spawn");
429
430        child
431            .process
432            .send_signal(Signal::Interrupt)
433            .expect("SIGINT should be delivered");
434        let status = child.process.wait().await.expect("wait should succeed");
435
436        assert!(!status.success());
437        // SIGINT == 2 on every Unix platform haz targets.
438        assert_eq!(status.signal(), Some(2));
439    }
440
441    #[tokio::test]
442    async fn send_signal_terminate_after_reap_errors_not_found() {
443        let spawner = StdProcessSpawner::new();
444        let mut child = spawner
445            .spawn(&plan("/bin/sh", &["-c", "exit 0"]))
446            .await
447            .expect("sh should spawn");
448        // Drain the streams so wait can complete.
449        let mut stdout = Vec::new();
450        child
451            .stdout
452            .read_to_end(&mut stdout)
453            .await
454            .expect("stdout drain should succeed");
455        let mut stderr = Vec::new();
456        child
457            .stderr
458            .read_to_end(&mut stderr)
459            .await
460            .expect("stderr drain should succeed");
461        let _ = child.process.wait().await.expect("wait should succeed");
462
463        // The child is reaped; pid is None and the helper rejects.
464        match child.process.send_signal(Signal::Terminate) {
465            Err(ProcessError::SignalFailed {
466                signal,
467                pid,
468                source,
469            }) => {
470                assert_eq!(signal, Signal::Terminate);
471                assert!(pid.is_none());
472                assert_eq!(source.kind(), ErrorKind::NotFound);
473            }
474            other => panic!("expected SignalFailed, got {other:?}"),
475        }
476    }
477}