Skip to main content

rumtk_core/
pipelines.rs

1/*
2 *     rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 *     This toolkit aims to be reliable, simple, performant, and standards compliant.
4 *     Copyright (C) 2026  Luis M. Santos, M.D.
5 *
6 *     This program is free software: you can redistribute it and/or modify
7 *     it under the terms of the GNU General Public License as published by
8 *     the Free Software Foundation, either version 3 of the License, or
9 *     (at your option) any later version.
10 *
11 *     This program is distributed in the hope that it will be useful,
12 *     but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *     GNU General Public License for more details.
15 *
16 *     You should have received a copy of the GNU General Public License
17 *     along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
22 * This toolkit aims to be reliable, simple, performant, and standards compliant.
23 * Copyright (C) 2025  Luis M. Santos, M.D.
24 * Copyright (C) 2025  Nick Stephenson
25 * Copyright (C) 2025  Ethan Dixon
26 * Copyright (C) 2025  MedicalMasses L.L.C.
27 *
28 * This library is free software; you can redistribute it and/or
29 * modify it under the terms of the GNU Lesser General Public
30 * License as published by the Free Software Foundation; either
31 * version 2.1 of the License, or (at your option) any later version.
32 *
33 * This library is distributed in the hope that it will be useful,
34 * but WITHOUT ANY WARRANTY; without even the implied warranty of
35 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
36 * Lesser General Public License for more details.
37 *
38 * You should have received a copy of the GNU Lesser General Public
39 * License along with this library; if not, write to the Free Software
40 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
41 */
42
43pub mod pipeline_types {
44    use crate::strings::{RUMString, RUMStringConversions};
45    use crate::types::{RUMBuffer, RUMHashMap};
46
47    use crate::core::{RUMResult, RUMVec};
48    use std::process::{Child, Command};
49
50    pub type RUMCommandArgs = Vec<RUMString>;
51    pub type RUMCommandEnv = RUMHashMap<RUMString, RUMString>;
52    #[derive(Default, Debug, Clone)]
53    pub struct RUMCommand {
54        pub path: RUMString,
55        pub data: Option<RUMBuffer>,
56        pub args: RUMCommandArgs,
57        pub env: RUMCommandEnv,
58    }
59
60    impl RUMCommand {
61        pub fn new(
62            prog: &str,
63            data: &Option<RUMBuffer>,
64            args: &RUMCommandArgs,
65            env: &RUMCommandEnv,
66        ) -> Self {
67            RUMCommand {
68                path: prog.to_rumstring(),
69                args: args.clone(),
70                env: env.clone(),
71                data: data.clone(),
72            }
73        }
74    }
75
76    pub type RUMCommandLine = RUMVec<RUMCommand>;
77    pub type RUMPipelineCommand = Command;
78    pub type RUMPipelineProcess = Child;
79    pub type RUMPipeline = RUMVec<RUMPipelineProcess>;
80    pub type RUMPipelineResult = RUMResult<RUMBuffer>;
81}
82
83pub mod pipeline_functions {
84    use super::pipeline_types::*;
85    use crate::core::RUMResult;
86    use crate::strings::rumtk_format;
87    use std::io::{Read, Write};
88
89    use crate::threading::threading_functions::async_sleep;
90    use crate::types::RUMBuffer;
91    use std::process::{Command, Stdio};
92
93    const DEFAULT_PROCESS_ASYNC_WAIT: f32 = 0.001;
94    const DEFAULT_STDOUT_CHUNK_SIZE: usize = 1024;
95
96    ///
97    /// Given a command of type [RUMCommand](RUMCommand), generate a command instance the Rust
98    /// runtime can use to spawn a process.
99    ///
100    /// ## Example
101    ///
102    /// ```
103    /// use std::any::{Any, TypeId};
104    ///
105    /// use rumtk_core::strings::RUMString;
106    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
107    /// use rumtk_core::pipelines::pipeline_functions::pipeline_generate_command;
108    ///
109    /// let command_name = "ls";
110    /// let mut command = RUMCommand::default();
111    /// command.path = RUMString::from(command_name);
112    ///
113    /// let sys_command = pipeline_generate_command(&command);
114    ///
115    /// assert_eq!(sys_command.get_program().to_str().unwrap(), command_name, "");
116    ///
117    /// ```
118    ///
119    pub fn pipeline_generate_command(command: &RUMCommand) -> RUMPipelineCommand {
120        let mut cmd = Command::new(command.path.as_str());
121
122        for arg in command.args.iter() {
123            cmd.arg(arg);
124        }
125
126        cmd.envs(command.env.iter());
127
128        cmd.stdin(Stdio::piped())
129            .stdout(Stdio::piped())
130            .stderr(Stdio::piped());
131
132        cmd
133    }
134
135    ///
136    /// Spawns a process out of the [RUMPipelineCommand](RUMPipelineCommand).
137    ///
138    /// ## Example
139    ///
140    /// ```
141    /// use std::any::{Any, TypeId};
142    ///
143    /// use rumtk_core::strings::RUMString;
144    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
145    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_spawn_process};
146    ///
147    /// let command_name = "ls";
148    /// let mut command = RUMCommand::default();
149    /// command.path = RUMString::from(command_name);
150    ///
151    /// let mut sys_command = pipeline_generate_command(&command);
152    ///
153    /// let mut process = pipeline_spawn_process(&mut sys_command).unwrap();
154    ///
155    /// process.wait();
156    /// ```
157    ///
158    pub fn pipeline_spawn_process(cmd: &mut RUMPipelineCommand) -> RUMResult<RUMPipelineProcess> {
159        match cmd.spawn() {
160            Ok(process) => Ok(process),
161            Err(e) => Err(rumtk_format!(
162                "Failed to spawn process {:?} because => {}",
163                cmd.get_program(),
164                e
165            )),
166        }
167    }
168
169    ///
170    /// Given a process of type [RUMPipelineProcess](RUMPipelineProcess) and a rhs of type [RUMPipelineCommand](RUMPipelineCommand),
171    /// create a pipe of the lhs's `stdout` into the next command descriptor which is the rhs.
172    ///
173    /// ## Example
174    ///
175    /// ```
176    /// use std::any::{Any, TypeId};
177    /// use std::process::Stdio;
178    ///
179    /// use rumtk_core::strings::RUMString;
180    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
181    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_process, pipeline_spawn_process};
182    ///
183    /// let ls_name = "ls";
184    /// let mut ls_command = RUMCommand::default();
185    /// ls_command.path = RUMString::from(ls_name);
186    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
187    ///
188    /// let wc_name = "wc";
189    /// let mut wc_command = RUMCommand::default();
190    /// wc_command.path = RUMString::from(wc_name);
191    /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
192    ///
193    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
194    /// pipeline_pipe_process(&mut sys_ls_process, &mut sys_wc_command).unwrap();
195    /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
196    ///
197    /// sys_ls_process.wait();
198    /// sys_wc_process.wait();
199    /// ```
200    ///
201    pub fn pipeline_pipe_process(
202        process: &mut RUMPipelineProcess,
203        piped: &mut RUMPipelineCommand,
204    ) -> RUMResult<()> {
205        let process_stdout = Stdio::from(match process.stdout.take() {
206            Some(stdout) => stdout,
207            None => {
208                return Err(rumtk_format!(
209                    "No stdout handle found for process {}.",
210                    process.id()
211                ));
212            }
213        });
214        let _ = piped.stdin(process_stdout);
215        Ok(())
216    }
217
218    ///
219    /// Retrieves the standard output generated by the completed process.
220    ///
221    /// ## Example
222    ///
223    /// ```
224    /// use std::any::{Any, TypeId};
225    /// use std::process::Stdio;
226    ///
227    /// use rumtk_core::strings::RUMString;
228    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
229    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_process, pipeline_spawn_process, pipeline_get_stdout};
230    ///
231    /// let ls_name = "ls";
232    /// let mut ls_command = RUMCommand::default();
233    /// ls_command.path = RUMString::from(ls_name);
234    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
235    ///
236    /// let wc_name = "wc";
237    /// let mut wc_command = RUMCommand::default();
238    /// wc_command.path = RUMString::from(wc_name);
239    /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
240    ///
241    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
242    /// pipeline_pipe_process(&mut sys_ls_process, &mut sys_wc_command).unwrap();
243    /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
244    ///
245    /// sys_ls_process.wait();
246    /// sys_wc_process.wait();
247    ///
248    /// let mut pipeline = vec![sys_ls_process, sys_wc_process];
249    ///
250    /// let out_data = pipeline_get_stdout(pipeline).unwrap();
251    ///
252    /// assert_eq!(out_data.is_empty(), false, "No output detected... {:?}", &out_data);
253    /// ```
254    ///
255    pub fn pipeline_get_stdout(mut pipeline: RUMPipeline) -> RUMResult<RUMBuffer> {
256        let mut last_item = pipeline.pop().unwrap();
257        match last_item.wait_with_output() {
258            Ok(stdout) => Ok(RUMBuffer::from(stdout.stdout.clone())),
259            Err(e) => Err(rumtk_format!(
260                "Issue reading last process output because => {}",
261                e
262            )),
263        }
264    }
265
266    ///
267    /// Closes the `stdin` standard in file for process. Useful to trigger a resolution of the pipeline.
268    ///
269    /// ## Example
270    ///
271    /// ```
272    /// use rumtk_core::pipelines::pipeline_functions::pipeline_close_process_stdin;
273    /// use rumtk_core::strings::RUMString;
274    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
275    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_into_process, pipeline_spawn_process};
276    /// use rumtk_core::types::RUMBuffer;
277    ///
278    /// let ls_name = "ls";
279    /// let mut ls_command = RUMCommand::default();
280    /// ls_command.path = RUMString::from(ls_name);
281    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
282    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
283    ///
284    /// pipeline_close_process_stdin(&mut sys_ls_process);
285    ///
286    ///
287    /// ```
288    ///
289    pub fn pipeline_close_process_stdin(process: &mut RUMPipelineProcess) {
290        // Do not change into an expect() or such unwrap. We just want to ignore and assume stdin is closed.
291        match process.stdin.take() {
292            Some(stdin) => {
293                drop(stdin);
294            }
295            None => {}
296        };
297    }
298
299    ///
300    /// Pipe data into a process.
301    ///
302    /// ## Example
303    ///
304    /// ```
305    /// use rumtk_core::strings::RUMString;
306    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
307    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_into_process, pipeline_spawn_process};
308    /// use rumtk_core::types::RUMBuffer;
309    ///
310    /// let ls_name = "ls";
311    /// let mut ls_command = RUMCommand::default();
312    /// ls_command.path = RUMString::from(ls_name);
313    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
314    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
315    /// pipeline_pipe_into_process(&mut sys_ls_process, &Some(RUMBuffer::default())).unwrap();
316    ///
317    /// let out = sys_ls_process.wait_with_output().unwrap();
318    ///
319    /// assert_eq!(out.stdout.is_empty(), false, "Piped command returned an empty buffer? => {:?}", String::from_utf8_lossy(out.stdout.as_slice()))
320    /// ```
321    ///
322    pub fn pipeline_pipe_into_process(
323        process: &mut RUMPipelineProcess,
324        data: &Option<RUMBuffer>,
325    ) -> RUMResult<()> {
326        match data {
327            Some(data) => match process.stdin {
328                Some(ref mut stdin) => match stdin.write_all(&data) {
329                    Ok(_) => {}
330                    Err(e) => {
331                        return Err(rumtk_format!(
332                            "Failed to pipe data to stdin of process because => {}",
333                            e
334                        ))
335                    }
336                },
337                None => {}
338            },
339            None => {}
340        }
341        Ok(())
342    }
343
344    ///
345    /// Builds an executable pipeline out of a list of [RUMCommand](RUMCommand).
346    ///
347    /// ## Example
348    ///
349    /// ```
350    /// use rumtk_core::strings::RUMString;
351    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
352    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline};
353    ///
354    /// let ls_name = "ls";
355    /// let mut ls_command = RUMCommand::default();
356    /// ls_command.path = RUMString::from(ls_name);
357    ///
358    /// let wc_name = "wc";
359    /// let mut wc_command = RUMCommand::default();
360    /// wc_command.path = RUMString::from(wc_name);
361    ///
362    /// let commands = vec![
363    ///     ls_command,
364    ///     wc_command
365    /// ];
366    ///
367    /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
368    ///
369    /// assert_eq!(pipeline.len(), commands.len(), "Pipeline generation returned unexpected number of items!");
370    /// ```
371    ///
372    pub fn pipeline_generate_pipeline(commands: &RUMCommandLine) -> RUMResult<RUMPipeline> {
373        let first_command = commands.first().unwrap();
374
375        // Setup pipeline
376        let mut pipeline = vec![];
377
378        //Bootstrap first process in chain
379        let mut root = pipeline_generate_command(&first_command);
380        let mut parent_process = pipeline_spawn_process(&mut root)?;
381        pipeline_pipe_into_process(&mut parent_process, &mut first_command.data.clone())?;
382        pipeline.push(parent_process);
383
384        for cmd in commands.iter().skip(1) {
385            let mut new_root = pipeline_generate_command(cmd);
386            pipeline_pipe_process(pipeline.last_mut().unwrap(), &mut new_root)?;
387            parent_process = pipeline_spawn_process(&mut new_root)?;
388            pipeline.push(parent_process);
389        }
390
391        Ok(pipeline)
392    }
393
394    ///
395    /// Await for pipeline to execute in a async friendly manner. Once the pipeline execution ends,
396    /// consume the pipeline and return the output.
397    ///
398    /// ## Example
399    ///
400    /// ```
401    /// use rumtk_core::strings::RUMString;
402    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
403    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
404    /// use rumtk_core::{rumtk_resolve_task, rumtk_init_threads};
405    ///
406    /// let ls_name = "ls";
407    /// let mut ls_command = RUMCommand::default();
408    /// ls_command.path = RUMString::from(ls_name);
409    ///
410    /// let wc_name = "wc";
411    /// let mut wc_command = RUMCommand::default();
412    /// wc_command.path = RUMString::from(wc_name);
413    ///
414    /// let commands = vec![
415    ///     ls_command,
416    ///     wc_command
417    /// ];
418    ///
419    /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
420    /// let rt = rumtk_init_threads!(&5);
421    /// let result = rumtk_resolve_task!(rt, pipeline_await_pipeline(pipeline)).unwrap();
422    ///
423    /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
424    /// ```
425    ///
426    pub async fn pipeline_await_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
427        // Let's make sure the stdin is closed on the first process to make sure it exits instead of
428        // remain waiting for EOF in the stdin stream.
429        pipeline_close_process_stdin(pipeline.first_mut().unwrap());
430
431        // Now let's visit each process and await their completion!
432        for p in pipeline.iter_mut() {
433            loop {
434                match p.try_wait() {
435                    Ok(code) => match code {
436                        Some(code) => {
437                            if !code.success() {
438                                return Err(rumtk_format!(
439                                    "Process {} exited with non-success code => {}!",
440                                    p.id(),
441                                    code
442                                ));
443                            }
444                            break;
445                        }
446                        None => {
447                            async_sleep(DEFAULT_PROCESS_ASYNC_WAIT).await;
448                            continue;
449                        }
450                    },
451                    Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
452                };
453            }
454        }
455
456        let result = pipeline_get_stdout(pipeline)?;
457        Ok(result)
458    }
459
460    ///
461    /// Await for pipeline to complete execution. Once the pipeline execution ends,
462    /// consume the pipeline and return the output.
463    ///
464    /// ## Example
465    ///
466    /// ```
467    /// use rumtk_core::strings::RUMString;
468    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
469    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
470    ///
471    /// let ls_name = "ls";
472    /// let mut ls_command = RUMCommand::default();
473    /// ls_command.path = RUMString::from(ls_name);
474    ///
475    /// let wc_name = "wc";
476    /// let mut wc_command = RUMCommand::default();
477    /// wc_command.path = RUMString::from(wc_name);
478    ///
479    /// let commands = vec![
480    ///     ls_command,
481    ///     wc_command
482    /// ];
483    ///
484    /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
485    /// let result = pipeline_wait_pipeline(pipeline).unwrap();
486    ///
487    /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
488    /// ```
489    ///
490    pub fn pipeline_wait_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
491        // Let's make sure the stdin is closed on the first process to make sure it exits instead of
492        // remain waiting for EOF in the stdin stream.
493        pipeline_close_process_stdin(pipeline.first_mut().unwrap());
494
495        // Now let's visit each process and await their completion!
496        for p in pipeline.iter_mut() {
497            match p.wait() {
498                Ok(code) => {
499                    if !code.success() {
500                        return Err(rumtk_format!(
501                            "Process {} exited with non-success code => {}!",
502                            p.id(),
503                            code
504                        ));
505                    }
506                    break;
507                }
508                Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
509            };
510        }
511
512        let result = pipeline_get_stdout(pipeline)?;
513        Ok(result)
514    }
515}
516
517pub mod pipeline_macros {
518    ///
519    /// Creates a pipeline command out of the provided parameters. Parameters include `path`, `args`,
520    /// and `env`. The command has [RUMCommand](super::pipeline_types::RUMCommand).
521    ///
522    /// `env` is a map of type [RUMCommandEnv](super::pipeline_types::RUMCommandEnv) containing a set of
523    /// key value pair strings that we can use to update the process environment.
524    ///
525    /// ## Example
526    ///
527    /// ### Program Only
528    ///
529    /// ```
530    /// use rumtk_core::rumtk_pipeline_command;
531    ///
532    /// let command = rumtk_pipeline_command!("ls");
533    /// ```
534    ///
535    /// ### Program with Piped Data
536    ///
537    /// ```
538    /// use rumtk_core::rumtk_pipeline_command;
539    /// use rumtk_core::types::RUMBuffer;
540    /// use rumtk_core::strings::RUMStringConversions;
541    ///
542    /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default());
543    /// ```
544    ///
545    /// ### Program with Args
546    ///
547    /// ```
548    /// use rumtk_core::rumtk_pipeline_command;
549    /// use rumtk_core::types::RUMBuffer;
550    /// use rumtk_core::strings::RUMStringConversions;
551    ///
552    /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default(), &vec![
553    ///     "-l".to_rumstring()
554    /// ]);
555    /// ```
556    ///
557    #[macro_export]
558    macro_rules! rumtk_pipeline_command {
559        ( $path:expr, $data:expr, $args:expr, $env:expr ) => {{
560            use $crate::pipelines::pipeline_types::RUMCommand;
561
562            RUMCommand::new($path, &Some($data), $args, $env)
563        }};
564        ( $path:expr, $data:expr, $args:expr ) => {{
565            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandEnv};
566
567            RUMCommand::new($path, &Some($data), $args, &RUMCommandEnv::default())
568        }};
569        ( $path:expr, $data:expr ) => {{
570            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
571
572            RUMCommand::new(
573                $path,
574                &Some($data),
575                &RUMCommandArgs::default(),
576                &RUMCommandEnv::default(),
577            )
578        }};
579        ( $path:expr ) => {{
580            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
581            use $crate::types::RUMBuffer;
582
583            RUMCommand::new(
584                $path,
585                &None,
586                &RUMCommandArgs::default(),
587                &RUMCommandEnv::default(),
588            )
589        }};
590    }
591
592    ///
593    /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
594    /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
595    /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
596    ///
597    /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
598    /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
599    ///
600    /// ## Example
601    ///
602    /// ### Simple
603    ///
604    /// ```
605    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
606    /// use rumtk_core::core::{RUMResult};
607    /// use rumtk_core::strings::RUMStringConversions;
608    /// use rumtk_core::types::RUMBuffer;
609    ///
610    /// let f = async || -> RUMResult<()> {
611    ///     let result = rumtk_pipeline_run!(
612    ///         rumtk_pipeline_command!("ls"),
613    ///         rumtk_pipeline_command!("wc")
614    ///     ).unwrap();
615    ///
616    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
617    ///     Ok(())
618    /// };
619    ///
620    /// let rt = rumtk_init_threads!(&5);
621    /// rumtk_resolve_task!(rt, f()).unwrap();
622    /// ```
623    ///
624    /// ### With Buffer Piped In
625    ///
626    /// ```
627    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
628    /// use rumtk_core::core::{RUMResult, new_random_buffer};
629    /// use rumtk_core::strings::RUMStringConversions;
630    /// use rumtk_core::types::RUMBuffer;
631    ///
632    /// let f = || -> RUMResult<()> {
633    ///     let result = rumtk_pipeline_run!(
634    ///         rumtk_pipeline_command!("ls", new_random_buffer()),
635    ///         rumtk_pipeline_command!("wc")
636    ///     )?;
637    ///
638    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
639    ///     Ok(())
640    /// };
641    ///
642    /// f().unwrap();
643    /// ```
644    ///
645    #[macro_export]
646    macro_rules! rumtk_pipeline_run {
647        ( $($command:expr),+ ) => {{
648            use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
649
650            let pipeline = pipeline_generate_pipeline(&vec![
651                $($command),+
652            ])?;
653
654            pipeline_wait_pipeline(pipeline)
655        }};
656    }
657
658    ///
659    /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
660    /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
661    /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
662    ///
663    /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
664    /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
665    ///
666    /// This is the `async` flavor.
667    ///
668    /// ## Example
669    ///
670    /// ### Simple
671    ///
672    /// ```
673    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
674    /// use rumtk_core::core::{RUMResult};
675    /// use rumtk_core::strings::RUMStringConversions;
676    /// use rumtk_core::types::RUMBuffer;
677    ///
678    /// let f = async || -> RUMResult<()> {
679    ///     let result = rumtk_pipeline_run_async!(
680    ///         rumtk_pipeline_command!("ls"),
681    ///         rumtk_pipeline_command!("wc")
682    ///     ).await?;
683    ///
684    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
685    ///     Ok(())
686    /// };
687    ///
688    /// let rt = rumtk_init_threads!(&5);
689    /// rumtk_resolve_task!(rt, f()).unwrap();
690    /// ```
691    ///
692    /// ### With Buffer Piped In
693    ///
694    /// ```
695    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
696    /// use rumtk_core::core::{RUMResult, new_random_buffer};
697    /// use rumtk_core::strings::RUMStringConversions;
698    /// use rumtk_core::types::RUMBuffer;
699    ///
700    /// let f = async || -> RUMResult<()> {
701    ///     let result = rumtk_pipeline_run_async!(
702    ///         rumtk_pipeline_command!("ls", new_random_buffer()),
703    ///         rumtk_pipeline_command!("wc")
704    ///     ).await?;
705    ///
706    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
707    ///     Ok(())
708    /// };
709    ///
710    /// let rt = rumtk_init_threads!(&5);
711    /// rumtk_resolve_task!(rt, f()).unwrap();
712    /// ```
713    ///
714    #[macro_export]
715    macro_rules! rumtk_pipeline_run_async {
716        ( $($command:expr),+ ) => {{
717            use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
718
719            let pipeline = pipeline_generate_pipeline(&vec![
720                $($command),+
721            ])?;
722
723            pipeline_await_pipeline(pipeline)
724        }};
725    }
726}