1#![deny(future_incompatible)]
3#![deny(nonstandard_style)]
4#![deny(missing_docs)]
5#![deny(rustdoc::broken_intra_doc_links)]
6#![doc = include_str!("../README.md")]
7
8pub type ProcessStream = Pin<Box<dyn Stream<Item = ProcessItem> + Send>>;
10
11pub use async_stream::stream;
12use io::Result;
13use std::{
14 ffi::OsStr,
15 io,
16 ops::{Deref, DerefMut},
17 path::{Path, PathBuf},
18 pin::Pin,
19 process::Stdio,
20 sync::Arc,
21};
22use tap::Pipe;
23use {
24 tokio::{
25 io::{AsyncBufReadExt, AsyncRead, BufReader},
26 process::{ChildStdin, Command},
27 sync::Notify,
28 },
29 tokio_stream::wrappers::LinesStream,
30};
31
32mod item;
33pub use async_trait::async_trait;
34pub use futures::Stream;
35pub use futures::StreamExt;
36pub use futures::TryStreamExt;
37pub use item::ProcessItem;
38pub use tokio_stream;
39
40#[async_trait]
41pub trait ProcessExt {
43 fn get_command(&mut self) -> &mut Command;
45
46 fn command(&mut self) -> &mut Command {
48 let stdin = self.get_stdin().take().unwrap();
49 let stdout = self.get_stdout().take().unwrap();
50 let stderr = self.get_stderr().take().unwrap();
51 let command = self.get_command();
52
53 #[cfg(windows)]
54 command.creation_flags(0x08000000);
55
56 command.stdin(stdin);
57 command.stdout(stdout);
58 command.stderr(stderr);
59 command
60 }
61
62 fn spawn_and_stream(&mut self) -> Result<ProcessStream> {
64 self._spawn_and_stream()
65 }
66
67 fn _spawn_and_stream(&mut self) -> Result<ProcessStream> {
69 let abort = Arc::new(Notify::new());
70
71 let mut child = self.command().spawn()?;
72
73 let stdout = child.stdout.take().unwrap();
74 let stderr = child.stderr.take().unwrap();
75
76 self.set_child_stdin(child.stdin.take());
77 self.set_aborter(Some(abort.clone()));
78
79 let stdout_stream = into_stream(stdout, true);
80 let stderr_stream = into_stream(stderr, false);
81 let mut std_stream = tokio_stream::StreamExt::merge(stdout_stream, stderr_stream);
82 let stream = stream! {
83 loop {
84 use ProcessItem::*;
85 tokio::select! {
86 Some(output) = std_stream.next() => yield output,
87 status = child.wait() => {
88 while let Some(output) = std_stream.next().await {
90 yield output
91 }
92 match status {
93 Err(err) => yield Error(err.to_string()),
94 Ok(status) => {
95 match status.code() {
96 Some(code) => yield Exit(format!("{code}")),
97 None => yield Error("Unable to get exit code".into()),
98 }
99 }
100 }
101 break;
102 },
103 _ = abort.notified() => {
104 match child.start_kill() {
105 Ok(()) => yield Exit("0".into()),
106 Err(err) => yield Error(format!("abort Process Error: {err}")),
107 };
108 break;
109 }
110 }
111 }
112 };
113
114 Ok(stream.boxed())
115 }
116 fn aborter(&self) -> Option<Arc<Notify>>;
118 fn set_aborter(&mut self, aborter: Option<Arc<Notify>>);
120 fn take_stdin(&mut self) -> Option<ChildStdin> {
122 None
123 }
124 fn set_child_stdin(&mut self, _child_stdin: Option<ChildStdin>) {}
126 fn get_stdin(&mut self) -> Option<Stdio> {
128 Some(Stdio::null())
129 }
130 fn get_stdout(&mut self) -> Option<Stdio> {
132 Some(Stdio::piped())
133 }
134 fn get_stderr(&mut self) -> Option<Stdio> {
136 Some(Stdio::piped())
137 }
138}
139
140pub struct Process {
142 inner: Command,
143 stdin: Option<ChildStdin>,
144 set_stdin: Option<Stdio>,
145 set_stdout: Option<Stdio>,
146 set_stderr: Option<Stdio>,
147 abort: Option<Arc<Notify>>,
148}
149
150impl ProcessExt for Process {
151 fn get_command(&mut self) -> &mut Command {
152 &mut self.inner
153 }
154
155 fn aborter(&self) -> Option<Arc<Notify>> {
156 self.abort.clone()
157 }
158
159 fn set_aborter(&mut self, aborter: Option<Arc<Notify>>) {
160 self.abort = aborter
161 }
162
163 fn take_stdin(&mut self) -> Option<ChildStdin> {
164 self.stdin.take()
165 }
166
167 fn set_child_stdin(&mut self, child_stdin: Option<ChildStdin>) {
168 self.stdin = child_stdin;
169 }
170
171 fn get_stdin(&mut self) -> Option<Stdio> {
172 self.set_stdin.take()
173 }
174
175 fn get_stdout(&mut self) -> Option<Stdio> {
176 self.set_stdout.take()
177 }
178
179 fn get_stderr(&mut self) -> Option<Stdio> {
180 self.set_stderr.take()
181 }
182}
183
184impl Process {
185 pub fn new<S: AsRef<OsStr>>(program: S) -> Self {
187 Self {
188 inner: Command::new(program),
189 set_stdin: Some(Stdio::null()),
190 set_stdout: Some(Stdio::piped()),
191 set_stderr: Some(Stdio::piped()),
192 stdin: None,
193 abort: None,
194 }
195 }
196
197 pub fn stdin(&mut self, stdin: Stdio) {
199 self.set_stdin = stdin.into();
200 }
201
202 pub fn stdout(&mut self, stdout: Stdio) {
204 self.set_stdout = stdout.into();
205 }
206
207 pub fn stderr(&mut self, stderr: Stdio) {
209 self.set_stderr = stderr.into();
210 }
211
212 pub fn abort(&self) {
214 self.aborter().map(|k| k.notify_waiters());
215 }
216}
217
218impl Deref for Process {
219 type Target = Command;
220
221 fn deref(&self) -> &Self::Target {
222 &self.inner
223 }
224}
225
226impl DerefMut for Process {
227 fn deref_mut(&mut self) -> &mut Self::Target {
228 &mut self.inner
229 }
230}
231
232impl From<Command> for Process {
233 fn from(command: Command) -> Self {
234 Self {
235 inner: command,
236 stdin: None,
237 set_stdin: Some(Stdio::null()),
238 set_stdout: Some(Stdio::piped()),
239 set_stderr: Some(Stdio::piped()),
240 abort: None,
241 }
242 }
243}
244
245impl<S: AsRef<OsStr>> From<Vec<S>> for Process {
246 fn from(mut command_args: Vec<S>) -> Self {
247 let command = command_args.remove(0);
248 let mut inner = Command::new(command);
249 inner.args(command_args);
250
251 Self::from(inner)
252 }
253}
254
255impl From<&Path> for Process {
256 fn from(path: &Path) -> Self {
257 let command = Command::new(path);
258 Self::from(command)
259 }
260}
261
262impl From<&str> for Process {
263 fn from(path: &str) -> Self {
264 let command = Command::new(path);
265 Self::from(command)
266 }
267}
268
269impl From<&PathBuf> for Process {
270 fn from(path: &PathBuf) -> Self {
271 let command = Command::new(path);
272 Self::from(command)
273 }
274}
275
276pub fn into_stream<T, R>(std: R, is_stdout: bool) -> impl Stream<Item = T>
278where
279 T: From<(bool, Result<String>)>,
280 R: AsyncRead,
281{
282 std.pipe(BufReader::new)
283 .lines()
284 .pipe(LinesStream::new)
285 .map(move |v| T::from((is_stdout, v)))
286}
287
288#[cfg(test)]
289mod tests {
290 use tokio::io::AsyncWriteExt;
291
292 use crate::*;
293 use std::io::Result;
294
295 #[tokio::test]
296 async fn test_from_path() -> Result<()> {
297 let mut process: Process = "/bin/ls".into();
298
299 let outputs = process.spawn_and_stream()?.collect::<Vec<_>>().await;
300 println!("{outputs:#?}");
301 Ok(())
302 }
303
304 #[tokio::test]
305 async fn test_dref_item_as_str() {
306 use ProcessItem::*;
307 let items = vec![
308 Output("Hello".into()),
309 Error("XXXXXXXXXX".into()),
310 Exit("0".into()),
311 ];
312 for item in items {
313 println!("{:?}", item.as_bytes())
314 }
315 }
316
317 #[tokio::test]
318 async fn communicate_with_running_process() -> Result<()> {
319 let mut process: Process = Process::new("sort");
320
321 process.stdin(Stdio::piped());
323
324 let mut stream = process.spawn_and_stream().unwrap();
326
327 let mut writer = process.take_stdin().unwrap();
329
330 let reader_thread = tokio::spawn(async move {
332 while let Some(output) = stream.next().await {
333 if output.is_exit() {
334 println!("DONE")
335 } else {
336 println!("{output}")
337 }
338 }
339 });
340
341 let writer_thread = tokio::spawn(async move {
342 writer.write(b"b\nc\na\n").await.unwrap();
343 writer.write(b"f\ne\nd\n").await.unwrap();
344 });
345
346 writer_thread.await?;
347 reader_thread.await?;
348
349 Ok(())
350 }
351}