Skip to main content

rumtk_core/
pipelines.rs

1/*
2 *     rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 *     This toolkit aims to be reliable, simple, performant, and standards compliant.
4 *     Copyright (C) 2026  Luis M. Santos, M.D.
5 *
6 *     This program is free software: you can redistribute it and/or modify
7 *     it under the terms of the GNU General Public License as published by
8 *     the Free Software Foundation, either version 3 of the License, or
9 *     (at your option) any later version.
10 *
11 *     This program is distributed in the hope that it will be useful,
12 *     but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *     GNU General Public License for more details.
15 *
16 *     You should have received a copy of the GNU General Public License
17 *     along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
22 * This toolkit aims to be reliable, simple, performant, and standards compliant.
23 * Copyright (C) 2025  Luis M. Santos, M.D.
24 * Copyright (C) 2025  Nick Stephenson
25 * Copyright (C) 2025  Ethan Dixon
26 * Copyright (C) 2025  MedicalMasses L.L.C.
27 *
28 * This library is free software; you can redistribute it and/or
29 * modify it under the terms of the GNU Lesser General Public
30 * License as published by the Free Software Foundation; either
31 * version 2.1 of the License, or (at your option) any later version.
32 *
33 * This library is distributed in the hope that it will be useful,
34 * but WITHOUT ANY WARRANTY; without even the implied warranty of
35 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
36 * Lesser General Public License for more details.
37 *
38 * You should have received a copy of the GNU Lesser General Public
39 * License along with this library; if not, write to the Free Software
40 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
41 */
42
43pub mod pipeline_types {
44    use crate::strings::{RUMString, RUMStringConversions};
45    use crate::types::{RUMBuffer, RUMHashMap};
46
47    use crate::core::{RUMResult, RUMVec};
48    use std::process::{Child, Command};
49
50    pub type RUMCommandArgs = Vec<RUMString>;
51    pub type RUMCommandEnv = RUMHashMap<RUMString, RUMString>;
52    #[derive(Default, Debug, Clone)]
53    pub struct RUMCommand {
54        pub path: RUMString,
55        pub data: Option<RUMBuffer>,
56        pub args: RUMCommandArgs,
57        pub env: RUMCommandEnv,
58    }
59
60    impl RUMCommand {
61        pub fn new(
62            prog: &str,
63            data: &Option<RUMBuffer>,
64            args: &RUMCommandArgs,
65            env: &RUMCommandEnv,
66        ) -> Self {
67            RUMCommand {
68                path: prog.to_rumstring(),
69                args: args.clone(),
70                env: env.clone(),
71                data: data.clone(),
72            }
73        }
74    }
75
76    pub type RUMCommandLine = RUMVec<RUMCommand>;
77    pub type RUMPipelineCommand = Command;
78    pub type RUMPipelineProcess = Child;
79    pub type RUMPipeline = RUMVec<RUMPipelineProcess>;
80    pub type RUMPipelineResult = RUMResult<RUMBuffer>;
81}
82
83pub mod pipeline_functions {
84    use super::pipeline_types::*;
85    use crate::core::RUMResult;
86    use crate::strings::rumtk_format;
87    use std::io::{Read, Write};
88
89    use crate::threading::threading_functions::async_sleep;
90    use crate::types::RUMBuffer;
91    use std::process::{Command, Stdio};
92
93    const DEFAULT_PROCESS_ASYNC_WAIT: f32 = 0.001;
94    const DEFAULT_STDOUT_CHUNK_SIZE: usize = 1024;
95
96    ///
97    /// Given a command of type [RUMCommand](RUMCommand), generate a command instance the Rust
98    /// runtime can use to spawn a process.
99    ///
100    /// ## Example
101    ///
102    /// ```
103    /// use std::any::{Any, TypeId};
104    ///
105    /// use rumtk_core::strings::RUMString;
106    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
107    /// use rumtk_core::pipelines::pipeline_functions::pipeline_generate_command;
108    ///
109    /// let command_name = "ls";
110    /// let mut command = RUMCommand::default();
111    /// command.path = RUMString::from(command_name);
112    ///
113    /// let sys_command = pipeline_generate_command(&command);
114    ///
115    /// assert_eq!(sys_command.get_program().to_str().unwrap(), command_name, "");
116    ///
117    /// ```
118    ///
119    pub fn pipeline_generate_command(command: &RUMCommand) -> RUMPipelineCommand {
120        let mut cmd = Command::new(command.path.as_str());
121
122        for arg in command.args.iter() {
123            cmd.arg(arg);
124        }
125
126        cmd.envs(command.env.iter());
127
128        cmd.stdin(Stdio::piped())
129            .stdout(Stdio::piped())
130            .stderr(Stdio::piped());
131
132        cmd
133    }
134
135    ///
136    /// Spawns a process out of the [RUMPipelineCommand](RUMPipelineCommand).
137    ///
138    /// ## Example
139    ///
140    /// ```
141    /// use std::any::{Any, TypeId};
142    ///
143    /// use rumtk_core::strings::RUMString;
144    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
145    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_spawn_process};
146    ///
147    /// let command_name = "ls";
148    /// let mut command = RUMCommand::default();
149    /// command.path = RUMString::from(command_name);
150    ///
151    /// let mut sys_command = pipeline_generate_command(&command);
152    ///
153    /// let mut process = pipeline_spawn_process(&mut sys_command).unwrap();
154    ///
155    /// process.wait();
156    /// ```
157    ///
158    pub fn pipeline_spawn_process(cmd: &mut RUMPipelineCommand) -> RUMResult<RUMPipelineProcess> {
159        match cmd.spawn() {
160            Ok(process) => Ok(process),
161            Err(e) => Err(rumtk_format!(
162                "Failed to spawn process {:?} because => {}",
163                cmd.get_program(),
164                e
165            )),
166        }
167    }
168
169    ///
170    /// Given a process of type [RUMPipelineProcess](RUMPipelineProcess) and a rhs of type [RUMPipelineCommand](RUMPipelineCommand),
171    /// create a pipe of the lhs's `stdout` into the next command descriptor which is the rhs.
172    ///
173    /// ## Example
174    ///
175    /// ```
176    /// use std::any::{Any, TypeId};
177    /// use std::process::Stdio;
178    ///
179    /// use rumtk_core::strings::RUMString;
180    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
181    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_processes, pipeline_spawn_process};
182    ///
183    /// let ls_name = "ls";
184    /// let mut ls_command = RUMCommand::default();
185    /// ls_command.path = RUMString::from(ls_name);
186    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
187    ///
188    /// let wc_name = "wc";
189    /// let mut wc_command = RUMCommand::default();
190    /// wc_command.path = RUMString::from(wc_name);
191    /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
192    ///
193    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
194    /// pipeline_pipe_processes(&mut sys_ls_process, &mut sys_wc_command).unwrap();
195    /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
196    ///
197    /// sys_ls_process.wait();
198    /// sys_wc_process.wait();
199    /// ```
200    ///
201    pub fn pipeline_pipe_processes(
202        process: &mut RUMPipelineProcess,
203        piped: &mut RUMPipelineCommand,
204    ) -> RUMResult<()> {
205        let process_stdout = Stdio::from(match process.stdout.take() {
206            Some(stdout) => stdout,
207            None => {
208                return Err(rumtk_format!(
209                    "No stdout handle found for process {}.",
210                    process.id()
211                ));
212            }
213        });
214        let _ = piped.stdin(process_stdout);
215        Ok(())
216    }
217
218    ///
219    /// Retrieves the standard output generated by the completed process.
220    ///
221    /// ## Example
222    ///
223    /// ```
224    /// use std::any::{Any, TypeId};
225    /// use std::process::Stdio;
226    ///
227    /// use rumtk_core::strings::RUMString;
228    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
229    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_spawn_process, pipeline_get_stdout, pipeline_pipe_processes};
230    ///
231    /// let ls_name = "ls";
232    /// let mut ls_command = RUMCommand::default();
233    /// ls_command.path = RUMString::from(ls_name);
234    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
235    ///
236    /// let wc_name = "wc";
237    /// let mut wc_command = RUMCommand::default();
238    /// wc_command.path = RUMString::from(wc_name);
239    /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
240    ///
241    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
242    /// pipeline_pipe_processes(&mut sys_ls_process, &mut sys_wc_command).unwrap();
243    /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
244    ///
245    /// sys_ls_process.wait();
246    /// sys_wc_process.wait();
247    ///
248    /// let mut pipeline = vec![sys_ls_process, sys_wc_process];
249    ///
250    /// let out_data = pipeline_get_stdout(pipeline).unwrap();
251    ///
252    /// assert_eq!(out_data.is_empty(), false, "No output detected... {:?}", &out_data);
253    /// ```
254    ///
255    pub fn pipeline_get_stdout(mut pipeline: RUMPipeline) -> RUMResult<RUMBuffer> {
256        let mut last_item = pipeline.pop().unwrap();
257        match last_item.wait_with_output() {
258            Ok(stdout) => Ok(RUMBuffer::from(stdout.stdout.clone())),
259            Err(e) => Err(rumtk_format!(
260                "Issue reading last process output because => {}",
261                e
262            )),
263        }
264    }
265
266    ///
    /// Closes the standard input (`stdin`) handle of the process. Useful to trigger a resolution of the pipeline.
