tokio_process_tools/
lib.rs1#![warn(missing_docs)]
2
3#![doc = include_str!("../README.md")]
5mod async_drop;
8mod collector;
9mod error;
10mod inspector;
11mod output;
12mod output_stream;
13mod panic_on_drop;
14mod process;
15mod process_handle;
16mod signal;
17mod terminate_on_drop;
18
19pub use collector::{AsyncChunkCollector, AsyncLineCollector, Collector, CollectorError, Sink};
20pub use error::{SpawnError, TerminationError, WaitError, WaitForLineResult};
21pub use inspector::{Inspector, InspectorError};
22pub use output::{Output, RawOutput};
23pub use output_stream::{
24 BackpressureControl, Chunk, DEFAULT_CHANNEL_CAPACITY, DEFAULT_CHUNK_SIZE, FromStreamOptions,
25 LineOverflowBehavior, LineParsingOptions, LineWriteMode, Next, NumBytes, NumBytesExt,
26 OutputStream, broadcast, single_subscriber,
27};
28pub use process::{AutoName, AutoNameSettings, Process, ProcessName};
29pub use process_handle::{ProcessHandle, RunningState, Stdin};
30pub use terminate_on_drop::TerminateOnDrop;
31
32#[allow(dead_code)]
33trait SendSync: Send + Sync {}
34impl SendSync for broadcast::BroadcastOutputStream {}
35impl SendSync for single_subscriber::SingleSubscriberOutputStream {}
36impl<O: OutputStream + SendSync> SendSync for ProcessHandle<O> {}
37
38#[cfg(test)]
39mod test {
40 use crate::output::Output;
41 use crate::{LineParsingOptions, Next, Process, RunningState};
42 use assertr::prelude::*;
43 use std::time::Duration;
44 use tokio::process::Command;
45
46 #[tokio::test]
47 async fn wait_with_output() {
48 let mut process = Process::new(Command::new("ls"))
49 .name("ls")
50 .spawn_broadcast()
51 .expect("Failed to spawn `ls` command");
52 let Output {
53 status,
54 stdout,
55 stderr,
56 } = process
57 .wait_for_completion_with_output(None, LineParsingOptions::default())
58 .await
59 .unwrap();
60 assert_that!(status.success()).is_true();
61 for expected in [
62 "Cargo.lock",
63 "Cargo.toml",
64 "LICENSE-APACHE",
65 "LICENSE-MIT",
66 "README.md",
67 "src",
68 "target",
69 ] {
70 assert!(
71 stdout.iter().any(|entry| entry == expected),
72 "expected ls output to contain {expected:?}, got {stdout:?}"
73 );
74 }
75 assert_that!(stderr).is_empty();
76 }
77
78 #[tokio::test]
79 async fn single_subscriber_panics_on_multiple_consumers() {
80 let mut process = Process::new(Command::new("ls"))
81 .name("ls")
82 .spawn_single_subscriber()
83 .expect("Failed to spawn `ls` command");
84
85 let _inspector = process
86 .stdout()
87 .inspect_lines(|_line| Next::Continue, LineParsingOptions::default());
88
89 assert_that_panic_by(|| {
90 let _inspector = process
91 .stdout()
92 .inspect_lines(|_line| Next::Continue, LineParsingOptions::default());
93 })
94 .has_type::<String>()
95 .is_equal_to("Cannot create multiple consumers on SingleSubscriberOutputStream (stream: 'stdout'). Only one inspector or collector can be active at a time. Use .spawn_broadcast() instead of .spawn_single_subscriber() to support multiple consumers.");
96
97 process.wait_for_completion(None).await.unwrap();
98 }
99
100 #[tokio::test]
101 async fn is_running() {
102 let mut cmd = Command::new("sleep");
103 cmd.arg("1");
104 let mut process = Process::new(cmd)
105 .name("sleep")
106 .spawn_broadcast()
107 .expect("Failed to spawn `sleep` command");
108
109 match process.is_running() {
110 RunningState::Running => {}
111 RunningState::Terminated(exit_status) => {
112 assert_that!(exit_status).fail("Process should be running");
113 }
114 RunningState::Uncertain(_) => {
115 assert_that!(&process).fail("Process state should not be uncertain");
116 }
117 }
118
119 let _exit_status = process.wait_for_completion(None).await.unwrap();
120
121 match process.is_running() {
122 RunningState::Running => {
123 assert_that!(process).fail("Process should not be running anymore");
124 }
125 RunningState::Terminated(exit_status) => {
126 assert_that!(exit_status.code()).is_some().is_equal_to(0);
127 assert_that!(exit_status.success()).is_true();
128 }
129 RunningState::Uncertain(_) => {
130 assert_that!(process).fail("Process state should not be uncertain");
131 }
132 }
133 }
134
135 #[tokio::test]
136 async fn terminate() {
137 let mut cmd = Command::new("sleep");
138 cmd.arg("1000");
139 let mut process = Process::new(cmd)
140 .name("sleep")
141 .spawn_broadcast()
142 .expect("Failed to spawn `sleep` command");
143 process
144 .terminate(Duration::from_secs(1), Duration::from_secs(1))
145 .await
146 .unwrap();
147 match process.is_running() {
148 RunningState::Running => {
149 assert_that!(process).fail("Process should not be running anymore");
150 }
151 RunningState::Terminated(exit_status) => {
152 assert_that!(exit_status.code()).is_none();
154 assert_that!(exit_status.success()).is_false();
155 }
156 RunningState::Uncertain(_) => {
157 assert_that!(process).fail("Process state should not be uncertain");
158 }
159 }
160 }
161}