Skip to main content

rumtk_core/
pipelines.rs

1/*
2 *     rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 *     This toolkit aims to be reliable, simple, performant, and standards compliant.
4 *     Copyright (C) 2026  Luis M. Santos, M.D.
5 *
6 *     This program is free software: you can redistribute it and/or modify
7 *     it under the terms of the GNU General Public License as published by
8 *     the Free Software Foundation, either version 3 of the License, or
9 *     (at your option) any later version.
10 *
11 *     This program is distributed in the hope that it will be useful,
12 *     but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *     GNU General Public License for more details.
15 *
16 *     You should have received a copy of the GNU General Public License
17 *     along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
22 * This toolkit aims to be reliable, simple, performant, and standards compliant.
23 * Copyright (C) 2025  Luis M. Santos, M.D.
24 * Copyright (C) 2025  Nick Stephenson
25 * Copyright (C) 2025  Ethan Dixon
26 * Copyright (C) 2025  MedicalMasses L.L.C.
27 *
28 * This library is free software; you can redistribute it and/or
29 * modify it under the terms of the GNU Lesser General Public
30 * License as published by the Free Software Foundation; either
31 * version 2.1 of the License, or (at your option) any later version.
32 *
33 * This library is distributed in the hope that it will be useful,
34 * but WITHOUT ANY WARRANTY; without even the implied warranty of
35 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
36 * Lesser General Public License for more details.
37 *
38 * You should have received a copy of the GNU Lesser General Public
39 * License along with this library; if not, write to the Free Software
40 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
41 */
42
43pub mod pipeline_types {
44    use crate::strings::{RUMString, RUMStringConversions};
45    use crate::types::{RUMBuffer, RUMHashMap};
46
47    use crate::core::{RUMResult, RUMVec};
48    use std::process::{Child, Command};
49
50    pub type RUMCommandArgs = Vec<RUMString>;
51    pub type RUMCommandEnv = RUMHashMap<RUMString, RUMString>;
52    #[derive(Default, Debug, Clone)]
53    pub struct RUMCommand {
54        pub path: RUMString,
55        pub data: Option<RUMBuffer>,
56        pub args: RUMCommandArgs,
57        pub env: RUMCommandEnv,
58    }
59
60    impl RUMCommand {
61        pub fn new(
62            prog: &str,
63            data: &Option<RUMBuffer>,
64            args: &RUMCommandArgs,
65            env: &RUMCommandEnv,
66        ) -> Self {
67            RUMCommand {
68                path: prog.to_rumstring(),
69                args: args.clone(),
70                env: env.clone(),
71                data: data.clone(),
72            }
73        }
74    }
75
76    pub type RUMCommandLine = RUMVec<RUMCommand>;
77    pub type RUMPipelineCommand = Command;
78    pub type RUMPipelineProcess = Child;
79    pub type RUMPipeline = RUMVec<RUMPipelineProcess>;
80    pub type RUMPipelineResult = RUMResult<RUMBuffer>;
81}
82
83pub mod pipeline_functions {
84    use super::pipeline_types::*;
85    use crate::core::RUMResult;
86    use crate::strings::rumtk_format;
87    use std::io::{Read, Write};
88
89    use crate::threading::threading_functions::async_sleep;
90    use crate::types::RUMBuffer;
91    use std::process::{Command, Stdio};
92
    /// Pause handed to `async_sleep` between two `try_wait` polls in `pipeline_await_pipeline`.
    /// NOTE(review): presumably expressed in seconds (i.e. 1 ms) — confirm against `async_sleep`.
    const DEFAULT_PROCESS_ASYNC_WAIT: f32 = 0.001;
    /// NOTE(review): not referenced anywhere in the visible part of this module; presumably
    /// intended as the chunk size for incremental stdout reads — confirm or remove.
    const DEFAULT_STDOUT_CHUNK_SIZE: usize = 1024;
95
96    ///
97    /// Given a command of type [RUMCommand](RUMCommand), generate a command instance the Rust
98    /// runtime can use to spawn a process.
99    ///
100    /// ## Example
101    ///
102    /// ```
103    /// use std::any::{Any, TypeId};
104    ///
105    /// use rumtk_core::strings::RUMString;
106    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
107    /// use rumtk_core::pipelines::pipeline_functions::pipeline_generate_command;
108    ///
109    /// let command_name = "ls";
110    /// let mut command = RUMCommand::default();
111    /// command.path = RUMString::from(command_name);
112    ///
113    /// let sys_command = pipeline_generate_command(&command);
114    ///
115    /// assert_eq!(sys_command.get_program().to_str().unwrap(), command_name, "");
116    ///
117    /// ```
118    ///
119    pub fn pipeline_generate_command(command: &RUMCommand) -> RUMPipelineCommand {
120        let mut cmd = Command::new(command.path.as_str());
121
122        for arg in command.args.iter() {
123            cmd.arg(arg);
124        }
125
126        cmd.envs(command.env.iter());
127
128        cmd.stdout(Stdio::piped())
129            .stderr(Stdio::piped())
130            .stdin(Stdio::piped());
131
132        cmd
133    }
134
135    ///
136    /// Spawns a process out of the [RUMPipelineCommand](RUMPipelineCommand).
137    ///
138    /// ## Example
139    ///
140    /// ```
141    /// use std::any::{Any, TypeId};
142    ///
143    /// use rumtk_core::strings::RUMString;
144    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
145    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_spawn_process};
146    ///
147    /// let command_name = "ls";
148    /// let mut command = RUMCommand::default();
149    /// command.path = RUMString::from(command_name);
150    ///
151    /// let mut sys_command = pipeline_generate_command(&command);
152    ///
153    /// let mut process = pipeline_spawn_process(&mut sys_command).unwrap();
154    ///
155    /// process.wait();
156    /// ```
157    ///
158    pub fn pipeline_spawn_process(cmd: &mut RUMPipelineCommand) -> RUMResult<RUMPipelineProcess> {
159        match cmd.spawn() {
160            Ok(process) => Ok(process),
161            Err(e) => Err(rumtk_format!(
162                "Failed to spawn process {:?} because => {}",
163                cmd.get_program(),
164                e
165            )),
166        }
167    }
168
169    ///
170    /// Given a process of type [RUMPipelineProcess](RUMPipelineProcess) and a rhs of type [RUMPipelineCommand](RUMPipelineCommand),
171    /// create a pipe of the lhs's `stdout` into the next command descriptor which is the rhs.
172    ///
173    /// ## Example
174    ///
175    /// ```
176    /// use std::any::{Any, TypeId};
177    /// use std::process::Stdio;
178    ///
179    /// use rumtk_core::strings::RUMString;
180    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
181    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_process, pipeline_spawn_process};
182    ///
183    /// let ls_name = "ls";
184    /// let mut ls_command = RUMCommand::default();
185    /// ls_command.path = RUMString::from(ls_name);
186    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
187    ///
188    /// let wc_name = "wc";
189    /// let mut wc_command = RUMCommand::default();
190    /// wc_command.path = RUMString::from(wc_name);
191    /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
192    ///
193    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
194    /// pipeline_pipe_process(&mut sys_ls_process, &mut sys_wc_command).unwrap();
195    /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
196    ///
197    /// sys_ls_process.wait();
198    /// sys_wc_process.wait();
199    /// ```
200    ///
201    pub fn pipeline_pipe_process(
202        process: &mut RUMPipelineProcess,
203        piped: &mut RUMPipelineCommand,
204    ) -> RUMResult<()> {
205        let process_stdout = Stdio::from(match process.stdout.take() {
206            Some(stdout) => stdout,
207            None => {
208                return Err(rumtk_format!(
209                    "No stdout handle found for process {}.",
210                    process.id()
211                ));
212            }
213        });
214        let _ = piped.stdin(process_stdout);
215        Ok(())
216    }
217
218    ///
219    /// Retrieves the standard output generated by the completed process.
220    ///
221    /// ## Example
222    ///
223    /// ```
224    /// use std::any::{Any, TypeId};
225    /// use std::process::Stdio;
226    ///
227    /// use rumtk_core::strings::RUMString;
228    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
229    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_process, pipeline_spawn_process, pipeline_get_stdout};
230    ///
231    /// let ls_name = "ls";
232    /// let mut ls_command = RUMCommand::default();
233    /// ls_command.path = RUMString::from(ls_name);
234    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
235    ///
236    /// let wc_name = "wc";
237    /// let mut wc_command = RUMCommand::default();
238    /// wc_command.path = RUMString::from(wc_name);
239    /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
240    ///
241    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
242    /// pipeline_pipe_process(&mut sys_ls_process, &mut sys_wc_command).unwrap();
243    /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
244    ///
245    /// sys_ls_process.wait();
246    /// sys_wc_process.wait();
247    ///
248    /// let mut pipeline = vec![sys_ls_process, sys_wc_process];
249    ///
250    /// let out_data = pipeline_get_stdout(pipeline).unwrap();
251    ///
252    /// assert_eq!(out_data.is_empty(), false, "No output detected... {:?}", &out_data);
253    /// ```
254    ///
255    pub fn pipeline_get_stdout(mut pipeline: RUMPipeline) -> RUMResult<RUMBuffer> {
256        let mut last_item = pipeline.pop().unwrap();
257        match last_item.wait_with_output() {
258            Ok(stdout) => Ok(RUMBuffer::from(stdout.stdout.clone())),
259            Err(e) => Err(rumtk_format!(
260                "Issue reading last process output because => {}",
261                e
262            )),
263        }
264    }
265
266    ///
267    /// Pipe data into a process.
268    ///
269    /// ## Example
270    ///
271    /// ```
272    /// use rumtk_core::strings::RUMString;
273    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
274    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_into_process, pipeline_spawn_process};
275    /// use rumtk_core::types::RUMBuffer;
276    ///
277    /// let ls_name = "ls";
278    /// let mut ls_command = RUMCommand::default();
279    /// ls_command.path = RUMString::from(ls_name);
280    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
281    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
282    /// pipeline_pipe_into_process(&mut sys_ls_process, &Some(RUMBuffer::default())).unwrap();
283    ///
284    /// let out = sys_ls_process.wait_with_output().unwrap();
285    ///
286    /// assert_eq!(out.stdout.is_empty(), false, "Piped command returned an empty buffer? => {:?}", String::from_utf8_lossy(out.stdout.as_slice()))
287    /// ```
288    ///
289    pub fn pipeline_pipe_into_process(
290        process: &mut RUMPipelineProcess,
291        data: &Option<RUMBuffer>,
292    ) -> RUMResult<()> {
293        match data {
294            Some(data) => match process.stdin {
295                Some(ref mut stdin) => match stdin.write_all(&data) {
296                    Ok(_) => {}
297                    Err(e) => {
298                        return Err(rumtk_format!(
299                            "Failed to pipe data to stdin of process because => {}",
300                            e
301                        ))
302                    }
303                },
304                None => {}
305            },
306            None => {}
307        }
308        Ok(())
309    }
310
311    pub fn pipeline_connect_processes<'a>(
312        root: &'a mut RUMPipelineCommand,
313        piped: &'a mut RUMPipelineCommand,
314        data: &'a Option<RUMBuffer>,
315    ) -> RUMResult<RUMPipelineProcess> {
316        let mut root_process = pipeline_spawn_process(root)?;
317        pipeline_pipe_process(&mut root_process, piped)?;
318
319        pipeline_pipe_into_process(&mut root_process, data)?;
320
321        Ok(root_process)
322    }
323
324    ///
325    /// Builds an executable pipeline out of a list of [RUMCommand](RUMCommand).
326    ///
327    /// ## Example
328    ///
329    /// ```
330    /// use rumtk_core::strings::RUMString;
331    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
332    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline};
333    ///
334    /// let ls_name = "ls";
335    /// let mut ls_command = RUMCommand::default();
336    /// ls_command.path = RUMString::from(ls_name);
337    ///
338    /// let wc_name = "wc";
339    /// let mut wc_command = RUMCommand::default();
340    /// wc_command.path = RUMString::from(wc_name);
341    ///
342    /// let commands = vec![
343    ///     ls_command,
344    ///     wc_command
345    /// ];
346    ///
347    /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
348    ///
349    /// assert_eq!(pipeline.len(), commands.len(), "Pipeline generation returned unexpected number of items!");
350    /// ```
351    ///
352    pub fn pipeline_generate_pipeline(commands: &RUMCommandLine) -> RUMResult<RUMPipeline> {
353        let first_command = commands.first().unwrap();
354        let mut root = pipeline_generate_command(&first_command);
355        let mut data = first_command.data.clone();
356        let mut parent_process;
357
358        // Setup pipeline
359        let mut pipeline = vec![];
360        root.stdin(Stdio::piped()).stdout(Stdio::piped());
361
362        for cmd in commands.iter().skip(1) {
363            let mut new_root = pipeline_generate_command(cmd);
364            parent_process = pipeline_connect_processes(&mut root, &mut new_root, &data)?;
365            pipeline.push(parent_process);
366            root = new_root;
367            data = None;
368        }
369
370        pipeline.push(pipeline_spawn_process(&mut root)?);
371
372        Ok(pipeline)
373    }
374
375    ///
376    /// Await for pipeline to execute in a async friendly manner. Once the pipeline execution ends,
377    /// consume the pipeline and return the output.
378    ///
379    /// ## Example
380    ///
381    /// ```
382    /// use rumtk_core::strings::RUMString;
383    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
384    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
385    /// use rumtk_core::{rumtk_resolve_task, rumtk_init_threads};
386    ///
387    /// let ls_name = "ls";
388    /// let mut ls_command = RUMCommand::default();
389    /// ls_command.path = RUMString::from(ls_name);
390    ///
391    /// let wc_name = "wc";
392    /// let mut wc_command = RUMCommand::default();
393    /// wc_command.path = RUMString::from(wc_name);
394    ///
395    /// let commands = vec![
396    ///     ls_command,
397    ///     wc_command
398    /// ];
399    ///
400    /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
401    /// let rt = rumtk_init_threads!(&5);
402    /// let result = rumtk_resolve_task!(rt, pipeline_await_pipeline(pipeline)).unwrap();
403    ///
404    /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
405    /// ```
406    ///
407    pub async fn pipeline_await_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
408        for p in pipeline.iter_mut() {
409            loop {
410                match p.try_wait() {
411                    Ok(code) => match code {
412                        Some(code) => {
413                            if !code.success() {
414                                return Err(rumtk_format!(
415                                    "Process {} exited with non-success code => {}!",
416                                    p.id(),
417                                    code
418                                ));
419                            }
420                            break;
421                        }
422                        None => {
423                            async_sleep(DEFAULT_PROCESS_ASYNC_WAIT).await;
424                            continue;
425                        }
426                    },
427                    Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
428                };
429            }
430        }
431
432        let result = pipeline_get_stdout(pipeline)?;
433        Ok(result)
434    }
435
436    ///
437    /// Await for pipeline to complete execution. Once the pipeline execution ends,
438    /// consume the pipeline and return the output.
439    ///
440    /// ## Example
441    ///
442    /// ```
443    /// use rumtk_core::strings::RUMString;
444    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
445    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
446    ///
447    /// let ls_name = "ls";
448    /// let mut ls_command = RUMCommand::default();
449    /// ls_command.path = RUMString::from(ls_name);
450    ///
451    /// let wc_name = "wc";
452    /// let mut wc_command = RUMCommand::default();
453    /// wc_command.path = RUMString::from(wc_name);
454    ///
455    /// let commands = vec![
456    ///     ls_command,
457    ///     wc_command
458    /// ];
459    ///
460    /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
461    /// let result = pipeline_wait_pipeline(pipeline).unwrap();
462    ///
463    /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
464    /// ```
465    ///
466    pub fn pipeline_wait_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
467        for p in pipeline.iter_mut() {
468            match p.wait() {
469                Ok(code) => {
470                    if !code.success() {
471                        return Err(rumtk_format!(
472                            "Process {} exited with non-success code => {}!",
473                            p.id(),
474                            code
475                        ));
476                    }
477                    break;
478                }
479                Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
480            };
481        }
482
483        let result = pipeline_get_stdout(pipeline)?;
484        Ok(result)
485    }
486}
487
488pub mod pipeline_macros {
489    ///
490    /// Creates a pipeline command out of the provided parameters. Parameters include `path`, `args`,
491    /// and `env`. The command has [RUMCommand](super::pipeline_types::RUMCommand).
492    ///
493    /// `env` is a map of type [RUMCommandEnv](super::pipeline_types::RUMCommandEnv) containing a set of
494    /// key value pair strings that we can use to update the process environment.
495    ///
496    /// ## Example
497    ///
498    /// ### Program Only
499    ///
500    /// ```
501    /// use rumtk_core::rumtk_pipeline_command;
502    ///
503    /// let command = rumtk_pipeline_command!("ls");
504    /// ```
505    ///
506    /// ### Program with Piped Data
507    ///
508    /// ```
509    /// use rumtk_core::rumtk_pipeline_command;
510    /// use rumtk_core::types::RUMBuffer;
511    /// use rumtk_core::strings::RUMStringConversions;
512    ///
513    /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default());
514    /// ```
515    ///
516    /// ### Program with Args
517    ///
518    /// ```
519    /// use rumtk_core::rumtk_pipeline_command;
520    /// use rumtk_core::types::RUMBuffer;
521    /// use rumtk_core::strings::RUMStringConversions;
522    ///
523    /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default(), &vec![
524    ///     "-l".to_rumstring()
525    /// ]);
526    /// ```
527    ///
528    #[macro_export]
529    macro_rules! rumtk_pipeline_command {
530        ( $path:expr, $data:expr, $args:expr, $env:expr ) => {{
531            use $crate::pipelines::pipeline_types::RUMCommand;
532
533            RUMCommand::new($path, &Some($data), $args, $env)
534        }};
535        ( $path:expr, $data:expr, $args:expr ) => {{
536            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandEnv};
537
538            RUMCommand::new($path, &Some($data), $args, &RUMCommandEnv::default())
539        }};
540        ( $path:expr, $data:expr ) => {{
541            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
542
543            RUMCommand::new(
544                $path,
545                &Some($data),
546                &RUMCommandArgs::default(),
547                &RUMCommandEnv::default(),
548            )
549        }};
550        ( $path:expr ) => {{
551            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
552            use $crate::types::RUMBuffer;
553
554            RUMCommand::new(
555                $path,
556                &None,
557                &RUMCommandArgs::default(),
558                &RUMCommandEnv::default(),
559            )
560        }};
561    }
562
563    ///
564    /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
565    /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
566    /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
567    ///
568    /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
569    /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
570    ///
571    /// ## Example
572    ///
573    /// ```
574    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
575    /// use rumtk_core::core::RUMResult;
576    /// use rumtk_core::strings::RUMStringConversions;
577    /// use rumtk_core::types::RUMBuffer;
578    ///
579    /// let f = || -> RUMResult<()> {
580    ///     let result = rumtk_pipeline_run!(
581    ///         rumtk_pipeline_command!("ls", RUMBuffer::default()),
582    ///         rumtk_pipeline_command!("wc")
583    ///     )?;
584    ///
585    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
586    ///     Ok(())
587    /// };
588    ///
589    /// f().unwrap();
590    /// ```
591    ///
592    #[macro_export]
593    macro_rules! rumtk_pipeline_run {
594        ( $($command:expr),+ ) => {{
595            use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
596
597            let pipeline = pipeline_generate_pipeline(&vec![
598                $($command),+
599            ])?;
600
601            pipeline_wait_pipeline(pipeline)
602        }};
603    }
604
605    ///
606    /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
607    /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
608    /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
609    ///
610    /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
611    /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
612    ///
613    /// This is the `async` flavor.
614    ///
615    /// ## Example
616    ///
617    /// ```
618    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
619    /// use rumtk_core::core::RUMResult;
620    /// use rumtk_core::strings::RUMStringConversions;
621    ///
622    /// let f = async || -> RUMResult<()> {
623    ///     let result = rumtk_pipeline_run_async!(
624    ///         rumtk_pipeline_command!("ls", RUMBuffer::default()),
625    ///         rumtk_pipeline_command!("wc")
626    ///     )?;
627    ///
628    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
629    ///     Ok(())
630    /// };
631    ///
632    /// let rt = rumtk_init_threads!(&5);
633    /// rumtk_resolve_task!(rt, f()).unwrap();
634    /// ```
635    ///
636    #[macro_export]
637    macro_rules! rumtk_pipeline_run_async {
638        ( $($command:expr),+ ) => {{
639            use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
640
641            let pipeline = pipeline_generate_pipeline(&vec![
642                $($command),+
643            ])?;
644
645            pipeline_await_pipeline(pipeline).await
646        }};
647    }
648}