268    ///
269    /// ## Example
270    ///
271    /// ```
272    /// use rumtk_core::pipelines::pipeline_functions::pipeline_close_process_stdin;
273    /// use rumtk_core::strings::RUMString;
274    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
275    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_into_process, pipeline_spawn_process};
276    /// use rumtk_core::types::RUMBuffer;
277    ///
278    /// let ls_name = "ls";
279    /// let mut ls_command = RUMCommand::default();
280    /// ls_command.path = RUMString::from(ls_name);
281    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
282    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
283    ///
284    /// pipeline_close_process_stdin(&mut sys_ls_process);
285    ///
286    ///
287    /// ```
288    ///
289    pub fn pipeline_close_process_stdin(process: &mut RUMPipelineProcess) {
290        // Do not change into an expect() or such unwrap. We just want to ignore and assume stdin is closed.
291        match process.stdin.take() {
292            Some(stdin) => {
293                drop(stdin);
294            }
295            None => {}
296        };
297    }
298
299    ///
300    /// Pipe data into a process.
301    ///
302    /// ## Example
303    ///
304    /// ```
305    /// use rumtk_core::strings::RUMString;
306    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
307    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_into_process, pipeline_spawn_process};
308    /// use rumtk_core::types::RUMBuffer;
309    ///
310    /// let ls_name = "ls";
311    /// let mut ls_command = RUMCommand::default();
312    /// ls_command.path = RUMString::from(ls_name);
313    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
314    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
315    /// pipeline_pipe_into_process(&mut sys_ls_process, &Some(RUMBuffer::default())).unwrap();
316    ///
317    /// let out = sys_ls_process.wait_with_output().unwrap();
318    ///
319    /// assert_eq!(out.stdout.is_empty(), false, "Piped command returned an empty buffer? => {:?}", String::from_utf8_lossy(out.stdout.as_slice()))
320    /// ```
321    ///
322    pub fn pipeline_pipe_into_process(
323        process: &mut RUMPipelineProcess,
324        data: &Option<RUMBuffer>,
325    ) -> RUMResult<()> {
326        match data {
327            Some(data) => match process.stdin {
328                Some(ref mut stdin) => match stdin.write_all(&data) {
329                    Ok(_) => {}
330                    Err(e) => {
331                        return Err(rumtk_format!(
332                            "Failed to pipe data to stdin of process because => {}",
333                            e
334                        ))
335                    }
336                },
337                None => {}
338            },
339            None => {}
340        }
341        Ok(())
342    }
343
344    ///
345    /// Builds an executable pipeline out of a list of [RUMCommand](RUMCommand).
346    ///
347    /// ## Example
348    ///
349    /// ```
350    /// use rumtk_core::strings::RUMString;
351    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
352    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline};
353    ///
354    /// let ls_name = "ls";
355    /// let mut ls_command = RUMCommand::default();
356    /// ls_command.path = RUMString::from(ls_name);
357    ///
358    /// let wc_name = "wc";
359    /// let mut wc_command = RUMCommand::default();
360    /// wc_command.path = RUMString::from(wc_name);
361    ///
362    /// let commands = vec![
363    ///     ls_command,
364    ///     wc_command
365    /// ];
366    ///
367    /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
368    ///
369    /// assert_eq!(pipeline.len(), commands.len(), "Pipeline generation returned unexpected number of items!");
370    /// ```
371    ///
372    pub fn pipeline_generate_pipeline(commands: &RUMCommandLine) -> RUMResult<RUMPipeline> {
373        let first_command = commands.first().unwrap();
374
375        // Setup pipeline
376        let mut pipeline = vec![];
377
378        //Bootstrap first process in chain
379        let mut root = pipeline_generate_command(&first_command);
380        let mut parent_process = pipeline_spawn_process(&mut root)?;
381        pipeline_pipe_into_process(&mut parent_process, &mut first_command.data.clone())?;
382        pipeline.push(parent_process);
383
384        for cmd in commands.iter().skip(1) {
385            let mut new_root = pipeline_generate_command(cmd);
386            pipeline_pipe_processes(pipeline.last_mut().unwrap(), &mut new_root)?;
387            parent_process = pipeline_spawn_process(&mut new_root)?;
388            pipeline.push(parent_process);
389        }
390
391        Ok(pipeline)
392    }
393
394    ///
    /// Awaits the completion of the pipeline in an async-friendly manner. Once the pipeline
    /// execution ends, consumes the pipeline and returns the output.
