Skip to main content

rumtk_core/
pipelines.rs

1/*
2 *     rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 *     This toolkit aims to be reliable, simple, performant, and standards compliant.
4 *     Copyright (C) 2026  Luis M. Santos, M.D.
5 *
6 *     This program is free software: you can redistribute it and/or modify
7 *     it under the terms of the GNU General Public License as published by
8 *     the Free Software Foundation, either version 3 of the License, or
9 *     (at your option) any later version.
10 *
11 *     This program is distributed in the hope that it will be useful,
12 *     but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *     GNU General Public License for more details.
15 *
16 *     You should have received a copy of the GNU General Public License
17 *     along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
22 * This toolkit aims to be reliable, simple, performant, and standards compliant.
23 * Copyright (C) 2025  Luis M. Santos, M.D. <lsantos@medicalmasses.com>
24 * Copyright (C) 2025  MedicalMasses L.L.C. <contact@medicalmasses.com>
25 *
26 * This program is free software: you can redistribute it and/or modify
27 * it under the terms of the GNU General Public License as published by
28 * the Free Software Foundation, either version 3 of the License, or
29 * (at your option) any later version.
30 *
31 * This program is distributed in the hope that it will be useful,
32 * but WITHOUT ANY WARRANTY; without even the implied warranty of
33 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
34 * GNU General Public License for more details.
35 *
36 * You should have received a copy of the GNU General Public License
37 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
38 */
39
40pub mod pipeline_types {
41    use crate::strings::{RUMString, RUMStringConversions};
42    use crate::types::{RUMBuffer, RUMHashMap};
43
44    use crate::core::{RUMResult, RUMVec};
45    use std::process::{Child, Command};
46
47    pub type RUMCommandArgs = Vec<RUMString>;
48    pub type RUMCommandEnv = RUMHashMap<RUMString, RUMString>;
49    #[derive(Default, Debug, Clone)]
50    pub struct RUMCommand {
51        pub path: RUMString,
52        pub data: Option<RUMBuffer>,
53        pub args: RUMCommandArgs,
54        pub env: RUMCommandEnv,
55    }
56
57    impl RUMCommand {
58        pub fn new(
59            prog: &str,
60            data: &Option<RUMBuffer>,
61            args: &RUMCommandArgs,
62            env: &RUMCommandEnv,
63        ) -> Self {
64            RUMCommand {
65                path: prog.to_rumstring(),
66                args: args.clone(),
67                env: env.clone(),
68                data: data.clone(),
69            }
70        }
71    }
72
73    pub type RUMCommandLine = RUMVec<RUMCommand>;
74    pub type RUMPipelineCommand = Command;
75    pub type RUMPipelineProcess = Child;
76    pub type RUMPipeline = RUMVec<RUMPipelineProcess>;
77    pub type RUMPipelineResult = RUMResult<RUMBuffer>;
78}
79
80pub mod pipeline_functions {
81    use super::pipeline_types::*;
82    use crate::core::RUMResult;
83    use crate::strings::rumtk_format;
84    use std::io::{Read, Write};
85
86    use crate::threading::threading_functions::async_sleep;
87    use crate::types::RUMBuffer;
88    use std::process::{Command, Stdio};
89
90    const DEFAULT_PROCESS_ASYNC_WAIT: f32 = 0.001;
91    const DEFAULT_STDOUT_CHUNK_SIZE: usize = 1024;
92
93    ///
94    /// Given a command of type [RUMCommand](RUMCommand), generate a command instance the Rust
95    /// runtime can use to spawn a process.
96    ///
97    /// ## Example
98    ///
99    /// ```
100    /// use std::any::{Any, TypeId};
101    ///
102    /// use rumtk_core::strings::RUMString;
103    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
104    /// use rumtk_core::pipelines::pipeline_functions::pipeline_generate_command;
105    ///
106    /// let command_name = "ls";
107    /// let mut command = RUMCommand::default();
108    /// command.path = RUMString::from(command_name);
109    ///
110    /// let sys_command = pipeline_generate_command(&command);
111    ///
112    /// assert_eq!(sys_command.get_program().to_str().unwrap(), command_name, "");
113    ///
114    /// ```
115    ///
116    pub fn pipeline_generate_command(command: &RUMCommand) -> RUMPipelineCommand {
117        let mut cmd = Command::new(command.path.as_str());
118
119        for arg in command.args.iter() {
120            cmd.arg(arg);
121        }
122
123        cmd.envs(command.env.iter());
124
125        cmd.stdin(Stdio::piped())
126            .stdout(Stdio::piped())
127            .stderr(Stdio::piped());
128
129        cmd
130    }
131
132    ///
133    /// Spawns a process out of the [RUMPipelineCommand](RUMPipelineCommand).
134    ///
135    /// ## Example
136    ///
137    /// ```
138    /// use std::any::{Any, TypeId};
139    ///
140    /// use rumtk_core::strings::RUMString;
141    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
142    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_spawn_process};
143    ///
144    /// let command_name = "ls";
145    /// let mut command = RUMCommand::default();
146    /// command.path = RUMString::from(command_name);
147    ///
148    /// let mut sys_command = pipeline_generate_command(&command);
149    ///
150    /// let mut process = pipeline_spawn_process(&mut sys_command).unwrap();
151    ///
152    /// process.wait();
153    /// ```
154    ///
155    pub fn pipeline_spawn_process(cmd: &mut RUMPipelineCommand) -> RUMResult<RUMPipelineProcess> {
156        match cmd.spawn() {
157            Ok(process) => Ok(process),
158            Err(e) => Err(rumtk_format!(
159                "Failed to spawn process {:?} because => {}",
160                cmd.get_program(),
161                e
162            )),
163        }
164    }
165
166    ///
167    /// Given a process of type [RUMPipelineProcess](RUMPipelineProcess) and a rhs of type [RUMPipelineCommand](RUMPipelineCommand),
168    /// create a pipe of the lhs's `stdout` into the next command descriptor which is the rhs.
169    ///
170    /// ## Example
171    ///
172    /// ```
173    /// use std::any::{Any, TypeId};
174    /// use std::process::Stdio;
175    ///
176    /// use rumtk_core::strings::RUMString;
177    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
178    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_processes, pipeline_spawn_process};
179    ///
180    /// let ls_name = "ls";
181    /// let mut ls_command = RUMCommand::default();
182    /// ls_command.path = RUMString::from(ls_name);
183    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
184    ///
185    /// let wc_name = "wc";
186    /// let mut wc_command = RUMCommand::default();
187    /// wc_command.path = RUMString::from(wc_name);
188    /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
189    ///
190    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
191    /// pipeline_pipe_processes(&mut sys_ls_process, &mut sys_wc_command).unwrap();
192    /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
193    ///
194    /// sys_ls_process.wait();
195    /// sys_wc_process.wait();
196    /// ```
197    ///
198    pub fn pipeline_pipe_processes(
199        process: &mut RUMPipelineProcess,
200        piped: &mut RUMPipelineCommand,
201    ) -> RUMResult<()> {
202        let process_stdout = Stdio::from(match process.stdout.take() {
203            Some(stdout) => stdout,
204            None => {
205                return Err(rumtk_format!(
206                    "No stdout handle found for process {}.",
207                    process.id()
208                ));
209            }
210        });
211        let _ = piped.stdin(process_stdout);
212        Ok(())
213    }
214
215    ///
216    /// Retrieves the standard output generated by the completed process.
217    ///
218    /// ## Example
219    ///
220    /// ```
221    /// use std::any::{Any, TypeId};
222    /// use std::process::Stdio;
223    ///
224    /// use rumtk_core::strings::RUMString;
225    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
226    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_spawn_process, pipeline_get_stdout, pipeline_pipe_processes};
227    ///
228    /// let ls_name = "ls";
229    /// let mut ls_command = RUMCommand::default();
230    /// ls_command.path = RUMString::from(ls_name);
231    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
232    ///
233    /// let wc_name = "wc";
234    /// let mut wc_command = RUMCommand::default();
235    /// wc_command.path = RUMString::from(wc_name);
236    /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
237    ///
238    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
239    /// pipeline_pipe_processes(&mut sys_ls_process, &mut sys_wc_command).unwrap();
240    /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
241    ///
242    /// sys_ls_process.wait();
243    /// sys_wc_process.wait();
244    ///
245    /// let mut pipeline = vec![sys_ls_process, sys_wc_process];
246    ///
247    /// let out_data = pipeline_get_stdout(pipeline).unwrap();
248    ///
249    /// assert_eq!(out_data.is_empty(), false, "No output detected... {:?}", &out_data);
250    /// ```
251    ///
252    pub fn pipeline_get_stdout(mut pipeline: RUMPipeline) -> RUMResult<RUMBuffer> {
253        let mut last_item = pipeline.pop().unwrap();
254        match last_item.wait_with_output() {
255            Ok(stdout) => Ok(RUMBuffer::from(stdout.stdout.clone())),
256            Err(e) => Err(rumtk_format!(
257                "Issue reading last process output because => {}",
258                e
259            )),
260        }
261    }
262
263    ///
264    /// Closes the `stdin` standard in file for process. Useful to trigger a resolution of the pipeline.
265    ///
266    /// ## Example
267    ///
268    /// ```
269    /// use rumtk_core::pipelines::pipeline_functions::pipeline_close_process_stdin;
270    /// use rumtk_core::strings::RUMString;
271    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
272    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_into_process, pipeline_spawn_process};
273    /// use rumtk_core::types::RUMBuffer;
274    ///
275    /// let ls_name = "ls";
276    /// let mut ls_command = RUMCommand::default();
277    /// ls_command.path = RUMString::from(ls_name);
278    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
279    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
280    ///
281    /// pipeline_close_process_stdin(&mut sys_ls_process);
282    ///
283    ///
284    /// ```
285    ///
286    pub fn pipeline_close_process_stdin(process: &mut RUMPipelineProcess) {
287        // Do not change into an expect() or such unwrap. We just want to ignore and assume stdin is closed.
288        match process.stdin.take() {
289            Some(stdin) => {
290                drop(stdin);
291            }
292            None => {}
293        };
294    }
295
296    ///
297    /// Pipe data into a process.
298    ///
299    /// ## Example
300    ///
301    /// ```
302    /// use rumtk_core::strings::RUMString;
303    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
304    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_into_process, pipeline_spawn_process};
305    /// use rumtk_core::types::RUMBuffer;
306    ///
307    /// let ls_name = "ls";
308    /// let mut ls_command = RUMCommand::default();
309    /// ls_command.path = RUMString::from(ls_name);
310    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
311    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
312    /// pipeline_pipe_into_process(&mut sys_ls_process, &Some(RUMBuffer::default())).unwrap();
313    ///
314    /// let out = sys_ls_process.wait_with_output().unwrap();
315    ///
316    /// assert_eq!(out.stdout.is_empty(), false, "Piped command returned an empty buffer? => {:?}", String::from_utf8_lossy(out.stdout.as_slice()))
317    /// ```
318    ///
319    pub fn pipeline_pipe_into_process(
320        process: &mut RUMPipelineProcess,
321        data: &Option<RUMBuffer>,
322    ) -> RUMResult<()> {
323        match data {
324            Some(data) => match process.stdin {
325                Some(ref mut stdin) => match stdin.write_all(&data) {
326                    Ok(_) => {}
327                    Err(e) => {
328                        return Err(rumtk_format!(
329                            "Failed to pipe data to stdin of process because => {}",
330                            e
331                        ))
332                    }
333                },
334                None => {}
335            },
336            None => {}
337        }
338        Ok(())
339    }
340
341    ///
342    /// Builds an executable pipeline out of a list of [RUMCommand](RUMCommand).
343    ///
344    /// ## Example
345    ///
346    /// ```
347    /// use rumtk_core::strings::RUMString;
348    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
349    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline};
350    ///
351    /// let ls_name = "ls";
352    /// let mut ls_command = RUMCommand::default();
353    /// ls_command.path = RUMString::from(ls_name);
354    ///
355    /// let wc_name = "wc";
356    /// let mut wc_command = RUMCommand::default();
357    /// wc_command.path = RUMString::from(wc_name);
358    ///
359    /// let commands = vec![
360    ///     ls_command,
361    ///     wc_command
362    /// ];
363    ///
364    /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
365    ///
366    /// assert_eq!(pipeline.len(), commands.len(), "Pipeline generation returned unexpected number of items!");
367    /// ```
368    ///
369    pub fn pipeline_generate_pipeline(commands: &RUMCommandLine) -> RUMResult<RUMPipeline> {
370        let first_command = commands.first().unwrap();
371
372        // Setup pipeline
373        let mut pipeline = vec![];
374
375        //Bootstrap first process in chain
376        let mut root = pipeline_generate_command(&first_command);
377        let mut parent_process = pipeline_spawn_process(&mut root)?;
378        pipeline_pipe_into_process(&mut parent_process, &mut first_command.data.clone())?;
379        pipeline.push(parent_process);
380
381        for cmd in commands.iter().skip(1) {
382            let mut new_root = pipeline_generate_command(cmd);
383            pipeline_pipe_processes(pipeline.last_mut().unwrap(), &mut new_root)?;
384            parent_process = pipeline_spawn_process(&mut new_root)?;
385            pipeline.push(parent_process);
386        }
387
388        Ok(pipeline)
389    }
390
391    ///
392    /// Await for pipeline to execute in a async friendly manner. Once the pipeline execution ends,
393    /// consume the pipeline and return the output.
394    ///
395    /// ## Example
396    ///
397    /// ```
398    /// use rumtk_core::strings::RUMString;
399    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
400    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
401    /// use rumtk_core::{rumtk_resolve_task};
402    ///
403    /// let ls_name = "ls";
404    /// let mut ls_command = RUMCommand::default();
405    /// ls_command.path = RUMString::from(ls_name);
406    ///
407    /// let wc_name = "wc";
408    /// let mut wc_command = RUMCommand::default();
409    /// wc_command.path = RUMString::from(wc_name);
410    ///
411    /// let commands = vec![
412    ///     ls_command,
413    ///     wc_command
414    /// ];
415    ///
416    /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
417    /// let result = rumtk_resolve_task!(pipeline_await_pipeline(pipeline)).unwrap();
418    ///
419    /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
420    /// ```
421    ///
422    pub async fn pipeline_await_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
423        // Let's make sure the stdin is closed on the first process to make sure it exits instead of
424        // remain waiting for EOF in the stdin stream.
425        pipeline_close_process_stdin(pipeline.first_mut().unwrap());
426
427        // Now let's visit each process and await their completion!
428        for p in pipeline.iter_mut() {
429            loop {
430                match p.try_wait() {
431                    Ok(code) => match code {
432                        Some(code) => {
433                            if !code.success() {
434                                return Err(rumtk_format!(
435                                    "Process {} exited with non-success code => {}!",
436                                    p.id(),
437                                    code
438                                ));
439                            }
440                            break;
441                        }
442                        None => {
443                            async_sleep(DEFAULT_PROCESS_ASYNC_WAIT).await;
444                            continue;
445                        }
446                    },
447                    Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
448                };
449            }
450        }
451
452        let result = pipeline_get_stdout(pipeline)?;
453        Ok(result)
454    }
455
456    ///
457    /// Await for pipeline to complete execution. Once the pipeline execution ends,
458    /// consume the pipeline and return the output.
459    ///
460    /// ## Example
461    ///
462    /// ```
463    /// use rumtk_core::strings::RUMString;
464    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
465    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
466    ///
467    /// let ls_name = "ls";
468    /// let mut ls_command = RUMCommand::default();
469    /// ls_command.path = RUMString::from(ls_name);
470    ///
471    /// let wc_name = "wc";
472    /// let mut wc_command = RUMCommand::default();
473    /// wc_command.path = RUMString::from(wc_name);
474    ///
475    /// let commands = vec![
476    ///     ls_command,
477    ///     wc_command
478    /// ];
479    ///
480    /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
481    /// let result = pipeline_wait_pipeline(pipeline).unwrap();
482    ///
483    /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
484    /// ```
485    ///
486    pub fn pipeline_wait_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
487        // Let's make sure the stdin is closed on the first process to make sure it exits instead of
488        // remain waiting for EOF in the stdin stream.
489        pipeline_close_process_stdin(pipeline.first_mut().unwrap());
490
491        // Now let's visit each process and await their completion!
492        for p in pipeline.iter_mut() {
493            match p.wait() {
494                Ok(code) => {
495                    if !code.success() {
496                        return Err(rumtk_format!(
497                            "Process {} exited with non-success code => {}!",
498                            p.id(),
499                            code
500                        ));
501                    }
502                    continue;
503                }
504                Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
505            };
506        }
507
508        let result = pipeline_get_stdout(pipeline)?;
509        Ok(result)
510    }
511}
512
513pub mod pipeline_macros {
514    ///
515    /// Creates a pipeline command out of the provided parameters. Parameters include `path`, `args`,
516    /// and `env`. The command has [RUMCommand](super::pipeline_types::RUMCommand).
517    ///
518    /// `env` is a map of type [RUMCommandEnv](super::pipeline_types::RUMCommandEnv) containing a set of
519    /// key value pair strings that we can use to update the process environment.
520    ///
521    /// ## Example
522    ///
523    /// ### Program Only
524    ///
525    /// ```
526    /// use rumtk_core::rumtk_pipeline_command;
527    ///
528    /// let command = rumtk_pipeline_command!("ls");
529    /// ```
530    ///
531    /// ### Program with Piped Data
532    ///
533    /// ```
534    /// use rumtk_core::rumtk_pipeline_command;
535    /// use rumtk_core::types::RUMBuffer;
536    /// use rumtk_core::strings::RUMStringConversions;
537    ///
538    /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default());
539    /// ```
540    ///
541    /// ### Program with Args
542    ///
543    /// ```
544    /// use rumtk_core::rumtk_pipeline_command;
545    /// use rumtk_core::types::RUMBuffer;
546    /// use rumtk_core::strings::RUMStringConversions;
547    ///
548    /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default(), &vec![
549    ///     "-l".to_rumstring()
550    /// ]);
551    /// ```
552    ///
553    #[macro_export]
554    macro_rules! rumtk_pipeline_command {
555        ( $path:expr, $data:expr, $args:expr, $env:expr ) => {{
556            use $crate::pipelines::pipeline_types::RUMCommand;
557
558            RUMCommand::new($path, &Some($data), $args, $env)
559        }};
560        ( $path:expr, $data:expr, $args:expr ) => {{
561            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandEnv};
562
563            RUMCommand::new($path, &Some($data), $args, &RUMCommandEnv::default())
564        }};
565        ( $path:expr, $data:expr ) => {{
566            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
567
568            RUMCommand::new(
569                $path,
570                &Some($data),
571                &RUMCommandArgs::default(),
572                &RUMCommandEnv::default(),
573            )
574        }};
575        ( $path:expr ) => {{
576            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
577            use $crate::types::RUMBuffer;
578
579            RUMCommand::new(
580                $path,
581                &None,
582                &RUMCommandArgs::default(),
583                &RUMCommandEnv::default(),
584            )
585        }};
586    }
587
588    ///
589    /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
590    /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
591    /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
592    ///
593    /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
594    /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
595    ///
596    /// ## Example
597    ///
598    /// ### Simple
599    ///
600    /// ```
601    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
602    /// use rumtk_core::core::{RUMResult};
603    /// use rumtk_core::strings::RUMStringConversions;
604    /// use rumtk_core::types::RUMBuffer;
605    ///
606    /// let f = async || -> RUMResult<()> {
607    ///     let result = rumtk_pipeline_run!(
608    ///         rumtk_pipeline_command!("ls"),
609    ///         rumtk_pipeline_command!("wc")
610    ///     ).unwrap();
611    ///
612    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
613    ///     Ok(())
614    /// };
615    ///
616    /// rumtk_resolve_task!(f()).unwrap();
617    /// ```
618    ///
619    /// ### With Buffer Piped In
620    ///
621    /// ```
622    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
623    /// use rumtk_core::core::{RUMResult, new_random_buffer};
624    /// use rumtk_core::strings::RUMStringConversions;
625    /// use rumtk_core::types::RUMBuffer;
626    ///
627    /// let f = || -> RUMResult<()> {
628    ///     let result = rumtk_pipeline_run!(
629    ///         rumtk_pipeline_command!("ls", new_random_buffer()),
630    ///         rumtk_pipeline_command!("wc")
631    ///     )?;
632    ///
633    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
634    ///     Ok(())
635    /// };
636    ///
637    /// f().unwrap();
638    /// ```
639    ///
640    #[macro_export]
641    macro_rules! rumtk_pipeline_run {
642        ( $($command:expr),+ ) => {{
643            use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
644
645            let pipeline = pipeline_generate_pipeline(&vec![
646                $($command),+
647            ])?;
648
649            pipeline_wait_pipeline(pipeline)
650        }};
651    }
652
653    ///
654    /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
655    /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
656    /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
657    ///
658    /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
659    /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
660    ///
661    /// This is the `async` flavor.
662    ///
663    /// ## Example
664    ///
665    /// ### Simple
666    ///
667    /// ```
668    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
669    /// use rumtk_core::core::{RUMResult};
670    /// use rumtk_core::strings::RUMStringConversions;
671    /// use rumtk_core::types::RUMBuffer;
672    ///
673    /// let f = async || -> RUMResult<()> {
674    ///     let result = rumtk_pipeline_run_async!(
675    ///         rumtk_pipeline_command!("ls"),
676    ///         rumtk_pipeline_command!("wc")
677    ///     ).await?;
678    ///
679    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
680    ///     Ok(())
681    /// };
682    ///
683    /// rumtk_resolve_task!(f()).unwrap();
684    /// ```
685    ///
686    /// ### With Buffer Piped In
687    ///
688    /// ```
689    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
690    /// use rumtk_core::core::{RUMResult, new_random_buffer};
691    /// use rumtk_core::strings::RUMStringConversions;
692    /// use rumtk_core::types::RUMBuffer;
693    ///
694    /// let f = async || -> RUMResult<()> {
695    ///     let result = rumtk_pipeline_run_async!(
696    ///         rumtk_pipeline_command!("wc", new_random_buffer())
697    ///     ).await?;
698    ///
699    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
700    ///     Ok(())
701    /// };
702    ///
703    /// rumtk_resolve_task!(f()).unwrap();
704    /// ```
705    ///
706    /// ### With Buffer Piped In W/ Return
707    ///
708    /// ```
709    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
710    /// use rumtk_core::core::{RUMResult, new_random_buffer};
711    /// use rumtk_core::strings::{RUMString, RUMStringConversions, RUMArrayConversions};
712    /// use rumtk_core::types::RUMBuffer;
713    ///
714    /// let expected = "1024\n";
715    ///
716    /// let f = async || -> RUMResult<RUMBuffer> {
717    ///     let result = rumtk_pipeline_run_async!(
718    ///         rumtk_pipeline_command!("wc", new_random_buffer())
719    ///     ).await?;
720    ///
721    ///     Ok(result)
722    /// };
723    ///
724    /// let result = rumtk_resolve_task!(f()).unwrap();
725    /// let string = result.to_vec().to_rumstring();
726    /// let result_buffer_size = string.split("      ").last().unwrap().split("    ").last().unwrap().to_rumstring();
727    ///
728    /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
729    /// assert_eq!(&result_buffer_size, expected, "Pipeline returned an unexpected result from command wc! => {:?}\nvs.\n{:?}", &result_buffer_size, &expected);
730    /// ```
731    ///
732    #[macro_export]
733    macro_rules! rumtk_pipeline_run_async {
734        ( $($command:expr),+ ) => {{
735            use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
736
737            let pipeline = pipeline_generate_pipeline(&vec![
738                $($command),+
739            ])?;
740
741            pipeline_await_pipeline(pipeline)
742        }};
743    }
744}