tokio-process-tools 0.8.0

Interact with processes spawned by tokio.
Documentation
#![warn(missing_docs)]

//!
#![doc = include_str!("../README.md")]
//!

mod async_drop;
mod collector;
mod error;
mod inspector;
mod output;
mod output_stream;
mod panic_on_drop;
mod process;
mod process_handle;
mod signal;
mod terminate_on_drop;

pub use collector::{AsyncChunkCollector, AsyncLineCollector, Collector, CollectorError, Sink};
pub use error::{SpawnError, TerminationError, WaitError, WaitForLineResult};
pub use inspector::{Inspector, InspectorError};
pub use output::{Output, RawOutput};
pub use output_stream::{
    BackpressureControl, Chunk, DEFAULT_CHANNEL_CAPACITY, DEFAULT_CHUNK_SIZE, FromStreamOptions,
    LineOverflowBehavior, LineParsingOptions, LineWriteMode, Next, NumBytes, NumBytesExt,
    OutputStream, broadcast, single_subscriber,
};
pub use process::{AutoName, AutoNameSettings, Process, ProcessName};
pub use process_handle::{ProcessHandle, RunningState, Stdin};
pub use terminate_on_drop::TerminateOnDrop;

// Compile-time thread-safety assertions: `SendSync` has `Send + Sync` as supertraits, so each
// impl below only compiles while the named type is itself both `Send` and `Sync`.
// The trait has no methods and is only referenced by these impls and the bound below, which is
// presumably why the `dead_code` lint has to be silenced.
#[allow(dead_code)]
trait SendSync: Send + Sync {}
impl<O> SendSync for ProcessHandle<O> where O: OutputStream + SendSync {}
impl SendSync for single_subscriber::SingleSubscriberOutputStream {}
impl SendSync for broadcast::BroadcastOutputStream {}

#[cfg(test)]
mod test {
    use crate::output::Output;
    use crate::{LineParsingOptions, Next, Process, RunningState};
    use assertr::prelude::*;
    use std::time::Duration;
    use tokio::process::Command;

    /// Runs `ls` (spawned via `spawn_broadcast`) to completion and checks the collected
    /// output: the exit status must be successful, every expected entry must be present in
    /// the collected stdout, and stderr must be empty.
    #[tokio::test]
    async fn wait_with_output() {
        // Entries that must each show up as an element of the collected stdout.
        const EXPECTED_ENTRIES: [&str; 7] = [
            "Cargo.lock",
            "Cargo.toml",
            "LICENSE-APACHE",
            "LICENSE-MIT",
            "README.md",
            "src",
            "target",
        ];

        let ls = Command::new("ls");
        let mut handle = Process::new(ls)
            .name("ls")
            .spawn_broadcast()
            .expect("Failed to spawn `ls` command");

        let Output {
            status,
            stdout,
            stderr,
        } = handle
            .wait_for_completion_with_output(None, LineParsingOptions::default())
            .await
            .unwrap();

        assert_that!(status.success()).is_true();

        // Only membership is checked, not the order or the absence of additional entries.
        for expected in EXPECTED_ENTRIES {
            let listed = stdout.iter().any(|entry| entry == expected);
            assert!(
                listed,
                "expected ls output to contain {expected:?}, got {stdout:?}"
            );
        }

        assert_that!(stderr).is_empty();
    }

    /// Attaches two line inspectors to the stdout of a process spawned via
    /// `spawn_single_subscriber` and asserts that creating the second one panics with the
    /// exact expected `String` message.
    #[tokio::test]
    async fn single_subscriber_panics_on_multiple_consumers() {
        // Exact panic payload expected from the second `inspect_lines` call.
        let expected_message = "Cannot create multiple consumers on SingleSubscriberOutputStream (stream: 'stdout'). Only one inspector or collector can be active at a time. Use .spawn_broadcast() instead of .spawn_single_subscriber() to support multiple consumers.";

        let ls = Command::new("ls");
        let mut handle = Process::new(ls)
            .name("ls")
            .spawn_single_subscriber()
            .expect("Failed to spawn `ls` command");

        // First consumer: kept alive in a binding for the remainder of the test.
        let _first_inspector = handle
            .stdout()
            .inspect_lines(|_line| Next::Continue, LineParsingOptions::default());

        // Second consumer on the same stream: this is the call expected to panic.
        let second_attempt = || {
            let _second_inspector = handle
                .stdout()
                .inspect_lines(|_line| Next::Continue, LineParsingOptions::default());
        };
        assert_that_panic_by(second_attempt)
            .has_type::<String>()
            .is_equal_to(expected_message);

        handle.wait_for_completion(None).await.unwrap();
    }

    /// Checks `is_running()` on a `sleep 1` process twice: it must report `Running` right
    /// after spawning, and `Terminated` with exit code 0 once `wait_for_completion` returned.
    #[tokio::test]
    async fn is_running() {
        let mut sleep = Command::new("sleep");
        sleep.arg("1");
        let mut handle = Process::new(sleep)
            .name("sleep")
            .spawn_broadcast()
            .expect("Failed to spawn `sleep` command");

        // Directly after spawning, only `Running` is acceptable.
        let state_after_spawn = handle.is_running();
        match state_after_spawn {
            RunningState::Uncertain(_) => {
                // Borrow only: the handle is still needed for the rest of the test.
                assert_that!(&handle).fail("Process state should not be uncertain");
            }
            RunningState::Terminated(exit_status) => {
                assert_that!(exit_status).fail("Process should be running");
            }
            RunningState::Running => {}
        }

        let _exit_status = handle.wait_for_completion(None).await.unwrap();

        // After completion, only a successful `Terminated` state is acceptable.
        let state_after_wait = handle.is_running();
        match state_after_wait {
            RunningState::Terminated(exit_status) => {
                assert_that!(exit_status.code()).is_some().is_equal_to(0);
                assert_that!(exit_status.success()).is_true();
            }
            RunningState::Uncertain(_) => {
                assert_that!(handle).fail("Process state should not be uncertain");
            }
            RunningState::Running => {
                assert_that!(handle).fail("Process should not be running anymore");
            }
        }
    }

    /// Spawns `sleep 1000`, calls `terminate` with two one-second durations, and asserts that
    /// the process is afterwards reported as `Terminated` without an exit code and without
    /// success.
    #[tokio::test]
    async fn terminate() {
        let mut sleep = Command::new("sleep");
        sleep.arg("1000");
        let mut handle = Process::new(sleep)
            .name("sleep")
            .spawn_broadcast()
            .expect("Failed to spawn `sleep` command");

        // The same one-second duration is passed for both `terminate` arguments.
        let timeout = Duration::from_secs(1);
        handle.terminate(timeout, timeout).await.unwrap();

        let state = handle.is_running();
        match state {
            RunningState::Terminated(exit_status) => {
                // Terminating a process with a signal results in no code being emitted (on linux).
                assert_that!(exit_status.code()).is_none();
                assert_that!(exit_status.success()).is_false();
            }
            RunningState::Uncertain(_) => {
                assert_that!(handle).fail("Process state should not be uncertain");
            }
            RunningState::Running => {
                assert_that!(handle).fail("Process should not be running anymore");
            }
        }
    }
}