397    ///
398    /// ## Example
399    ///
400    /// ```
401    /// use rumtk_core::strings::RUMString;
402    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
403    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
404    /// use rumtk_core::{rumtk_resolve_task};
405    ///
406    /// let ls_name = "ls";
407    /// let mut ls_command = RUMCommand::default();
408    /// ls_command.path = RUMString::from(ls_name);
409    ///
410    /// let wc_name = "wc";
411    /// let mut wc_command = RUMCommand::default();
412    /// wc_command.path = RUMString::from(wc_name);
413    ///
414    /// let commands = vec![
415    ///     ls_command,
416    ///     wc_command
417    /// ];
418    ///
419    /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
420    /// let result = rumtk_resolve_task!(pipeline_await_pipeline(pipeline)).unwrap().unwrap();
421    ///
422    /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
423    /// ```
424    ///
425    pub async fn pipeline_await_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
426        // Let's make sure the stdin is closed on the first process to make sure it exits instead of
427        // remain waiting for EOF in the stdin stream.
428        pipeline_close_process_stdin(pipeline.first_mut().unwrap());
429
430        // Now let's visit each process and await their completion!
431        for p in pipeline.iter_mut() {
432            loop {
433                match p.try_wait() {
434                    Ok(code) => match code {
435                        Some(code) => {
436                            if !code.success() {
437                                return Err(rumtk_format!(
438                                    "Process {} exited with non-success code => {}!",
439                                    p.id(),
440                                    code
441                                ));
442                            }
443                            break;
444                        }
445                        None => {
446                            async_sleep(DEFAULT_PROCESS_ASYNC_WAIT).await;
447                            continue;
448                        }
449                    },
450                    Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
451                };
452            }
453        }
454
455        let result = pipeline_get_stdout(pipeline)?;
456        Ok(result)
457    }
458
459    ///
    /// Waits for the pipeline to complete execution. Once the pipeline execution ends,
    /// consumes the pipeline and returns the output.
