processkit 1.0.1

Async child-process management for tokio: whole-tree kill-on-drop (no orphans), plus streaming, pipelines, timeouts, and supervision
Documentation
//! Integration tests for the batch helpers, run against real subprocesses: `wait_all` and `output_all`.

use std::time::Duration;

use processkit::{Command, Outcome, ProcessGroup, output_all, wait_all};

use crate::common::*;

#[tokio::test]
#[ignore = "spawns real subprocesses and joins on all of them"]
async fn wait_all_collects_every_exit_code_in_order() {
    // Upper bound on the whole join, so a regression fails instead of hanging.
    let join_deadline = Duration::from_secs(20);
    let pool = ProcessGroup::new().expect("create group");

    // Deliberately mixed finish order: the quick printer sits between the two
    // sleepers, so the join cannot succeed by observing only the first child
    // to finish — it really has to wait for the slow ones.
    let mut slow_first = pool.start(&sleep_secs(1)).await.expect("start a");
    let mut quick = pool.start(&five_lines()).await.expect("start b");
    let mut slow_last = pool.start(&sleep_secs(2)).await.expect("start c");

    let joined = tokio::time::timeout(
        join_deadline,
        wait_all(&mut [&mut slow_first, &mut quick, &mut slow_last]),
    )
    .await;
    // Outer layer: the timeout; inner layer: the join's own result.
    let codes = joined.expect("join finished in time").expect("join");

    assert_eq!(codes.len(), 3, "one code per process, in input order");
    let every_clean = !codes.iter().any(|code| *code != Outcome::Exited(0));
    assert!(every_clean, "every child exits cleanly: {codes:?}");
}

#[tokio::test]
#[ignore = "spawns real subprocesses to compare kill-on-drop provenance"]
async fn kill_on_drop_provenance_distinguishes_private_and_shared_groups() {
    // D10: where a handle came from decides who tears the tree down.
    //   * `Command::start()`      → private group → the handle kills on drop.
    //   * `ProcessGroup::start()` → shared group  → the handle does not; the
    //     group owner is responsible for the teardown.

    // Private-group case. The handle is released at the end of this scope,
    // before the shared group below is created.
    {
        let owned = sleep_secs(5).start().await.expect("private start");
        let kills = owned.kills_tree_on_drop();
        assert!(kills, "a private-group handle must kill its tree on drop");
    }

    // Shared-group case. The handle stays alive across the shutdown call.
    let pool = ProcessGroup::new().expect("create group");
    let member = pool.start(&sleep_secs(5)).await.expect("shared start");
    let kills = member.kills_tree_on_drop();
    assert!(
        !kills,
        "a shared-group handle must not kill the tree on drop"
    );
    pool.shutdown().await.expect("shutdown the shared group");
}

#[tokio::test]
#[ignore = "spawns a stdin-reading subprocess joined via wait_all"]
async fn wait_all_does_not_close_an_untaken_keep_stdin_open_pipe() {
    // B15: joining through the borrowing wait path (`wait_exit`) must leave an
    // untaken `keep_stdin_open` pipe alone; closing it would violate the
    // "losers remain usable" guarantee. The price — mirroring the documented
    // "no output pumping" non-feature — is that a `keep_stdin_open` child that
    // blocks on stdin never sees EOF and therefore never exits, so `wait_all`
    // needs an external bound. This test lets the child wedge, lets the bound
    // expire, and then checks the key property: the stdin pipe is still there
    // to be taken, i.e. the join did not mutate the handle.
    let join_bound = Duration::from_secs(2);
    let reap_bound = Duration::from_secs(10);

    let pool = ProcessGroup::new().expect("create group");
    // Pick a platform-appropriate program that reads stdin, then opt in to
    // keeping its stdin pipe open.
    let stdin_reader = if cfg!(windows) {
        Command::new("cmd").args(["/c", "sort"])
    } else {
        Command::new("cat")
    };
    let stdin_reader = stdin_reader.keep_stdin_open();

    let mut child = pool.start(&stdin_reader).await.expect("start");
    let joined = tokio::time::timeout(join_bound, wait_all(&mut [&mut child])).await;
    let timed_out = joined.is_err();
    assert!(
        timed_out,
        "B15: wait_all must NOT auto-close the untaken keep_stdin_open pipe; the \
         stdin-blocked child wedges until externally bounded, got {joined:?}"
    );

    // The caller still owns the pipe: the losing handle was left untouched.
    let pipe_still_available = child.take_stdin().is_some();
    assert!(
        pipe_still_available,
        "B15: the untaken keep_stdin_open pipe must remain available after the join"
    );

    // Tear down the wedged child; the result of the bounded reap is ignored.
    child.start_kill().expect("kill the wedged child");
    let _ = tokio::time::timeout(reap_bound, child.wait()).await;
}

