haz-exec 0.1.0

Async task execution engine for haz.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
//! Production [`ProcessSpawner`] backend over [`tokio::process::Command`].
//!
//! Pipes are attached to stdout and stderr, stdin is closed, and
//! the child is configured to receive `SIGKILL` on drop so a
//! panicking executor task does not leak processes. On Unix the
//! child is also placed in its own process group at spawn time so
//! the cancellation flow (`EXEC-013`, `EXEC-014`) can signal the
//! whole group with `kill(-pgid, sig)` and reach any subprocesses
//! the task itself spawned. On Windows there is no equivalent
//! process-group primitive: only [`Signal::Kill`] is implemented
//! (via [`tokio::process::Child::start_kill`]), and SIGTERM /
//! SIGINT surface [`std::io::ErrorKind::Unsupported`].

use std::io::{Error as IoError, ErrorKind};
use std::process::Stdio;

use snafu::ResultExt;
use tokio::process::{Child, ChildStderr, ChildStdout, Command};

use crate::process::{
    ExitStatus, Process, ProcessError, ProcessId, ProcessSpawner, Signal, SpawnFailedSnafu,
    SpawnPlan, Spawned, WaitFailedSnafu,
};

/// Stateless production [`ProcessSpawner`] built on [`tokio::process::Command`].
///
/// It carries no fields, so copies are free. Obtain one through
/// [`Self::new`] or [`Default::default`] and hand it to the executor
/// as its generic spawner parameter.
#[derive(Clone, Copy, Debug, Default)]
pub struct StdProcessSpawner;

impl StdProcessSpawner {
    /// Create a spawner; identical to [`Default::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}

impl ProcessSpawner for StdProcessSpawner {
    type Process = StdProcess;

    /// Start `plan` as a child process.
    ///
    /// The child gets the following setup:
    /// * It starts from an empty environment plus exactly the pairs in
    ///   `plan.env`.
    /// * It runs in `plan.cwd`.
    /// * Stdin is bound to the null device, and stdout/stderr are
    ///   captured through pipes that are returned alongside the process.
    /// * It is killed if its handle is dropped.
    /// * On Unix it also becomes the leader of a fresh process group.
    ///
    /// # Errors
    ///
    /// * [`ProcessError::NonAbsoluteCwd`] when `plan.cwd` is relative.
    ///   This is checked before anything is spawned.
    /// * `SpawnFailed` (carrying `plan.program`) when the OS refuses
    ///   to start the program.
    async fn spawn(&self, plan: &SpawnPlan) -> Result<Spawned<Self::Process>, ProcessError> {
        if plan.cwd.is_relative() {
            return Err(ProcessError::NonAbsoluteCwd {
                cwd: plan.cwd.clone(),
            });
        }

        let mut command = Command::new(&plan.program);
        command
            .args(&plan.args)
            .current_dir(&plan.cwd)
            // Wipe the inherited environment first so only the plan's
            // variables reach the child.
            .env_clear()
            .envs(plan.env.iter().map(|(key, value)| (key, value)))
            .stdin(Stdio::null())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .kill_on_drop(true);
        #[cfg(unix)]
        {
            // `process_group(0)` makes the child lead a new group
            // (pgid == its pid). Cancellation can then signal `-pgid`
            // and reach whatever the task forks (`EXEC-013` step 2,
            // `EXEC-014`).
            command.process_group(0);
        }

        let mut child = command.spawn().with_context(|_| SpawnFailedSnafu {
            program: plan.program.clone(),
        })?;

        // Both handles must be present because the pipes were
        // requested on the command above.
        let (stdout, stderr) = (
            child.stdout.take().expect("stdout was configured as a pipe"),
            child.stderr.take().expect("stderr was configured as a pipe"),
        );

        Ok(Spawned {
            stdout,
            stderr,
            process: StdProcess { child },
        })
    }
}

/// Production [`Process`] wrapping [`tokio::process::Child`].
#[derive(Debug)]
pub struct StdProcess {
    child: Child,
}

impl Process for StdProcess {
    type Stdout = ChildStdout;
    type Stderr = ChildStderr;

