1#![deny(future_incompatible)]
3#![deny(nonstandard_style)]
4#![deny(missing_docs)]
5#![deny(rustdoc::broken_intra_doc_links)]
6#![doc = include_str!("../README.md")]
7
8pub type ProcessStream = Pin<Box<dyn Stream<Item = ProcessItem> + Send>>;
10
11pub use async_stream::stream;
12use io::Result;
13use std::{
14 ffi::OsStr,
15 io,
16 ops::{Deref, DerefMut},
17 path::{Path, PathBuf},
18 pin::Pin,
19 process::Stdio,
20 sync::Arc,
21};
22use tap::Pipe;
23use {
24 tokio::{
25 io::{AsyncBufReadExt, AsyncRead, BufReader},
26 process::{ChildStdin, Command},
27 sync::Notify,
28 },
29 tokio_stream::wrappers::LinesStream,
30};
31
32mod item;
33pub use async_trait::async_trait;
34pub use futures::Stream;
35pub use futures::StreamExt;
36pub use futures::TryStreamExt;
37pub use item::ProcessItem;
38pub use tokio_stream;
39
40#[async_trait]
41pub trait ProcessExt {
43 fn get_command(&mut self) -> &mut Command;
45
46 fn command(&mut self) -> &mut Command {
48 let stdin = self.get_stdin().take().unwrap();
49 let stdout = self.get_stdout().take().unwrap();
50 let stderr = self.get_stderr().take().unwrap();
51 let command = self.get_command();
52
53 command.stdin(stdin);
54 command.stderr(stdout);
55 command.stdout(stderr);
56 command
57 }
58
59 fn spawn_and_stream(&mut self) -> Result<ProcessStream> {
61 self._spawn_and_stream()
62 }
63
64 fn _spawn_and_stream(&mut self) -> Result<ProcessStream> {
66 let abort = Arc::new(Notify::new());
67
68 let mut child = self.command().spawn()?;
69
70 let stdout = child.stdout.take().unwrap();
71 let stderr = child.stderr.take().unwrap();
72
73 self.set_child_stdin(child.stdin.take());
74 self.set_aborter(Some(abort.clone()));
75
76 let stdout_stream = into_stream(stdout, true);
77 let stderr_stream = into_stream(stderr, false);
78 let mut std_stream = tokio_stream::StreamExt::merge(stdout_stream, stderr_stream);
79
80 let stream = stream! {
81 loop {
82 use ProcessItem::*;
83 tokio::select! {
84 Some(output) = std_stream.next() => yield output,
85 status = child.wait() => match status {
86 Err(err) => yield Error(err.to_string()),
87 Ok(status) => {
88 match status.code() {
89 Some(code) => yield Exit(format!("{code}")),
90 None => yield Error("Unable to get exit code".into()),
91 }
92 break;
93
94 }
95 },
96 _ = abort.notified() => {
97 match child.start_kill() {
98 Ok(()) => yield Exit("0".into()),
99 Err(err) => yield Error(format!("abort Process Error: {err}")),
100 };
101 break;
102 }
103 }
104 }
105 };
106
107 Ok(stream.boxed())
108 }
109 fn aborter(&self) -> Option<Arc<Notify>>;
111 fn set_aborter(&mut self, aborter: Option<Arc<Notify>>);
113 fn take_stdin(&mut self) -> Option<ChildStdin> {
115 None
116 }
117 fn set_child_stdin(&mut self, _child_stdin: Option<ChildStdin>) {}
119 fn get_stdin(&mut self) -> Option<Stdio> {
121 Some(Stdio::null())
122 }
123 fn get_stdout(&mut self) -> Option<Stdio> {
125 Some(Stdio::piped())
126 }
127 fn get_stderr(&mut self) -> Option<Stdio> {
129 Some(Stdio::piped())
130 }
131}
132
133pub struct Process {
135 inner: Command,
136 stdin: Option<ChildStdin>,
137 set_stdin: Option<Stdio>,
138 set_stdout: Option<Stdio>,
139 set_stderr: Option<Stdio>,
140 abort: Option<Arc<Notify>>,
141}
142
143impl ProcessExt for Process {
144 fn get_command(&mut self) -> &mut Command {
145 &mut self.inner
146 }
147
148 fn aborter(&self) -> Option<Arc<Notify>> {
149 self.abort.clone()
150 }
151
152 fn set_aborter(&mut self, aborter: Option<Arc<Notify>>) {
153 self.abort = aborter
154 }
155
156 fn take_stdin(&mut self) -> Option<ChildStdin> {
157 self.stdin.take()
158 }
159
160 fn set_child_stdin(&mut self, child_stdin: Option<ChildStdin>) {
161 self.stdin = child_stdin;
162 }
163
164 fn get_stdin(&mut self) -> Option<Stdio> {
165 self.set_stdin.take()
166 }
167
168 fn get_stdout(&mut self) -> Option<Stdio> {
169 self.set_stdout.take()
170 }
171
172 fn get_stderr(&mut self) -> Option<Stdio> {
173 self.set_stderr.take()
174 }
175}
176
177impl Process {
178 pub fn new<S: AsRef<OsStr>>(program: S) -> Self {
180 Self {
181 inner: Command::new(program),
182 set_stdin: Some(Stdio::null()),
183 set_stdout: Some(Stdio::piped()),
184 set_stderr: Some(Stdio::piped()),
185 stdin: None,
186 abort: None,
187 }
188 }
189
190 pub fn stdin(&mut self, stdin: Stdio) {
192 self.set_stdin = stdin.into();
193 }
194
195 pub fn stdout(&mut self, stdout: Stdio) {
197 self.set_stdout = stdout.into();
198 }
199
200 pub fn stderr(&mut self, stderr: Stdio) {
202 self.set_stderr = stderr.into();
203 }
204
205 pub fn abort(&self) {
207 self.aborter().map(|k| k.notify_waiters());
208 }
209}
210
211impl Deref for Process {
212 type Target = Command;
213
214 fn deref(&self) -> &Self::Target {
215 &self.inner
216 }
217}
218
219impl DerefMut for Process {
220 fn deref_mut(&mut self) -> &mut Self::Target {
221 &mut self.inner
222 }
223}
224
225impl From<Command> for Process {
226 fn from(command: Command) -> Self {
227 Self {
228 inner: command,
229 stdin: None,
230 set_stdin: Some(Stdio::null()),
231 set_stdout: Some(Stdio::piped()),
232 set_stderr: Some(Stdio::piped()),
233 abort: None,
234 }
235 }
236}
237
238impl<S: AsRef<OsStr>> From<Vec<S>> for Process {
239 fn from(mut command_args: Vec<S>) -> Self {
240 let command = command_args.remove(0);
241 let mut inner = Command::new(command);
242 inner.args(command_args);
243
244 Self::from(inner)
245 }
246}
247
248impl From<&Path> for Process {
249 fn from(path: &Path) -> Self {
250 let command = Command::new(path);
251 Self::from(command)
252 }
253}
254
255impl From<&str> for Process {
256 fn from(path: &str) -> Self {
257 let command = Command::new(path);
258 Self::from(command)
259 }
260}
261
262impl From<&PathBuf> for Process {
263 fn from(path: &PathBuf) -> Self {
264 let command = Command::new(path);
265 Self::from(command)
266 }
267}
268
269pub fn into_stream<T, R>(std: R, is_stdout: bool) -> impl Stream<Item = T>
271where
272 T: From<(bool, Result<String>)>,
273 R: AsyncRead,
274{
275 std.pipe(BufReader::new)
276 .lines()
277 .pipe(LinesStream::new)
278 .map(move |v| T::from((is_stdout, v)))
279}
280
281#[cfg(test)]
282mod tests {
283 use tokio::io::AsyncWriteExt;
284
285 use crate::*;
286 use std::io::Result;
287
288 #[tokio::test]
289 async fn test_from_vector() -> Result<()> {
290 let mut stream = Process::from(vec![
291 "xcrun",
292 "simctl",
293 "launch",
294 "--terminate-running-process",
295 "--console",
296 "booted",
297 "tami5.Wordle",
298 ])
299 .spawn_and_stream()
300 .unwrap();
301
302 while let Some(output) = stream.next().await {
303 println!("{output}")
304 }
305 Ok(())
306 }
307
308 #[tokio::test]
309 async fn test_from_path() -> Result<()> {
310 let mut process: Process = "/bin/ls".into();
311
312 let outputs = process.spawn_and_stream()?.collect::<Vec<_>>().await;
313 println!("{outputs:#?}");
314 Ok(())
315 }
316
317 #[tokio::test]
318 async fn test_new() -> Result<()> {
319 let mut process = Process::new("xcrun");
320
321 process.args(&[
322 "simctl",
323 "launch",
324 "--terminate-running-process",
325 "--console",
326 "booted",
327 "tami5.Wordle",
328 ]);
329
330 let mut stream = process.spawn_and_stream()?;
331
332 while let Some(output) = stream.next().await {
333 println!("{output:#?}")
334 }
335 Ok(())
336 }
337
338 #[tokio::test]
339 async fn test_abort() -> Result<()> {
340 let mut process = Process::new("xcrun");
341
342 process.args(&[
343 "/Users/tami5/Library/Caches/Xbase/swift_Control/Debug/Control.app/Contents/MacOS/Control",
344 ]);
345
346 let mut stream = process.spawn_and_stream()?;
347 tokio::spawn(async move {
348 while let Some(output) = stream.next().await {
349 println!("{output}")
350 }
351 });
352
353 tokio::time::sleep(std::time::Duration::new(5, 0)).await;
354
355 process.abort();
356
357 tokio::time::sleep(std::time::Duration::new(5, 0)).await;
358
359 Ok(())
360 }
361
362 #[tokio::test]
363 async fn test_dref_item_as_str() {
364 use ProcessItem::*;
365 let items = vec![
366 Output("Hello".into()),
367 Error("XXXXXXXXXX".into()),
368 Exit("0".into()),
369 ];
370 for item in items {
371 println!("{:?}", item.as_bytes())
372 }
373 }
374
375 #[tokio::test]
376 async fn communicate_with_running_process() -> Result<()> {
377 let mut process: Process = Process::new("sort");
378
379 process.stdin(Stdio::piped());
381
382 let mut stream = process.spawn_and_stream().unwrap();
384
385 let mut writer = process.take_stdin().unwrap();
387
388 let reader_thread = tokio::spawn(async move {
390 while let Some(output) = stream.next().await {
391 if output.is_exit() {
392 println!("DONE")
393 } else {
394 println!("{output}")
395 }
396 }
397 });
398
399 let writer_thread = tokio::spawn(async move {
400 writer.write(b"b\nc\na\n").await.unwrap();
401 writer.write(b"f\ne\nd\n").await.unwrap();
402 });
403
404 writer_thread.await?;
405 reader_thread.await?;
406
407 Ok(())
408 }
409}