process_stream/
lib.rs

1//! process-stream is a thin wrapper around [`tokio::process`] to make it streamable
2#![deny(future_incompatible)]
3#![deny(nonstandard_style)]
4#![deny(missing_docs)]
5#![deny(rustdoc::broken_intra_doc_links)]
6#![doc = include_str!("../README.md")]
7
/// Alias for a pinned, boxed, `Send` stream of process items ([`ProcessItem`])
pub type ProcessStream = Pin<Box<dyn Stream<Item = ProcessItem> + Send>>;
10
11pub use async_stream::stream;
12use io::Result;
13use std::{
14    ffi::OsStr,
15    io,
16    ops::{Deref, DerefMut},
17    path::{Path, PathBuf},
18    pin::Pin,
19    process::Stdio,
20    sync::Arc,
21};
22use tap::Pipe;
23use {
24    tokio::{
25        io::{AsyncBufReadExt, AsyncRead, BufReader},
26        process::{ChildStdin, Command},
27        sync::Notify,
28    },
29    tokio_stream::wrappers::LinesStream,
30};
31
32mod item;
33pub use async_trait::async_trait;
34pub use futures::Stream;
35pub use futures::StreamExt;
36pub use futures::TryStreamExt;
37pub use item::ProcessItem;
38pub use tokio_stream;
39
40#[async_trait]
41/// ProcessExt trait that needs to be implemented to make something streamable
42pub trait ProcessExt {
43    /// Get command that will be used to create a child process from
44    fn get_command(&mut self) -> &mut Command;
45
46    /// Get command after settings the required pipes;
47    fn command(&mut self) -> &mut Command {
48        let stdin = self.get_stdin().take().unwrap();
49        let stdout = self.get_stdout().take().unwrap();
50        let stderr = self.get_stderr().take().unwrap();
51        let command = self.get_command();
52
53        command.stdin(stdin);
54        command.stderr(stdout);
55        command.stdout(stderr);
56        command
57    }
58
59    /// Spawn and stream process
60    fn spawn_and_stream(&mut self) -> Result<ProcessStream> {
61        self._spawn_and_stream()
62    }
63
64    /// Spawn and stream process (avoid custom implementation, use spawn_and_stream instead)
65    fn _spawn_and_stream(&mut self) -> Result<ProcessStream> {
66        let abort = Arc::new(Notify::new());
67
68        let mut child = self.command().spawn()?;
69
70        let stdout = child.stdout.take().unwrap();
71        let stderr = child.stderr.take().unwrap();
72
73        self.set_child_stdin(child.stdin.take());
74        self.set_aborter(Some(abort.clone()));
75
76        let stdout_stream = into_stream(stdout, true);
77        let stderr_stream = into_stream(stderr, false);
78        let mut std_stream = tokio_stream::StreamExt::merge(stdout_stream, stderr_stream);
79
80        let stream = stream! {
81            loop {
82                use ProcessItem::*;
83                tokio::select! {
84                    Some(output) = std_stream.next() => yield output,
85                    status = child.wait() => match status {
86                        Err(err) => yield Error(err.to_string()),
87                        Ok(status) => {
88                            match status.code() {
89                                Some(code) => yield Exit(format!("{code}")),
90                                None => yield Error("Unable to get exit code".into()),
91                            }
92                            break;
93
94                        }
95                    },
96                    _ = abort.notified() => {
97                        match child.start_kill() {
98                            Ok(()) => yield Exit("0".into()),
99                            Err(err) => yield Error(format!("abort Process Error: {err}")),
100                        };
101                        break;
102                    }
103                }
104            }
105        };
106
107        Ok(stream.boxed())
108    }
109    /// Get a notifier that can be used to abort the process
110    fn aborter(&self) -> Option<Arc<Notify>>;
111    /// Set the notifier that should be used to abort the process
112    fn set_aborter(&mut self, aborter: Option<Arc<Notify>>);
113    /// Get process stdin
114    fn take_stdin(&mut self) -> Option<ChildStdin> {
115        None
116    }
117    /// Set process stdin
118    fn set_child_stdin(&mut self, _child_stdin: Option<ChildStdin>) {}
119    /// Get process stdin pipe
120    fn get_stdin(&mut self) -> Option<Stdio> {
121        Some(Stdio::null())
122    }
123    /// get process stdout pipe
124    fn get_stdout(&mut self) -> Option<Stdio> {
125        Some(Stdio::piped())
126    }
127    /// get process stderr pipe
128    fn get_stderr(&mut self) -> Option<Stdio> {
129        Some(Stdio::piped())
130    }
131}
132
/// Thin Wrapper around [`Command`] to make it streamable
pub struct Process {
    /// The wrapped command that gets spawned.
    inner: Command,
    /// Stdin handle of the spawned child, stored until taken via `take_stdin`.
    stdin: Option<ChildStdin>,
    /// Stdin configuration for the next spawn; taken (left as `None`) by `get_stdin`.
    set_stdin: Option<Stdio>,
    /// Stdout configuration for the next spawn; taken (left as `None`) by `get_stdout`.
    set_stdout: Option<Stdio>,
    /// Stderr configuration for the next spawn; taken (left as `None`) by `get_stderr`.
    set_stderr: Option<Stdio>,
    /// Notifier used to abort the process; `None` until one is set via `set_aborter`.
    abort: Option<Arc<Notify>>,
}
142
143impl ProcessExt for Process {
144    fn get_command(&mut self) -> &mut Command {
145        &mut self.inner
146    }
147
148    fn aborter(&self) -> Option<Arc<Notify>> {
149        self.abort.clone()
150    }
151
152    fn set_aborter(&mut self, aborter: Option<Arc<Notify>>) {
153        self.abort = aborter
154    }
155
156    fn take_stdin(&mut self) -> Option<ChildStdin> {
157        self.stdin.take()
158    }
159
160    fn set_child_stdin(&mut self, child_stdin: Option<ChildStdin>) {
161        self.stdin = child_stdin;
162    }
163
164    fn get_stdin(&mut self) -> Option<Stdio> {
165        self.set_stdin.take()
166    }
167
168    fn get_stdout(&mut self) -> Option<Stdio> {
169        self.set_stdout.take()
170    }
171
172    fn get_stderr(&mut self) -> Option<Stdio> {
173        self.set_stderr.take()
174    }
175}
176
177impl Process {
178    /// Create new process with a program
179    pub fn new<S: AsRef<OsStr>>(program: S) -> Self {
180        Self {
181            inner: Command::new(program),
182            set_stdin: Some(Stdio::null()),
183            set_stdout: Some(Stdio::piped()),
184            set_stderr: Some(Stdio::piped()),
185            stdin: None,
186            abort: None,
187        }
188    }
189
190    /// Set the process's stdin.
191    pub fn stdin(&mut self, stdin: Stdio) {
192        self.set_stdin = stdin.into();
193    }
194
195    /// Set the process's stdout.
196    pub fn stdout(&mut self, stdout: Stdio) {
197        self.set_stdout = stdout.into();
198    }
199
200    /// Set the process's stderr.
201    pub fn stderr(&mut self, stderr: Stdio) {
202        self.set_stderr = stderr.into();
203    }
204
205    /// Abort the process
206    pub fn abort(&self) {
207        self.aborter().map(|k| k.notify_waiters());
208    }
209}
210
211impl Deref for Process {
212    type Target = Command;
213
214    fn deref(&self) -> &Self::Target {
215        &self.inner
216    }
217}
218
219impl DerefMut for Process {
220    fn deref_mut(&mut self) -> &mut Self::Target {
221        &mut self.inner
222    }
223}
224
225impl From<Command> for Process {
226    fn from(command: Command) -> Self {
227        Self {
228            inner: command,
229            stdin: None,
230            set_stdin: Some(Stdio::null()),
231            set_stdout: Some(Stdio::piped()),
232            set_stderr: Some(Stdio::piped()),
233            abort: None,
234        }
235    }
236}
237
238impl<S: AsRef<OsStr>> From<Vec<S>> for Process {
239    fn from(mut command_args: Vec<S>) -> Self {
240        let command = command_args.remove(0);
241        let mut inner = Command::new(command);
242        inner.args(command_args);
243
244        Self::from(inner)
245    }
246}
247
248impl From<&Path> for Process {
249    fn from(path: &Path) -> Self {
250        let command = Command::new(path);
251        Self::from(command)
252    }
253}
254
255impl From<&str> for Process {
256    fn from(path: &str) -> Self {
257        let command = Command::new(path);
258        Self::from(command)
259    }
260}
261
262impl From<&PathBuf> for Process {
263    fn from(path: &PathBuf) -> Self {
264        let command = Command::new(path);
265        Self::from(command)
266    }
267}
268
269/// Convert std_stream to a stream of T
270pub fn into_stream<T, R>(std: R, is_stdout: bool) -> impl Stream<Item = T>
271where
272    T: From<(bool, Result<String>)>,
273    R: AsyncRead,
274{
275    std.pipe(BufReader::new)
276        .lines()
277        .pipe(LinesStream::new)
278        .map(move |v| T::from((is_stdout, v)))
279}
280
#[cfg(test)]
mod tests {
    use tokio::io::AsyncWriteExt;