    fn id(&self) -> Option<ProcessId> {
        self.child.id().map(ProcessId)
    }

    fn send_signal(&mut self, signal: Signal) -> Result<(), ProcessError> {
        let pid = self.id();
        #[cfg(unix)]
        {
            deliver_unix_signal(&mut self.child, signal, pid)
        }
        #[cfg(not(unix))]
        {
            deliver_non_unix_signal(&mut self.child, signal, pid)
        }
    }

    async fn wait(&mut self) -> Result<ExitStatus, ProcessError> {
        let pid = self.id();
        self.child
            .wait()
            .await
            .with_context(|_| WaitFailedSnafu { pid })
    }
}

/// Deliver `signal` to the spawned child's process group on Unix.
///
/// All three [`Signal`] variants are sent with
/// `nix::sys::signal::kill` against `-pgid`. Any subprocesses the task
/// itself forked into that group therefore also receive the signal
/// (`EXEC-013` step 2, `EXEC-014`). The child was placed in its own
/// process group at spawn time, so the group id equals the child's pid.
///
/// # Errors
///
/// Returns [`ProcessError::SignalFailed`] in these cases:
/// * `pid` is `None`, meaning the child has already been reaped. The
///   source error has kind [`ErrorKind::NotFound`].
/// * The pid does not fit in an `i32` (source kind `Other`).
/// * The pid is `0` or `1`. Negating it would address the caller's
///   own process group (`kill(0, …)`) or every process the caller may
///   signal (`kill(-1, …)`). Such pids are refused with source kind
///   [`ErrorKind::InvalidInput`].
/// * `kill(2)` itself fails. The source carries the OS errno.
#[cfg(unix)]
fn deliver_unix_signal(
    child: &mut Child,
    signal: Signal,
    pid: Option<ProcessId>,
) -> Result<(), ProcessError> {
    use nix::sys::signal::{Signal as NixSignal, kill as nix_kill};
    use nix::unistd::Pid;

    let Some(ProcessId(raw_pid)) = pid else {
        return Err(ProcessError::SignalFailed {
            signal,
            pid: None,
            source: IoError::new(ErrorKind::NotFound, "child has already been reaped"),
        });
    };

    let signed_pid = i32::try_from(raw_pid).map_err(|_| ProcessError::SignalFailed {
        signal,
        pid,
        source: IoError::other("pid exceeds i32 range"),
    })?;

    // `signed_pid` is non-negative (it came from a u32). Negating 0
    // or 1 would turn a per-task cancellation into a signal to our own
    // process group (`kill(0, …)`) or a broadcast to every process we
    // may signal (`kill(-1, …)`), so refuse both outright.
    if signed_pid <= 1 {
        return Err(ProcessError::SignalFailed {
            signal,
            pid,
            source: IoError::new(
                ErrorKind::InvalidInput,
                "refusing to signal the process group of pid 0 or 1",
            ),
        });
    }
    let group_target = Pid::from_raw(-signed_pid);

    let nix_sig = match signal {
        Signal::Terminate => NixSignal::SIGTERM,
        Signal::Interrupt => NixSignal::SIGINT,
        Signal::Kill => NixSignal::SIGKILL,
    };

    let send_result = nix_kill(group_target, nix_sig).map_err(|errno| ProcessError::SignalFailed {
        signal,
        pid,
        source: IoError::from(errno),
    });

    // For SIGKILL, also go through tokio's own kill path so its
    // kill-on-drop bookkeeping is updated. A later drop then does not
    // race against the kernel having already reaped the child. The
    // result is ignored on purpose: the group-wide SIGKILL above has
    // already succeeded.
    if signal == Signal::Kill && send_result.is_ok() {
        let _ = child.start_kill();
    }

    send_result
}

/// Deliver `signal` to the spawned child on hosts without Unix signals.
///
/// Only [`Signal::Kill`] can be honoured; it goes through tokio's
/// [`Child::start_kill`]. [`Signal::Terminate`] and [`Signal::Interrupt`]
/// fail with a [`std::io::ErrorKind::Unsupported`] source.
///
/// The per-future grace handling in `run_fresh` swallows that SIGTERM
/// failure, waits out the grace timer and then escalates to SIGKILL.
/// Cancelled children on Windows are therefore still terminated, just
/// without receiving any clean-up signal first.
#[cfg(not(unix))]
fn deliver_non_unix_signal(
    child: &mut Child,
    signal: Signal,
    pid: Option<ProcessId>,
) -> Result<(), ProcessError> {
    let source = match signal {
        Signal::Kill => match child.start_kill() {
            Ok(()) => return Ok(()),
            Err(kill_error) => kill_error,
        },
        Signal::Interrupt | Signal::Terminate => IoError::new(
            ErrorKind::Unsupported,
            "SIGTERM and SIGINT are Unix-only in haz",
        ),
    };
    Err(ProcessError::SignalFailed {
        signal,
        pid,
        source,
    })
}

#[cfg(all(test, unix))]
mod unix_tests {
    use std::ffi::OsString;
    use std::os::unix::process::ExitStatusExt;
    use std::time::{Duration, Instant};

