Skip to main content

tokio_process_tools/
lib.rs

1#![warn(missing_docs)]
2
3//!
4#![doc = include_str!("../README.md")]
5//!
6
7mod async_drop;
8mod collector;
9mod error;
10mod inspector;
11mod output;
12mod output_stream;
13mod panic_on_drop;
14mod process;
15mod process_handle;
16mod signal;
17mod terminate_on_drop;
18
19pub use collector::{AsyncChunkCollector, AsyncLineCollector, Collector, CollectorError, Sink};
20pub use error::{SpawnError, TerminationError, WaitError, WaitForLineResult};
21pub use inspector::{Inspector, InspectorError};
22pub use output::{Output, RawOutput};
23pub use output_stream::{
24    BackpressureControl, Chunk, DEFAULT_CHANNEL_CAPACITY, DEFAULT_CHUNK_SIZE, FromStreamOptions,
25    LineOverflowBehavior, LineParsingOptions, LineWriteMode, Next, NumBytes, NumBytesExt,
26    OutputStream, broadcast, single_subscriber,
27};
28pub use process::{AutoName, AutoNameSettings, Process, ProcessName};
29pub use process_handle::{ProcessHandle, RunningState, Stdin};
30pub use terminate_on_drop::TerminateOnDrop;
31
32#[allow(dead_code)]
33trait SendSync: Send + Sync {}
34impl SendSync for broadcast::BroadcastOutputStream {}
35impl SendSync for single_subscriber::SingleSubscriberOutputStream {}
36impl<O: OutputStream + SendSync> SendSync for ProcessHandle<O> {}
37
38#[cfg(test)]
39mod test {
40    use crate::output::Output;
41    use crate::{LineParsingOptions, Next, Process, RunningState};
42    use assertr::prelude::*;
43    use std::time::Duration;
44    use tokio::process::Command;
45
46    #[tokio::test]
47    async fn wait_with_output() {
48        let mut process = Process::new(Command::new("ls"))
49            .name("ls")
50            .spawn_broadcast()
51            .expect("Failed to spawn `ls` command");
52        let Output {
53            status,
54            stdout,
55            stderr,
56        } = process
57            .wait_for_completion_with_output(None, LineParsingOptions::default())
58            .await
59            .unwrap();
60        assert_that!(status.success()).is_true();
61        for expected in [
62            "Cargo.lock",
63            "Cargo.toml",
64            "LICENSE-APACHE",
65            "LICENSE-MIT",
66            "README.md",
67            "src",
68            "target",
69        ] {
70            assert!(
71                stdout.iter().any(|entry| entry == expected),
72                "expected ls output to contain {expected:?}, got {stdout:?}"
73            );
74        }
75        assert_that!(stderr).is_empty();
76    }
77
78    #[tokio::test]
79    async fn single_subscriber_panics_on_multiple_consumers() {
80        let mut process = Process::new(Command::new("ls"))
81            .name("ls")
82            .spawn_single_subscriber()
83            .expect("Failed to spawn `ls` command");
84
85        let _inspector = process
86            .stdout()
87            .inspect_lines(|_line| Next::Continue, LineParsingOptions::default());
88
89        assert_that_panic_by(|| {
90            let _inspector = process
91                .stdout()
92                .inspect_lines(|_line| Next::Continue, LineParsingOptions::default());
93        })
94        .has_type::<String>()
95        .is_equal_to("Cannot create multiple consumers on SingleSubscriberOutputStream (stream: 'stdout'). Only one inspector or collector can be active at a time. Use .spawn_broadcast() instead of .spawn_single_subscriber() to support multiple consumers.");
96
97        process.wait_for_completion(None).await.unwrap();
98    }
99
100    #[tokio::test]
101    async fn is_running() {
102        let mut cmd = Command::new("sleep");
103        cmd.arg("1");
104        let mut process = Process::new(cmd)
105            .name("sleep")
106            .spawn_broadcast()
107            .expect("Failed to spawn `sleep` command");
108
109        match process.is_running() {
110            RunningState::Running => {}
111            RunningState::Terminated(exit_status) => {
112                assert_that!(exit_status).fail("Process should be running");
113            }
114            RunningState::Uncertain(_) => {
115                assert_that!(&process).fail("Process state should not be uncertain");
116            }
117        }
118
119        let _exit_status = process.wait_for_completion(None).await.unwrap();
120
121        match process.is_running() {
122            RunningState::Running => {
123                assert_that!(process).fail("Process should not be running anymore");
124            }
125            RunningState::Terminated(exit_status) => {
126                assert_that!(exit_status.code()).is_some().is_equal_to(0);
127                assert_that!(exit_status.success()).is_true();
128            }
129            RunningState::Uncertain(_) => {
130                assert_that!(process).fail("Process state should not be uncertain");
131            }
132        }
133    }
134
135    #[tokio::test]
136    async fn terminate() {
137        let mut cmd = Command::new("sleep");
138        cmd.arg("1000");
139        let mut process = Process::new(cmd)
140            .name("sleep")
141            .spawn_broadcast()
142            .expect("Failed to spawn `sleep` command");
143        process
144            .terminate(Duration::from_secs(1), Duration::from_secs(1))
145            .await
146            .unwrap();
147        match process.is_running() {
148            RunningState::Running => {
149                assert_that!(process).fail("Process should not be running anymore");
150            }
151            RunningState::Terminated(exit_status) => {
152                // Terminating a process with a signal results in no code being emitted (on linux).
153                assert_that!(exit_status.code()).is_none();
154                assert_that!(exit_status.success()).is_false();
155            }
156            RunningState::Uncertain(_) => {
157                assert_that!(process).fail("Process state should not be uncertain");
158            }
159        }
160    }
161}