1#![deny(future_incompatible)]
3#![deny(nonstandard_style)]
4#![deny(missing_docs)]
5#![deny(rustdoc::broken_intra_doc_links)]
6#![doc = include_str!("../README.md")]
7
8pub type ProcessStream = Pin<Box<dyn Stream<Item = ProcessItem> + Send>>;
10
11pub use async_stream::stream;
12use io::Result;
13use std::{
14 ffi::OsStr,
15 io,
16 ops::{Deref, DerefMut},
17 path::{Path, PathBuf},
18 pin::Pin,
19 process::Stdio,
20 sync::Arc,
21};
22use tap::Pipe;
23use {
24 tokio::{
25 io::{AsyncBufReadExt, AsyncRead, BufReader},
26 process::{ChildStdin, Command},
27 sync::Notify,
28 },
29 tokio_stream::wrappers::LinesStream,
30};
31
32mod item;
33pub use futures::Stream;
34pub use futures::StreamExt;
35pub use futures::TryStreamExt;
36pub use item::ProcessItem;
37pub use tokio_stream;
38
39pub trait ProcessExt {
41 fn get_command(&mut self) -> &mut Command;
43
44 fn command(&mut self) -> &mut Command {
46 let stdin = self.get_stdin().take().unwrap();
47 let stdout = self.get_stdout().take().unwrap();
48 let stderr = self.get_stderr().take().unwrap();
49 let command = self.get_command();
50
51 #[cfg(windows)]
52 command.creation_flags(0x08000000);
53
54 command.stdin(stdin);
55 command.stdout(stdout);
56 command.stderr(stderr);
57 command
58 }
59
60 fn spawn_and_stream(&mut self) -> Result<ProcessStream> {
62 self._spawn_and_stream()
63 }
64
65 fn _spawn_and_stream(&mut self) -> Result<ProcessStream> {
67 let abort = Arc::new(Notify::new());
68
69 let mut child = self.command().spawn()?;
70
71 let stdout = child.stdout.take().unwrap();
72 let stderr = child.stderr.take().unwrap();
73
74 self.set_child_stdin(child.stdin.take());
75 self.set_aborter(Some(abort.clone()));
76
77 let stdout_stream = into_stream(stdout, true);
78 let stderr_stream = into_stream(stderr, false);
79 let mut std_stream = tokio_stream::StreamExt::merge(stdout_stream, stderr_stream);
80 let stream = stream! {
81 loop {
82 use ProcessItem::*;
83 tokio::select! {
84 Some(output) = std_stream.next() => yield output,
85 status = child.wait() => {
86 while let Some(output) = std_stream.next().await {
88 yield output
89 }
90 match status {
91 Err(err) => yield Error(err.to_string()),
92 Ok(status) => {
93 match status.code() {
94 Some(code) => yield Exit(format!("{code}")),
95 None => yield Error("Unable to get exit code".into()),
96 }
97 }
98 }
99 break;
100 },
101 _ = abort.notified() => {
102 match child.start_kill() {
103 Ok(()) => yield Exit("0".into()),
104 Err(err) => yield Error(format!("abort Process Error: {err}")),
105 };
106 break;
107 }
108 }
109 }
110 };
111
112 Ok(stream.boxed())
113 }
114 fn aborter(&self) -> Option<Arc<Notify>>;
116 fn set_aborter(&mut self, aborter: Option<Arc<Notify>>);
118 fn take_stdin(&mut self) -> Option<ChildStdin> {
120 None
121 }
122 fn set_child_stdin(&mut self, _child_stdin: Option<ChildStdin>) {}
124 fn get_stdin(&mut self) -> Option<Stdio> {
126 Some(Stdio::null())
127 }
128 fn get_stdout(&mut self) -> Option<Stdio> {
130 Some(Stdio::piped())
131 }
132 fn get_stderr(&mut self) -> Option<Stdio> {
134 Some(Stdio::piped())
135 }
136}
137
138pub struct Process {
140 inner: Command,
141 stdin: Option<ChildStdin>,
142 set_stdin: Option<Stdio>,
143 set_stdout: Option<Stdio>,
144 set_stderr: Option<Stdio>,
145 abort: Option<Arc<Notify>>,
146}
147
148impl ProcessExt for Process {
149 fn get_command(&mut self) -> &mut Command {
150 &mut self.inner
151 }
152
153 fn aborter(&self) -> Option<Arc<Notify>> {
154 self.abort.clone()
155 }
156
157 fn set_aborter(&mut self, aborter: Option<Arc<Notify>>) {
158 self.abort = aborter
159 }
160
161 fn take_stdin(&mut self) -> Option<ChildStdin> {
162 self.stdin.take()
163 }
164
165 fn set_child_stdin(&mut self, child_stdin: Option<ChildStdin>) {
166 self.stdin = child_stdin;
167 }
168
169 fn get_stdin(&mut self) -> Option<Stdio> {
170 self.set_stdin.take()
171 }
172
173 fn get_stdout(&mut self) -> Option<Stdio> {
174 self.set_stdout.take()
175 }
176
177 fn get_stderr(&mut self) -> Option<Stdio> {
178 self.set_stderr.take()
179 }
180}
181
182impl Process {
183 pub fn new<S: AsRef<OsStr>>(program: S) -> Self {
185 Self {
186 inner: Command::new(program),
187 set_stdin: Some(Stdio::null()),
188 set_stdout: Some(Stdio::piped()),
189 set_stderr: Some(Stdio::piped()),
190 stdin: None,
191 abort: None,
192 }
193 }
194
195 pub fn stdin(&mut self, stdin: Stdio) {
197 self.set_stdin = stdin.into();
198 }
199
200 pub fn stdout(&mut self, stdout: Stdio) {
202 self.set_stdout = stdout.into();
203 }
204
205 pub fn stderr(&mut self, stderr: Stdio) {
207 self.set_stderr = stderr.into();
208 }
209
210 pub fn abort(&self) {
212 self.aborter().map(|k| k.notify_waiters());
213 }
214}
215
216impl Deref for Process {
217 type Target = Command;
218
219 fn deref(&self) -> &Self::Target {
220 &self.inner
221 }
222}
223
224impl DerefMut for Process {
225 fn deref_mut(&mut self) -> &mut Self::Target {
226 &mut self.inner
227 }
228}
229
230impl From<Command> for Process {
231 fn from(command: Command) -> Self {
232 Self {
233 inner: command,
234 stdin: None,
235 set_stdin: Some(Stdio::null()),
236 set_stdout: Some(Stdio::piped()),
237 set_stderr: Some(Stdio::piped()),
238 abort: None,
239 }
240 }
241}
242
243impl<S: AsRef<OsStr>> From<Vec<S>> for Process {
244 fn from(mut command_args: Vec<S>) -> Self {
245 let command = command_args.remove(0);
246 let mut inner = Command::new(command);
247 inner.args(command_args);
248
249 Self::from(inner)
250 }
251}
252
253impl From<&Path> for Process {
254 fn from(path: &Path) -> Self {
255 let command = Command::new(path);
256 Self::from(command)
257 }
258}
259
260impl From<&str> for Process {
261 fn from(path: &str) -> Self {
262 let command = Command::new(path);
263 Self::from(command)
264 }
265}
266
267impl From<&PathBuf> for Process {
268 fn from(path: &PathBuf) -> Self {
269 let command = Command::new(path);
270 Self::from(command)
271 }
272}
273
274pub fn into_stream<T, R>(std: R, is_stdout: bool) -> impl Stream<Item = T>
276where
277 T: From<(bool, Result<String>)>,
278 R: AsyncRead,
279{
280 std.pipe(BufReader::new)
281 .lines()
282 .pipe(LinesStream::new)
283 .map(move |v| T::from((is_stdout, v)))
284}
285
#[cfg(test)]
mod tests {
    use tokio::io::AsyncWriteExt;