#[tokio::test]
#[ignore = "spawns real subprocesses with a bounded batch"]
async fn output_all_runs_a_bounded_batch_and_collects_all() {
    let group = ProcessGroup::new().expect("create group");
    // Six commands, cap two: the bound throttles, every result still lands.
    let cmds: Vec<Command> = (0..6).map(|_| five_lines()).collect();
    let results = output_all(cmds, 2, &group).await;

    assert_eq!(results.len(), 6);
    for (i, r) in results.iter().enumerate() {
        let out = r
            .as_ref()
            .unwrap_or_else(|e| panic!("command {i} errored: {e}"));
        assert!(
            out.is_success(),
            "command {i} exits 0, got {:?}",
            out.code()
        );
    }
}

#[tokio::test]
#[ignore = "spawns real subprocesses; a non-zero exit is collected, not raised"]
async fn output_all_collects_a_failing_command_as_data() {
    let pool = ProcessGroup::new().expect("create group");

    // A failing command sandwiched between two succeeding ones, all allowed
    // to run at once (cap equals batch size).
    let batch = vec![five_lines(), failing_exit(3), five_lines()];
    let results = output_all(batch, 3, &pool).await;
    assert_eq!(results.len(), 3);

    // Slot 0: plain success.
    let before = results[0].as_ref().expect("ok");
    assert!(before.is_success());

    // Slot 1: the non-zero exit arrives as an `Ok` value carrying its code.
    let failed = results[1].as_ref().expect("non-zero exit is Ok data");
    assert_eq!(
        failed.code(),
        Some(3),
        "the failure is collected as data, not short-circuited"
    );

    // Slot 2: the command after the failure still ran to success.
    let after = results[2].as_ref().expect("ok");
    assert!(after.is_success());
}

#[tokio::test]
#[ignore = "spawns a real subprocess fed a failing stdin source, observed via wait_all"]
async fn failing_stdin_source_surfaces_as_error_stdin_via_wait_all() {
    use processkit::{Error, Stdin};
    use std::pin::Pin;
    use std::task::{Context, Poll};

    // E8: when the stdin writer has finished with a failure that is not a
    // broken pipe, the wait_any/wait_all path (wait_exit) has to notice it and
    // report Error::Stdin even though the run itself succeeded — the same B3
    // contract the bulk verbs honour. Before the fix this path never called
    // observe_stdin_task, so the failure vanished and the join returned a clean
    // Outcome.
    //
    // Why this is deterministic: observe_stdin_task only stashes a writer that
    // has already *finished*. The child cannot reach EOF (and hence cannot exit)
    // before the writer performs its last step, drop(sink). FailingReader errors
    // on its very first poll and nothing is awaited after that, so the writer
    // task is done before the child exits. With the default single-threaded
    // `#[tokio::test]` runtime the writer has therefore always finished by the
    // time wait_exit reaps the child, leaving no cross-thread race window. (Under
    // a multi_thread flavor the writer's completion could in principle be
    // starved past the reap — so leave this test on the default current-thread
    // runtime.)

    /// Stdin source that errors on the first read with a non-broken-pipe error.
    struct FailingReader;

    impl tokio::io::AsyncRead for FailingReader {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut tokio::io::ReadBuf<'_>,
        ) -> Poll<std::io::Result<()>> {
            let failure = std::io::Error::other("stdin source failed");
            Poll::Ready(Err(failure))
        }
    }

    let join_bound = Duration::from_secs(15);

    // Both programs consume stdin and exit 0 at EOF: `sort` on Windows, `cat`
    // on Unix.
    let program = if cfg!(windows) {
        Command::new("cmd").args(["/c", "sort"])
    } else {
        Command::new("cat")
    };
    let source = Stdin::from_reader(FailingReader);
    let mut child = program.stdin(source).start().await.expect("start");

    let joined = tokio::time::timeout(join_bound, wait_all(&mut [&mut child])).await;
    // Outer layer: the timeout must not fire; inner layer: the join must fail.
    let err = joined
        .expect("wait_all must not hang")
        .expect_err("a failed stdin writer on a successful run must surface as Error::Stdin");
    let is_stdin_error = matches!(err, Error::Stdin { .. });
    assert!(is_stdin_error, "got: {err:?}");
}