process_stream/
lib.rs

1//! process-stream is a thin wrapper around [`tokio::process`] to make it streamable
2#![deny(future_incompatible)]
3#![deny(nonstandard_style)]
4#![deny(missing_docs)]
5#![deny(rustdoc::broken_intra_doc_links)]
6#![doc = include_str!("../README.md")]
7
8/// Alias for a stream of process items
9pub type ProcessStream = Pin<Box<dyn Stream<Item = ProcessItem> + Send>>;
10
11pub use async_stream::stream;
12use io::Result;
13use std::{
14    ffi::OsStr,
15    io,
16    ops::{Deref, DerefMut},
17    path::{Path, PathBuf},
18    pin::Pin,
19    process::Stdio,
20    sync::Arc,
21};
22use tap::Pipe;
23use {
24    tokio::{
25        io::{AsyncBufReadExt, AsyncRead, BufReader},
26        process::{ChildStdin, Command},
27        sync::Notify,
28    },
29    tokio_stream::wrappers::LinesStream,
30};
31
32mod item;
33pub use futures::Stream;
34pub use futures::StreamExt;
35pub use futures::TryStreamExt;
36pub use item::ProcessItem;
37pub use tokio_stream;
38
39/// ProcessExt trait that needs to be implemented to make something streamable
40pub trait ProcessExt {
41    /// Get command that will be used to create a child process from
42    fn get_command(&mut self) -> &mut Command;
43
44    /// Get command after settings the required pipes;
45    fn command(&mut self) -> &mut Command {
46        let stdin = self.get_stdin().take().unwrap();
47        let stdout = self.get_stdout().take().unwrap();
48        let stderr = self.get_stderr().take().unwrap();
49        let command = self.get_command();
50
51        #[cfg(windows)]
52        command.creation_flags(0x08000000);
53
54        command.stdin(stdin);
55        command.stdout(stdout);
56        command.stderr(stderr);
57        command
58    }
59
60    /// Spawn and stream process
61    fn spawn_and_stream(&mut self) -> Result<ProcessStream> {
62        self._spawn_and_stream()
63    }
64
65    /// Spawn and stream process (avoid custom implementation, use spawn_and_stream instead)
66    fn _spawn_and_stream(&mut self) -> Result<ProcessStream> {
67        let abort = Arc::new(Notify::new());
68
69        let mut child = self.command().spawn()?;
70
71        let stdout = child.stdout.take().unwrap();
72        let stderr = child.stderr.take().unwrap();
73
74        self.set_child_stdin(child.stdin.take());
75        self.set_aborter(Some(abort.clone()));
76
77        let stdout_stream = into_stream(stdout, true);
78        let stderr_stream = into_stream(stderr, false);
79        let mut std_stream = tokio_stream::StreamExt::merge(stdout_stream, stderr_stream);
80        let stream = stream! {
81            loop {
82                use ProcessItem::*;
83                tokio::select! {
84                    Some(output) = std_stream.next() => yield output,
85                    status = child.wait() => {
86                        // Drain the stream before exiting
87                        while let Some(output) = std_stream.next().await {
88                            yield output
89                        }
90                        match status {
91                            Err(err) => yield Error(err.to_string()),
92                            Ok(status) => {
93                                match status.code() {
94                                    Some(code) => yield Exit(format!("{code}")),
95                                    None => yield Error("Unable to get exit code".into()),
96                                }
97                            }
98                        }
99                        break;
100                    },
101                    _ = abort.notified() => {
102                        match child.start_kill() {
103                            Ok(()) => yield Exit("0".into()),
104                            Err(err) => yield Error(format!("abort Process Error: {err}")),
105                        };
106                        break;
107                    }
108                }
109            }
110        };
111
112        Ok(stream.boxed())
113    }
114    /// Get a notifier that can be used to abort the process
115    fn aborter(&self) -> Option<Arc<Notify>>;
116    /// Set the notifier that should be used to abort the process
117    fn set_aborter(&mut self, aborter: Option<Arc<Notify>>);
118    /// Get process stdin
119    fn take_stdin(&mut self) -> Option<ChildStdin> {
120        None
121    }
122    /// Set process stdin
123    fn set_child_stdin(&mut self, _child_stdin: Option<ChildStdin>) {}
124    /// Get process stdin pipe
125    fn get_stdin(&mut self) -> Option<Stdio> {
126        Some(Stdio::null())
127    }
128    /// get process stdout pipe
129    fn get_stdout(&mut self) -> Option<Stdio> {
130        Some(Stdio::piped())
131    }
132    /// get process stderr pipe
133    fn get_stderr(&mut self) -> Option<Stdio> {
134        Some(Stdio::piped())
135    }
136}
137
138/// Thin Wrapper around [`Command`] to make it streamable
139pub struct Process {
140    inner: Command,
141    stdin: Option<ChildStdin>,
142    set_stdin: Option<Stdio>,
143    set_stdout: Option<Stdio>,
144    set_stderr: Option<Stdio>,
145    abort: Option<Arc<Notify>>,
146}
147
148impl ProcessExt for Process {
149    fn get_command(&mut self) -> &mut Command {
150        &mut self.inner
151    }
152
153    fn aborter(&self) -> Option<Arc<Notify>> {
154        self.abort.clone()
155    }
156
157    fn set_aborter(&mut self, aborter: Option<Arc<Notify>>) {
158        self.abort = aborter
159    }
160
161    fn take_stdin(&mut self) -> Option<ChildStdin> {
162        self.stdin.take()
163    }
164
165    fn set_child_stdin(&mut self, child_stdin: Option<ChildStdin>) {
166        self.stdin = child_stdin;
167    }
168
169    fn get_stdin(&mut self) -> Option<Stdio> {
170        self.set_stdin.take()
171    }
172
173    fn get_stdout(&mut self) -> Option<Stdio> {
174        self.set_stdout.take()
175    }
176
177    fn get_stderr(&mut self) -> Option<Stdio> {
178        self.set_stderr.take()
179    }
180}
181
182impl Process {
183    /// Create new process with a program
184    pub fn new<S: AsRef<OsStr>>(program: S) -> Self {
185        Self {
186            inner: Command::new(program),
187            set_stdin: Some(Stdio::null()),
188            set_stdout: Some(Stdio::piped()),
189            set_stderr: Some(Stdio::piped()),
190            stdin: None,
191            abort: None,
192        }
193    }
194
195    /// Set the process's stdin.
196    pub fn stdin(&mut self, stdin: Stdio) {
197        self.set_stdin = stdin.into();
198    }
199
200    /// Set the process's stdout.
201    pub fn stdout(&mut self, stdout: Stdio) {
202        self.set_stdout = stdout.into();
203    }
204
205    /// Set the process's stderr.
206    pub fn stderr(&mut self, stderr: Stdio) {
207        self.set_stderr = stderr.into();
208    }
209
210    /// Abort the process
211    pub fn abort(&self) {
212        self.aborter().map(|k| k.notify_waiters());
213    }
214}
215
216impl Deref for Process {
217    type Target = Command;
218
219    fn deref(&self) -> &Self::Target {
220        &self.inner
221    }
222}
223
224impl DerefMut for Process {
225    fn deref_mut(&mut self) -> &mut Self::Target {
226        &mut self.inner
227    }
228}
229
230impl From<Command> for Process {
231    fn from(command: Command) -> Self {
232        Self {
233            inner: command,
234            stdin: None,
235            set_stdin: Some(Stdio::null()),
236            set_stdout: Some(Stdio::piped()),
237            set_stderr: Some(Stdio::piped()),
238            abort: None,
239        }
240    }
241}
242
243impl<S: AsRef<OsStr>> From<Vec<S>> for Process {
244    fn from(mut command_args: Vec<S>) -> Self {
245        let command = command_args.remove(0);
246        let mut inner = Command::new(command);
247        inner.args(command_args);
248
249        Self::from(inner)
250    }
251}
252
253impl From<&Path> for Process {
254    fn from(path: &Path) -> Self {
255        let command = Command::new(path);
256        Self::from(command)
257    }
258}
259
260impl From<&str> for Process {
261    fn from(path: &str) -> Self {
262        let command = Command::new(path);
263        Self::from(command)
264    }
265}
266
267impl From<&PathBuf> for Process {
268    fn from(path: &PathBuf) -> Self {
269        let command = Command::new(path);
270        Self::from(command)
271    }
272}
273
274/// Convert std_stream to a stream of T
275pub fn into_stream<T, R>(std: R, is_stdout: bool) -> impl Stream<Item = T>
276where
277    T: From<(bool, Result<String>)>,
278    R: AsyncRead,
279{
280    std.pipe(BufReader::new)
281        .lines()
282        .pipe(LinesStream::new)
283        .map(move |v| T::from((is_stdout, v)))
284}
285
286#[cfg(test)]
287mod tests {
288    use tokio::io::AsyncWriteExt;
289
290    use crate::*;
291    use std::io::Result;
292
293    #[tokio::test]
294    async fn test_from_path() -> Result<()> {
295        let mut process: Process = "/bin/ls".into();
296
297        let outputs = process.spawn_and_stream()?.collect::<Vec<_>>().await;
298        println!("{outputs:#?}");
299        Ok(())
300    }
301
302    #[tokio::test]
303    async fn test_dref_item_as_str() {
304        use ProcessItem::*;
305        let items = vec![
306            Output("Hello".into()),
307            Error("XXXXXXXXXX".into()),
308            Exit("0".into()),
309        ];
310        for item in items {
311            println!("{:?}", item.as_bytes())
312        }
313    }
314
315    #[tokio::test]
316    async fn communicate_with_running_process() -> Result<()> {
317        let mut process: Process = Process::new("sort");
318
319        // Set stdin (by default is set to null)
320        process.stdin(Stdio::piped());
321
322        // Get Stream;
323        let mut stream = process.spawn_and_stream().unwrap();
324
325        // Get writer from stdin;
326        let mut writer = process.take_stdin().unwrap();
327
328        // Start new running process
329        let reader_thread = tokio::spawn(async move {
330            while let Some(output) = stream.next().await {
331                if output.is_exit() {
332                    println!("DONE")
333                } else {
334                    println!("{output}")
335                }
336            }
337        });
338
339        let writer_thread = tokio::spawn(async move {
340            writer.write(b"b\nc\na\n").await.unwrap();
341            writer.write(b"f\ne\nd\n").await.unwrap();
342        });
343
344        writer_thread.await?;
345        reader_thread.await?;
346
347        Ok(())
348    }
349}