    use tokio::io::AsyncReadExt;

    use super::*;

    /// Working directory for most tests: the platform temp dir.
    /// NOTE(review): assumed absolute, as `spawn` requires. `temp_dir`
    /// honours `TMPDIR`, so a relative `TMPDIR` would break this.
    fn temp_cwd() -> std::path::PathBuf {
        std::env::temp_dir()
    }

    /// Build a plan for `program args…` with an empty environment,
    /// running in [`temp_cwd`].
    fn plan(program: &str, args: &[&str]) -> SpawnPlan {
        SpawnPlan {
            program: OsString::from(program),
            args: args.iter().map(OsString::from).collect(),
            env: Vec::new(),
            cwd: temp_cwd(),
        }
    }

    /// Read stdout and stderr to EOF concurrently and return
    /// `(stdout, stderr)`. Draining both at once means a child that
    /// fills one pipe while the test blocks on the other cannot
    /// deadlock.
    async fn drain(child: &mut Spawned<StdProcess>) -> (Vec<u8>, Vec<u8>) {
        let mut stdout = Vec::new();
        let mut stderr = Vec::new();
        let (stdout_result, stderr_result) = tokio::join!(
            child.stdout.read_to_end(&mut stdout),
            child.stderr.read_to_end(&mut stderr)
        );
        stdout_result.expect("stdout drain should succeed");
        stderr_result.expect("stderr drain should succeed");
        (stdout, stderr)
    }

    #[tokio::test]
    async fn spawn_echo_yields_stdout_and_zero_exit() {
        let spawner = StdProcessSpawner::new();
        let mut child = spawner
            .spawn(&plan("/bin/echo", &["hello"]))
            .await
            .expect("echo should spawn");

        let (stdout_bytes, stderr_bytes) = drain(&mut child).await;
        let status = child.process.wait().await.expect("wait should succeed");

        assert_eq!(stdout_bytes, b"hello\n");
        assert!(stderr_bytes.is_empty());
        assert!(status.success());
        assert_eq!(status.code(), Some(0));
    }

    #[tokio::test]
    async fn spawn_exit_code_is_propagated() {
        let spawner = StdProcessSpawner::new();
        let mut child = spawner
            .spawn(&plan("/bin/sh", &["-c", "exit 7"]))
            .await
            .expect("sh should spawn");

        let status = child.process.wait().await.expect("wait should succeed");

        assert!(!status.success());
        assert_eq!(status.code(), Some(7));
    }

