process_stream/
lib.rs

1//! process-stream is a thin wrapper around [`tokio::process`] to make it streamable
2#![deny(future_incompatible)]
3#![deny(nonstandard_style)]
4#![deny(missing_docs)]
5#![deny(rustdoc::broken_intra_doc_links)]
6#![doc = include_str!("../README.md")]
7
8/// Alias for a stream of process items
9pub type ProcessStream = Pin<Box<dyn Stream<Item = ProcessItem> + Send>>;
10
11pub use async_stream::stream;
12use io::Result;
13use std::{
14    ffi::OsStr,
15    io,
16    ops::{Deref, DerefMut},
17    path::{Path, PathBuf},
18    pin::Pin,
19    process::Stdio,
20    sync::Arc,
21};
22use tap::Pipe;
23use {
24    tokio::{
25        io::{AsyncBufReadExt, AsyncRead, BufReader},
26        process::{ChildStdin, Command},
27        sync::Notify,
28    },
29    tokio_stream::wrappers::LinesStream,
30};
31
32mod item;
33pub use async_trait::async_trait;
34pub use futures::Stream;
35pub use futures::StreamExt;
36pub use futures::TryStreamExt;
37pub use item::ProcessItem;
38pub use tokio_stream;
39
40#[async_trait]
41/// ProcessExt trait that needs to be implemented to make something streamable
42pub trait ProcessExt {
43    /// Get command that will be used to create a child process from
44    fn get_command(&mut self) -> &mut Command;
45
46    /// Get command after settings the required pipes;
47    fn command(&mut self) -> &mut Command {
48        let stdin = self.get_stdin().take().unwrap();
49        let stdout = self.get_stdout().take().unwrap();
50        let stderr = self.get_stderr().take().unwrap();
51        let command = self.get_command();
52
53        #[cfg(windows)]
54        command.creation_flags(0x08000000);
55
56        command.stdin(stdin);
57        command.stdout(stdout);
58        command.stderr(stderr);
59        command
60    }
61
62    /// Spawn and stream process
63    fn spawn_and_stream(&mut self) -> Result<ProcessStream> {
64        self._spawn_and_stream()
65    }
66
67    /// Spawn and stream process (avoid custom implementation, use spawn_and_stream instead)
68    fn _spawn_and_stream(&mut self) -> Result<ProcessStream> {
69        let abort = Arc::new(Notify::new());
70
71        let mut child = self.command().spawn()?;
72
73        let stdout = child.stdout.take().unwrap();
74        let stderr = child.stderr.take().unwrap();
75
76        self.set_child_stdin(child.stdin.take());
77        self.set_aborter(Some(abort.clone()));
78
79        let stdout_stream = into_stream(stdout, true);
80        let stderr_stream = into_stream(stderr, false);
81        let mut std_stream = tokio_stream::StreamExt::merge(stdout_stream, stderr_stream);
82        let stream = stream! {
83            loop {
84                use ProcessItem::*;
85                tokio::select! {
86                    Some(output) = std_stream.next() => yield output,
87                    status = child.wait() => {
88                        // Drain the stream before exiting
89                        while let Some(output) = std_stream.next().await {
90                            yield output
91                        }
92                        match status {
93                            Err(err) => yield Error(err.to_string()),
94                            Ok(status) => {
95                                match status.code() {
96                                    Some(code) => yield Exit(format!("{code}")),
97                                    None => yield Error("Unable to get exit code".into()),
98                                }
99                            }
100                        }
101                        break;
102                    },
103                    _ = abort.notified() => {
104                        match child.start_kill() {
105                            Ok(()) => yield Exit("0".into()),
106                            Err(err) => yield Error(format!("abort Process Error: {err}")),
107                        };
108                        break;
109                    }
110                }
111            }
112        };
113
114        Ok(stream.boxed())
115    }
116    /// Get a notifier that can be used to abort the process
117    fn aborter(&self) -> Option<Arc<Notify>>;
118    /// Set the notifier that should be used to abort the process
119    fn set_aborter(&mut self, aborter: Option<Arc<Notify>>);
120    /// Get process stdin
121    fn take_stdin(&mut self) -> Option<ChildStdin> {
122        None
123    }
124    /// Set process stdin
125    fn set_child_stdin(&mut self, _child_stdin: Option<ChildStdin>) {}
126    /// Get process stdin pipe
127    fn get_stdin(&mut self) -> Option<Stdio> {
128        Some(Stdio::null())
129    }
130    /// get process stdout pipe
131    fn get_stdout(&mut self) -> Option<Stdio> {
132        Some(Stdio::piped())
133    }
134    /// get process stderr pipe
135    fn get_stderr(&mut self) -> Option<Stdio> {
136        Some(Stdio::piped())
137    }
138}
139
140/// Thin Wrapper around [`Command`] to make it streamable
141pub struct Process {
142    inner: Command,
143    stdin: Option<ChildStdin>,
144    set_stdin: Option<Stdio>,
145    set_stdout: Option<Stdio>,
146    set_stderr: Option<Stdio>,
147    abort: Option<Arc<Notify>>,
148}
149
150impl ProcessExt for Process {
151    fn get_command(&mut self) -> &mut Command {
152        &mut self.inner
153    }
154
155    fn aborter(&self) -> Option<Arc<Notify>> {
156        self.abort.clone()
157    }
158
159    fn set_aborter(&mut self, aborter: Option<Arc<Notify>>) {
160        self.abort = aborter
161    }
162
163    fn take_stdin(&mut self) -> Option<ChildStdin> {
164        self.stdin.take()
165    }
166
167    fn set_child_stdin(&mut self, child_stdin: Option<ChildStdin>) {
168        self.stdin = child_stdin;
169    }
170
171    fn get_stdin(&mut self) -> Option<Stdio> {
172        self.set_stdin.take()
173    }
174
175    fn get_stdout(&mut self) -> Option<Stdio> {
176        self.set_stdout.take()
177    }
178
179    fn get_stderr(&mut self) -> Option<Stdio> {
180        self.set_stderr.take()
181    }
182}
183
184impl Process {
185    /// Create new process with a program
186    pub fn new<S: AsRef<OsStr>>(program: S) -> Self {
187        Self {
188            inner: Command::new(program),
189            set_stdin: Some(Stdio::null()),
190            set_stdout: Some(Stdio::piped()),
191            set_stderr: Some(Stdio::piped()),
192            stdin: None,
193            abort: None,
194        }
195    }
196
197    /// Set the process's stdin.
198    pub fn stdin(&mut self, stdin: Stdio) {
199        self.set_stdin = stdin.into();
200    }
201
202    /// Set the process's stdout.
203    pub fn stdout(&mut self, stdout: Stdio) {
204        self.set_stdout = stdout.into();
205    }
206
207    /// Set the process's stderr.
208    pub fn stderr(&mut self, stderr: Stdio) {
209        self.set_stderr = stderr.into();
210    }
211
212    /// Abort the process
213    pub fn abort(&self) {
214        self.aborter().map(|k| k.notify_waiters());
215    }
216}
217
218impl Deref for Process {
219    type Target = Command;
220
221    fn deref(&self) -> &Self::Target {
222        &self.inner
223    }
224}
225
226impl DerefMut for Process {
227    fn deref_mut(&mut self) -> &mut Self::Target {
228        &mut self.inner
229    }
230}
231
232impl From<Command> for Process {
233    fn from(command: Command) -> Self {
234        Self {
235            inner: command,
236            stdin: None,
237            set_stdin: Some(Stdio::null()),
238            set_stdout: Some(Stdio::piped()),
239            set_stderr: Some(Stdio::piped()),
240            abort: None,
241        }
242    }
243}
244
245impl<S: AsRef<OsStr>> From<Vec<S>> for Process {
246    fn from(mut command_args: Vec<S>) -> Self {
247        let command = command_args.remove(0);
248        let mut inner = Command::new(command);
249        inner.args(command_args);
250
251        Self::from(inner)
252    }
253}
254
255impl From<&Path> for Process {
256    fn from(path: &Path) -> Self {
257        let command = Command::new(path);
258        Self::from(command)
259    }
260}
261
262impl From<&str> for Process {
263    fn from(path: &str) -> Self {
264        let command = Command::new(path);
265        Self::from(command)
266    }
267}
268
269impl From<&PathBuf> for Process {
270    fn from(path: &PathBuf) -> Self {
271        let command = Command::new(path);
272        Self::from(command)
273    }
274}
275
276/// Convert std_stream to a stream of T
277pub fn into_stream<T, R>(std: R, is_stdout: bool) -> impl Stream<Item = T>
278where
279    T: From<(bool, Result<String>)>,
280    R: AsyncRead,
281{
282    std.pipe(BufReader::new)
283        .lines()
284        .pipe(LinesStream::new)
285        .map(move |v| T::from((is_stdout, v)))
286}
287
288#[cfg(test)]
289mod tests {
290    use tokio::io::AsyncWriteExt;
291
292    use crate::*;
293    use std::io::Result;
294
295    #[tokio::test]
296    async fn test_from_path() -> Result<()> {
297        let mut process: Process = "/bin/ls".into();
298
299        let outputs = process.spawn_and_stream()?.collect::<Vec<_>>().await;
300        println!("{outputs:#?}");
301        Ok(())
302    }
303
304    #[tokio::test]
305    async fn test_dref_item_as_str() {
306        use ProcessItem::*;
307        let items = vec![
308            Output("Hello".into()),
309            Error("XXXXXXXXXX".into()),
310            Exit("0".into()),
311        ];
312        for item in items {
313            println!("{:?}", item.as_bytes())
314        }
315    }
316
317    #[tokio::test]
318    async fn communicate_with_running_process() -> Result<()> {
319        let mut process: Process = Process::new("sort");
320
321        // Set stdin (by default is set to null)
322        process.stdin(Stdio::piped());
323
324        // Get Stream;
325        let mut stream = process.spawn_and_stream().unwrap();
326
327        // Get writer from stdin;
328        let mut writer = process.take_stdin().unwrap();
329
330        // Start new running process
331        let reader_thread = tokio::spawn(async move {
332            while let Some(output) = stream.next().await {
333                if output.is_exit() {
334                    println!("DONE")
335                } else {
336                    println!("{output}")
337                }
338            }
339        });
340
341        let writer_thread = tokio::spawn(async move {
342            writer.write(b"b\nc\na\n").await.unwrap();
343            writer.write(b"f\ne\nd\n").await.unwrap();
344        });
345
346        writer_thread.await?;
347        reader_thread.await?;
348
349        Ok(())
350    }
351}