    use crate::*;
    use std::io::Result;

    // Builds a process from an argv-style vector and prints every streamed item.
    // Requires the `xcrun` tool to be available on the machine running the test.
    #[tokio::test]
    async fn test_from_vector() -> Result<()> {
        let mut stream = Process::from(vec![
            "xcrun",
            "simctl",
            "launch",
            "--terminate-running-process",
            "--console",
            "booted",
            "tami5.Wordle",
        ])
        .spawn_and_stream()
        .unwrap();

        while let Some(output) = stream.next().await {
            println!("{output}")
        }
        Ok(())
    }

    // Builds a process from a program path and collects the whole stream.
    #[tokio::test]
    async fn test_from_path() -> Result<()> {
        let mut process: Process = "/bin/ls".into();

        let outputs = process.spawn_and_stream()?.collect::<Vec<_>>().await;
        println!("{outputs:#?}");
        Ok(())
    }

    // Builds a process with `Process::new`, adding arguments through the derefed `Command`.
    #[tokio::test]
    async fn test_new() -> Result<()> {
        let mut process = Process::new("xcrun");

        process.args(&[
            "simctl",
            "launch",
            "--terminate-running-process",
            "--console",
            "booted",
            "tami5.Wordle",
        ]);

        let mut stream = process.spawn_and_stream()?;

        while let Some(output) = stream.next().await {
            println!("{output:#?}")
        }
        Ok(())
    }