462    ///
463    /// ## Example
464    ///
465    /// ```
466    /// use rumtk_core::strings::RUMString;
467    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
468    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
469    ///
470    /// let ls_name = "ls";
471    /// let mut ls_command = RUMCommand::default();
472    /// ls_command.path = RUMString::from(ls_name);
473    ///
474    /// let wc_name = "wc";
475    /// let mut wc_command = RUMCommand::default();
476    /// wc_command.path = RUMString::from(wc_name);
477    ///
478    /// let commands = vec![
479    ///     ls_command,
480    ///     wc_command
481    /// ];
482    ///
483    /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
484    /// let result = pipeline_wait_pipeline(pipeline).unwrap();
485    ///
486    /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
487    /// ```
488    ///
489    pub fn pipeline_wait_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
490        // Let's make sure the stdin is closed on the first process to make sure it exits instead of
491        // remain waiting for EOF in the stdin stream.
492        pipeline_close_process_stdin(pipeline.first_mut().unwrap());
493
494        // Now let's visit each process and await their completion!
495        for p in pipeline.iter_mut() {
496            match p.wait() {
497                Ok(code) => {
498                    if !code.success() {
499                        return Err(rumtk_format!(
500                            "Process {} exited with non-success code => {}!",
501                            p.id(),
502                            code
503                        ));
504                    }
505                    continue;
506                }
507                Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
508            };
509        }
510
511        let result = pipeline_get_stdout(pipeline)?;
512        Ok(result)
513    }
514}
515
516pub mod pipeline_macros {
517    ///
    /// Creates a pipeline command out of the provided parameters. Parameters include `path`, `data`,
    /// `args`, and `env`. The command has type [RUMCommand](super::pipeline_types::RUMCommand).
520    ///
521    /// `env` is a map of type [RUMCommandEnv](super::pipeline_types::RUMCommandEnv) containing a set of
522    /// key value pair strings that we can use to update the process environment.
523    ///
524    /// ## Example
525    ///
526    /// ### Program Only
527    ///
528    /// ```
529    /// use rumtk_core::rumtk_pipeline_command;
530    ///
531    /// let command = rumtk_pipeline_command!("ls");
532    /// ```
533    ///
534    /// ### Program with Piped Data
535    ///
536    /// ```
537    /// use rumtk_core::rumtk_pipeline_command;
538    /// use rumtk_core::types::RUMBuffer;
539    /// use rumtk_core::strings::RUMStringConversions;
540    ///
541    /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default());
542    /// ```
543    ///
544    /// ### Program with Args
545    ///
546    /// ```
547    /// use rumtk_core::rumtk_pipeline_command;
548    /// use rumtk_core::types::RUMBuffer;
549    /// use rumtk_core::strings::RUMStringConversions;
550    ///
551    /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default(), &vec![
552    ///     "-l".to_rumstring()
553    /// ]);
554    /// ```
555    ///
556    #[macro_export]
557    macro_rules! rumtk_pipeline_command {
558        ( $path:expr, $data:expr, $args:expr, $env:expr ) => {{
559            use $crate::pipelines::pipeline_types::RUMCommand;
560
561            RUMCommand::new($path, &Some($data), $args, $env)
562        }};
563        ( $path:expr, $data:expr, $args:expr ) => {{
564            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandEnv};
565
566            RUMCommand::new($path, &Some($data), $args, &RUMCommandEnv::default())
567        }};
568        ( $path:expr, $data:expr ) => {{
569            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
570
571            RUMCommand::new(
572                $path,
573                &Some($data),
574                &RUMCommandArgs::default(),
575                &RUMCommandEnv::default(),
576            )
577        }};
578        ( $path:expr ) => {{
579            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
580            use $crate::types::RUMBuffer;
581
582            RUMCommand::new(
583                $path,
584                &None,
585                &RUMCommandArgs::default(),
586                &RUMCommandEnv::default(),
587            )
588        }};
589    }
590
591    ///
592    /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
593    /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
594    /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
595    ///
596    /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
597    /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
598    ///
599    /// ## Example
600    ///
601    /// ### Simple
602    ///
603    /// ```
604    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
605    /// use rumtk_core::core::{RUMResult};
606    /// use rumtk_core::strings::RUMStringConversions;
607    /// use rumtk_core::types::RUMBuffer;
608    ///
609    /// let f = async || -> RUMResult<()> {
610    ///     let result = rumtk_pipeline_run!(
611    ///         rumtk_pipeline_command!("ls"),
612    ///         rumtk_pipeline_command!("wc")
613    ///     ).unwrap();
614    ///
615    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
616    ///     Ok(())
617    /// };
618    ///
619    /// rumtk_resolve_task!(f()).unwrap();
620    /// ```
621    ///
622    /// ### With Buffer Piped In
623    ///
624    /// ```
625    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
626    /// use rumtk_core::core::{RUMResult, new_random_buffer};
627    /// use rumtk_core::strings::RUMStringConversions;
628    /// use rumtk_core::types::RUMBuffer;
629    ///
630    /// let f = || -> RUMResult<()> {
631    ///     let result = rumtk_pipeline_run!(
632    ///         rumtk_pipeline_command!("ls", new_random_buffer()),
633    ///         rumtk_pipeline_command!("wc")
634    ///     )?;
635    ///
636    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
637    ///     Ok(())
638    /// };
639    ///
640    /// f().unwrap();
641    /// ```
642    ///
643    #[macro_export]
644    macro_rules! rumtk_pipeline_run {
645        ( $($command:expr),+ ) => {{
646            use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
647
648            let pipeline = pipeline_generate_pipeline(&vec![
649                $($command),+
650            ])?;
651
652            pipeline_wait_pipeline(pipeline)
653        }};
654    }
655
656    ///
657    /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
658    /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
659    /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
660    ///
661    /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
662    /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
663    ///
664    /// This is the `async` flavor.
665    ///
666    /// ## Example
667    ///
668    /// ### Simple
669    ///
670    /// ```
671    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
672    /// use rumtk_core::core::{RUMResult};
673    /// use rumtk_core::strings::RUMStringConversions;
674    /// use rumtk_core::types::RUMBuffer;
675    ///
676    /// let f = async || -> RUMResult<()> {
677    ///     let result = rumtk_pipeline_run_async!(
678    ///         rumtk_pipeline_command!("ls"),
679    ///         rumtk_pipeline_command!("wc")
680    ///     ).await?;
681    ///
682    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
683    ///     Ok(())
684    /// };
685    ///
686    /// rumtk_resolve_task!(f()).unwrap();
687    /// ```
688    ///
689    /// ### With Buffer Piped In
690    ///
691    /// ```
692    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
693    /// use rumtk_core::core::{RUMResult, new_random_buffer};
694    /// use rumtk_core::strings::RUMStringConversions;
695    /// use rumtk_core::types::RUMBuffer;
696    ///
697    /// let f = async || -> RUMResult<()> {
698    ///     let result = rumtk_pipeline_run_async!(
699    ///         rumtk_pipeline_command!("wc", new_random_buffer())
700    ///     ).await?;
701    ///
702    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
703    ///     Ok(())
704    /// };
705    ///
706    /// rumtk_resolve_task!(f()).unwrap();
707    /// ```
708    ///
709    /// ### With Buffer Piped In W/ Return
710    ///
711    /// ```
712    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
713    /// use rumtk_core::core::{RUMResult, new_random_buffer};
714    /// use rumtk_core::strings::{RUMString, RUMStringConversions, RUMArrayConversions};
715    /// use rumtk_core::types::RUMBuffer;
716    ///
717    /// let expected = "1024\n";
718    ///
719    /// let f = async || -> RUMResult<RUMBuffer> {
720    ///     let result = rumtk_pipeline_run_async!(
721    ///         rumtk_pipeline_command!("wc", new_random_buffer())
722    ///     ).await?;
723    ///
724    ///     Ok(result)
725    /// };
726    ///
727    /// let result = rumtk_resolve_task!(f()).unwrap().unwrap();
728    /// let string = result.to_vec().to_rumstring();
729    /// let result_buffer_size = string.split("      ").last().unwrap().split("    ").last().unwrap().to_rumstring();
730    ///
731    /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
732    /// assert_eq!(&result_buffer_size, expected, "Pipeline returned an unexpected result from command wc! => {:?}\nvs.\n{:?}", &result_buffer_size, &expected);
733    /// ```
734    ///
735    #[macro_export]
736    macro_rules! rumtk_pipeline_run_async {
737        ( $($command:expr),+ ) => {{
738            use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
739
740            let pipeline = pipeline_generate_pipeline(&vec![
741                $($command),+
742            ])?;
743
744            pipeline_await_pipeline(pipeline)
745        }};
746    }
747}