    #[tokio::test]
    async fn spawn_propagates_environment() {
        let spawner = StdProcessSpawner::new();
        // Two variables, so the test proves every entry of the plan's
        // environment reaches the child, not just the first.
        let p = SpawnPlan {
            program: OsString::from("/bin/sh"),
            args: vec![
                OsString::from("-c"),
                OsString::from("printf '%s %s' \"$HAZ_TEST_VAR\" \"$HAZ_SECOND_VAR\""),
            ],
            env: vec![
                (OsString::from("HAZ_TEST_VAR"), OsString::from("propagated")),
                (OsString::from("HAZ_SECOND_VAR"), OsString::from("ok")),
            ],
            cwd: temp_cwd(),
        };

        let mut child = spawner.spawn(&p).await.expect("sh should spawn");
        let (stdout, _) = drain(&mut child).await;
        let status = child.process.wait().await.expect("wait should succeed");

        assert!(status.success());
        assert_eq!(stdout, b"propagated ok");
    }

    #[tokio::test]
    async fn spawn_runs_in_supplied_cwd() {
        let temp = std::env::temp_dir();
        let spawner = StdProcessSpawner::new();
        let mut child = spawner
            .spawn(&SpawnPlan {
                program: OsString::from("/bin/sh"),
                args: vec![OsString::from("-c"), OsString::from("pwd")],
                env: Vec::new(),
                cwd: temp.clone(),
            })
            .await
            .expect("sh should spawn");
        let (stdout, _) = drain(&mut child).await;
        let status = child.process.wait().await.expect("wait should succeed");

        assert!(status.success());
        // /tmp may itself be a symlink (e.g. on macOS), so any cwd whose
        // canonical form matches the supplied one is accepted.
        let observed = std::path::PathBuf::from(
            String::from_utf8(stdout)
                .expect("pwd output should be UTF-8")
                .trim_end()
                .to_owned(),
        );
        let expected_canonical = std::fs::canonicalize(&temp).expect("temp should canonicalise");
        let observed_canonical =
            std::fs::canonicalize(&observed).expect("observed cwd should canonicalise");
        assert_eq!(observed_canonical, expected_canonical);
    }

    #[tokio::test]
    async fn spawn_rejects_relative_cwd() {
        let spawner = StdProcessSpawner::new();
        let p = SpawnPlan {
            program: OsString::from("/bin/echo"),
            args: vec![OsString::from("hi")],
            env: Vec::new(),
            cwd: std::path::PathBuf::from("relative/path"),
        };
        match spawner.spawn(&p).await {
            Err(ProcessError::NonAbsoluteCwd { cwd }) => {
                assert_eq!(cwd, std::path::PathBuf::from("relative/path"));
            }
            Err(other) => panic!("expected NonAbsoluteCwd, got {other:?}"),
            Ok(_) => panic!("expected NonAbsoluteCwd, got success"),
        }
    }

    #[tokio::test]
    async fn spawn_missing_program_surfaces_spawn_failed() {
        let spawner = StdProcessSpawner::new();
        let p = SpawnPlan {
            program: OsString::from("/nonexistent/haz-exec/spawn-target-please-dont-exist"),
            args: Vec::new(),
            env: Vec::new(),
            cwd: temp_cwd(),
        };
        match spawner.spawn(&p).await {
            Err(ProcessError::SpawnFailed { program, .. }) => {
                assert!(program.to_string_lossy().contains("nonexistent"));
            }
            Err(other) => panic!("expected SpawnFailed, got {other:?}"),
            Ok(_) => panic!("expected SpawnFailed, got success"),
        }
    }