    // Spawns a process, consumes its stream in a background task and aborts it
    // after five seconds.
    #[tokio::test]
    async fn test_abort() -> Result<()> {
        let mut process = Process::new("xcrun");

        process.args(&[
            "/Users/tami5/Library/Caches/Xbase/swift_Control/Debug/Control.app/Contents/MacOS/Control",
        ]);

        let mut stream = process.spawn_and_stream()?;
        tokio::spawn(async move {
            while let Some(output) = stream.next().await {
                println!("{output}")
            }
        });

        tokio::time::sleep(std::time::Duration::new(5, 0)).await;

        process.abort();

        // Give the background task time to observe the abort and print the final item.
        tokio::time::sleep(std::time::Duration::new(5, 0)).await;

        Ok(())
    }

    // Prints the byte view of each item variant.
    #[tokio::test]
    async fn test_dref_item_as_str() {
        use ProcessItem::*;
        let items = vec![
            Output("Hello".into()),
            Error("XXXXXXXXXX".into()),
            Exit("0".into()),
        ];
        for item in items {
            println!("{:?}", item.as_bytes())
        }
    }

    // Writes to the stdin of a running `sort` process while reading its stream.
    #[tokio::test]
    async fn communicate_with_running_process() -> Result<()> {
        let mut process: Process = Process::new("sort");

        // Set stdin (by default is set to null)
        process.stdin(Stdio::piped());

        // Get Stream;
        let mut stream = process.spawn_and_stream().unwrap();

        // Get writer from stdin;
        let mut writer = process.take_stdin().unwrap();

        // Consume the process stream in a background task
        let reader_thread = tokio::spawn(async move {
            while let Some(output) = stream.next().await {
                if output.is_exit() {
                    println!("DONE")
                } else {
                    println!("{output}")
                }
            }
        });

        let writer_thread = tokio::spawn(async move {
            // `write` may accept only part of the buffer and its returned count was
            // being discarded; `write_all` keeps writing until every byte is sent.
            writer.write_all(b"b\nc\na\n").await.unwrap();
            writer.write_all(b"f\ne\nd\n").await.unwrap();
        });

        writer_thread.await?;
        reader_thread.await?;

        Ok(())
    }
}
409}