    use crate::*;
    use std::io::Result;

    /// Spawns a process built from a string path and collects everything it
    /// emits.
    #[tokio::test]
    async fn test_from_path() -> Result<()> {
        let mut process: Process = "/bin/ls".into();

        let outputs = process.spawn_and_stream()?.collect::<Vec<_>>().await;
        println!("{outputs:#?}");
        Ok(())
    }

    /// Every item variant can be viewed as bytes.
    #[tokio::test]
    async fn test_dref_item_as_str() {
        use ProcessItem::*;
        let items = vec![
            Output("Hello".into()),
            Error("XXXXXXXXXX".into()),
            Exit("0".into()),
        ];
        for item in items {
            println!("{:?}", item.as_bytes())
        }
    }

    /// Feeds lines to a running `sort` through its stdin while reading its
    /// output concurrently.
    #[tokio::test]
    async fn communicate_with_running_process() -> Result<()> {
        let mut process: Process = Process::new("sort");

        // Stdin must be piped, otherwise there is no handle to write to.
        process.stdin(Stdio::piped());

        let mut stream = process.spawn_and_stream().unwrap();

        // The handle only exists once the process has been spawned.
        let mut writer = process.take_stdin().unwrap();

        let reader_task = tokio::spawn(async move {
            while let Some(output) = stream.next().await {
                if output.is_exit() {
                    println!("DONE")
                } else {
                    println!("{output}")
                }
            }
        });

        let writer_task = tokio::spawn(async move {
            // `write_all` keeps writing until the whole buffer is sent; a
            // plain `write` may stop after a partial write.
            writer.write_all(b"b\nc\na\n").await.unwrap();
            writer.write_all(b"f\ne\nd\n").await.unwrap();
            // `writer` is dropped at the end of this task.
        });

        writer_task.await?;
        reader_task.await?;

        Ok(())
    }
}
349}