    #[tokio::test]
    async fn send_signal_kill_terminates_long_running_child() {
        let spawner = StdProcessSpawner::new();
        let mut child = spawner
            .spawn(&plan("/bin/sh", &["-c", "sleep 60"]))
            .await
            .expect("sh should spawn");

        assert!(child.process.id().is_some());
        child
            .process
            .send_signal(Signal::Kill)
            .expect("kill should be queued");
        let status = child.process.wait().await.expect("wait should succeed");

        assert!(!status.success());
        // SIGKILL == 9 on every Unix platform haz currently targets.
        assert_eq!(status.signal(), Some(9));
        assert!(
            child.process.id().is_none(),
            "id() should be None after wait"
        );
    }

    #[tokio::test]
    async fn send_signal_terminate_kills_child_that_respects_sigterm() {
        let spawner = StdProcessSpawner::new();
        // SIGTERM goes to the whole group: sh and, if sh forked one
        // rather than exec'ing, `sleep`. Neither traps it, so sh dies
        // with a signal-terminated status.
        let mut child = spawner
            .spawn(&plan("/bin/sh", &["-c", "sleep 60"]))
            .await
            .expect("sh should spawn");

        child
            .process
            .send_signal(Signal::Terminate)
            .expect("SIGTERM should be delivered");
        let status = child.process.wait().await.expect("wait should succeed");

        assert!(!status.success());
        // SIGTERM == 15 on every Unix platform haz targets.
        assert_eq!(status.signal(), Some(15));
    }

    #[tokio::test]
    async fn send_signal_terminate_reaches_backgrounded_grandchild() {
        let spawner = StdProcessSpawner::new();
        // The backgrounded `sleep` inherits the stdout/stderr pipes, so
        // draining only reaches EOF once the grandchild is gone too.
        // Without job control, a non-interactive sh keeps it in the
        // child's process group, which the group-wide signal must reach.
        let mut child = spawner
            .spawn(&plan("/bin/sh", &["-c", "sleep 60 & echo ready; wait"]))
            .await
            .expect("sh should spawn");

        // Block until sh has forked `sleep`, so the signal cannot race it.
        let mut ready = [0_u8; 6];
        child
            .stdout
            .read_exact(&mut ready)
            .await
            .expect("sh should report readiness");
        assert_eq!(&ready, b"ready\n");

        let started = Instant::now();
        child
            .process
            .send_signal(Signal::Terminate)
            .expect("SIGTERM should be delivered");
        let _ = drain(&mut child).await;
        let status = child.process.wait().await.expect("wait should succeed");

        assert!(!status.success());
        assert!(
            started.elapsed() < Duration::from_secs(30),
            "backgrounded grandchild survived SIGTERM to the process group"
        );
    }

    #[tokio::test]
    async fn send_signal_interrupt_kills_child_that_respects_sigint() {
        let spawner = StdProcessSpawner::new();
        let mut child = spawner
            .spawn(&plan("/bin/sh", &["-c", "sleep 60"]))
            .await
            .expect("sh should spawn");

        child
            .process
            .send_signal(Signal::Interrupt)
            .expect("SIGINT should be delivered");
        let status = child.process.wait().await.expect("wait should succeed");

        assert!(!status.success());
        // SIGINT == 2 on every Unix platform haz targets.
        assert_eq!(status.signal(), Some(2));
    }

    #[tokio::test]
    async fn send_signal_terminate_after_reap_errors_not_found() {
        let spawner = StdProcessSpawner::new();
        let mut child = spawner
            .spawn(&plan("/bin/sh", &["-c", "exit 0"]))
            .await
            .expect("sh should spawn");
        // Drain the streams so wait can complete.
        let _ = drain(&mut child).await;
        let _ = child.process.wait().await.expect("wait should succeed");

        // The child is reaped, so its pid is None and the helper rejects.
        match child.process.send_signal(Signal::Terminate) {
            Err(ProcessError::SignalFailed {
                signal,
                pid,
                source,
            }) => {
                assert_eq!(signal, Signal::Terminate);
                assert!(pid.is_none());
                assert_eq!(source.kind(), ErrorKind::NotFound);
            }
            other => panic!("expected SignalFailed, got {other:?}"),
        }
    }
}