Skip to main content

rumtk_core/
pipelines.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025  Luis M. Santos, M.D. <lsantos@medicalmasses.com>
5 * Copyright (C) 2025  MedicalMasses L.L.C. <contact@medicalmasses.com>
6 *
7 * This program is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
19 */
20
21pub mod pipeline_types {
22    use crate::core::{RUMResult, RUMVec};
23    use crate::strings::{RUMString, RUMStringConversions};
24    use crate::types::{RUMBuffer, RUMDeserialize, RUMHashMap, RUMSerialize};
25    use std::process::{Child, Command};
26
27    pub static EMPTY_COMMAND_LINE: RUMCommandLine = RUMCommandLine::new();
28
29    pub type RUMCommandArgs = Vec<RUMString>;
30    pub type RUMCommandEnv = RUMHashMap<RUMString, RUMString>;
31    #[derive(RUMSerialize, RUMDeserialize, PartialEq, Default, Debug, Clone)]
32    pub struct RUMCommand {
33        pub path: RUMString,
34        #[serde(skip)]
35        pub data: Option<RUMBuffer>,
36        pub args: RUMCommandArgs,
37        pub env: RUMCommandEnv,
38    }
39
40    impl RUMCommand {
41        pub fn new(
42            prog: &str,
43            data: &Option<RUMBuffer>,
44            args: &RUMCommandArgs,
45            env: &RUMCommandEnv,
46        ) -> Self {
47            RUMCommand {
48                path: prog.to_string(),
49                args: args.clone(),
50                env: env.clone(),
51                data: data.clone(),
52            }
53        }
54    }
55
56    pub type RUMCommandLine = RUMVec<RUMCommand>;
57    pub type RUMPipelineCommand = Command;
58    pub type RUMPipelineProcess = Child;
59    pub type RUMPipeline = RUMVec<RUMPipelineProcess>;
60    pub type RUMPipelineResult = RUMResult<RUMBuffer>;
61}
62
63pub mod pipeline_functions {
64    use super::pipeline_types::*;
65    use crate::core::RUMResult;
66    use crate::strings::{rumtk_format, string_format, RUMArrayConversions, RUMString, StringReplacementPair};
67    use std::io::{Read, Write};
68    use std::os::unix::ffi::OsStrExt;
69
70    use crate::rumtk_resolve_sync_task;
71    use crate::types::RUMBuffer;
72    use std::process::{Command, Stdio};
73
74    const DEFAULT_PROCESS_ASYNC_WAIT: f32 = 0.001;
75    const DEFAULT_STDOUT_CHUNK_SIZE: usize = 1024;
76
77    ///
78    /// Given a command of type [RUMCommand](RUMCommand), generate a command instance the Rust
79    /// runtime can use to spawn a process.
80    ///
81    /// ## Example
82    ///
83    /// ```
84    /// use std::any::{Any, TypeId};
85    ///
86    /// use rumtk_core::strings::RUMString;
87    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
88    /// use rumtk_core::pipelines::pipeline_functions::pipeline_create_command;
89    ///
90    /// let command_name = "ls";
91    /// let mut command = RUMCommand::default();
92    /// command.path = RUMString::from(command_name);
93    ///
94    /// let sys_command = pipeline_create_command(&command);
95    ///
96    /// assert_eq!(sys_command.get_program().to_str().unwrap(), command_name, "");
97    ///
98    /// ```
99    ///
100    pub fn pipeline_create_command(command: &RUMCommand) -> RUMPipelineCommand {
101        let mut cmd = Command::new(command.path.as_str());
102
103        for arg in command.args.iter() {
104            cmd.arg(arg);
105        }
106
107        cmd.envs(command.env.iter());
108
109        cmd.stdin(Stdio::piped())
110            .stdout(Stdio::piped())
111            .stderr(Stdio::piped());
112
113        cmd
114    }
115
116    ///
117    /// Spawns a process out of the [RUMPipelineCommand](RUMPipelineCommand).
118    ///
119    /// ## Example
120    ///
121    /// ```
122    /// use std::any::{Any, TypeId};
123    ///
124    /// use rumtk_core::strings::RUMString;
125    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
126    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_spawn_process};
127    ///
128    /// let command_name = "ls";
129    /// let mut command = RUMCommand::default();
130    /// command.path = RUMString::from(command_name);
131    ///
132    /// let mut sys_command = pipeline_create_command(&command);
133    ///
134    /// let mut process = pipeline_spawn_process(&mut sys_command).unwrap();
135    ///
136    /// process.wait();
137    /// ```
138    ///
139    pub fn pipeline_spawn_process(cmd: &mut RUMPipelineCommand) -> RUMResult<RUMPipelineProcess> {
140        match cmd.spawn() {
141            Ok(process) => {
142                println!("Spawned process {} => {} with args {:?}", process.id(), cmd.get_program().as_bytes().to_string()?, cmd.get_args());
143                Ok(process)
144            },
145            Err(e) => Err(rumtk_format!(
146                "Failed to spawn process {:?} because => {}",
147                cmd.get_program(),
148                e
149            )),
150        }
151    }
152
153    ///
154    /// Given a process of type [RUMPipelineProcess](RUMPipelineProcess) and a rhs of type [RUMPipelineCommand](RUMPipelineCommand),
155    /// create a pipe of the lhs's `stdout` into the next command descriptor which is the rhs.
156    ///
157    /// ## Example
158    ///
159    /// ```
160    /// use std::any::{Any, TypeId};
161    /// use std::process::Stdio;
162    ///
163    /// use rumtk_core::strings::RUMString;
164    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
165    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_pipe_processes, pipeline_spawn_process};
166    ///
167    /// let ls_name = "ls";
168    /// let mut ls_command = RUMCommand::default();
169    /// ls_command.path = RUMString::from(ls_name);
170    /// let mut sys_ls_command = pipeline_create_command(&ls_command);
171    ///
172    /// let wc_name = "wc";
173    /// let mut wc_command = RUMCommand::default();
174    /// wc_command.path = RUMString::from(wc_name);
175    /// let mut sys_wc_command = pipeline_create_command(&wc_command);
176    ///
177    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
178    /// pipeline_pipe_processes(&mut sys_ls_process, &mut sys_wc_command).unwrap();
179    /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
180    ///
181    /// sys_ls_process.wait();
182    /// sys_wc_process.wait();
183    /// ```
184    ///
185    pub fn pipeline_pipe_processes(
186        process: &mut RUMPipelineProcess,
187        piped: &mut RUMPipelineCommand,
188    ) -> RUMResult<()> {
189        let process_stdout = Stdio::from(match process.stdout.take() {
190            Some(stdout) => stdout,
191            None => {
192                return Err(rumtk_format!(
193                    "No stdout handle found for process {}.",
194                    process.id()
195                ));
196            }
197        });
198        let _ = piped.stdin(process_stdout);
199        Ok(())
200    }
201
202    ///
203    /// Retrieves the standard output generated by the completed process.
204    ///
205    /// ## Example
206    ///
207    /// ```
208    /// use std::any::{Any, TypeId};
209    /// use std::process::Stdio;
210    ///
211    /// use rumtk_core::strings::RUMString;
212    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
213    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_spawn_process, pipeline_get_stdout, pipeline_pipe_processes};
214    ///
215    /// let ls_name = "ls";
216    /// let mut ls_command = RUMCommand::default();
217    /// ls_command.path = RUMString::from(ls_name);
218    /// let mut sys_ls_command = pipeline_create_command(&ls_command);
219    ///
220    /// let wc_name = "wc";
221    /// let mut wc_command = RUMCommand::default();
222    /// wc_command.path = RUMString::from(wc_name);
223    /// let mut sys_wc_command = pipeline_create_command(&wc_command);
224    ///
225    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
226    /// pipeline_pipe_processes(&mut sys_ls_process, &mut sys_wc_command).unwrap();
227    /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
228    ///
229    /// sys_ls_process.wait();
230    /// sys_wc_process.wait();
231    ///
232    /// let out_data = pipeline_get_stdout(sys_wc_process).unwrap();
233    ///
234    /// assert_eq!(out_data.is_empty(), false, "No output detected... {:?}", &out_data);
235    /// ```
236    ///
237    pub fn pipeline_get_stdout(mut process: RUMPipelineProcess) -> RUMResult<RUMBuffer> {
238        match process.wait_with_output() {
239            Ok(stdout) => Ok(RUMBuffer::from(stdout.stdout.clone())),
240            Err(e) => Err(rumtk_format!(
241                "Issue reading last process output because => {}",
242                e
243            )),
244        }
245    }
246
247    ///
248    /// Closes the `stdin` standard in file for process. Useful to trigger a resolution of the pipeline.
249    ///
250    /// ## Example
251    ///
252    /// ```
253    /// use rumtk_core::pipelines::pipeline_functions::pipeline_close_process_stdin;
254    /// use rumtk_core::strings::RUMString;
255    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
256    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_pipe_into_process, pipeline_spawn_process};
257    /// use rumtk_core::types::RUMBuffer;
258    ///
259    /// let ls_name = "ls";
260    /// let mut ls_command = RUMCommand::default();
261    /// ls_command.path = RUMString::from(ls_name);
262    /// let mut sys_ls_command = pipeline_create_command(&ls_command);
263    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
264    ///
265    /// pipeline_close_process_stdin(&mut sys_ls_process);
266    ///
267    ///
268    /// ```
269    ///
270    pub fn pipeline_close_process_stdin(process: &mut RUMPipelineProcess) {
271        // Do not change into an expect() or such unwrap. We just want to ignore and assume stdin is closed.
272        match process.stdin.take() {
273            Some(stdin) => {
274                drop(stdin);
275            }
276            None => {}
277        };
278    }
279
280    ///
281    /// Pipe data into a process.
282    ///
283    /// ## Example
284    ///
285    /// ```
286    /// use rumtk_core::strings::RUMString;
287    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
288    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_pipe_into_process, pipeline_spawn_process};
289    /// use rumtk_core::types::RUMBuffer;
290    ///
291    /// let ls_name = "ls";
292    /// let mut ls_command = RUMCommand::default();
293    /// ls_command.path = RUMString::from(ls_name);
294    /// let mut sys_ls_command = pipeline_create_command(&ls_command);
295    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
296    /// pipeline_pipe_into_process(&mut sys_ls_process, &RUMBuffer::default()).unwrap();
297    ///
298    /// let out = sys_ls_process.wait_with_output().unwrap();
299    ///
300    /// assert_eq!(out.stdout.is_empty(), false, "Piped command returned an empty buffer? => {:?}", String::from_utf8_lossy(out.stdout.as_slice()))
301    /// ```
302    ///
303    pub fn pipeline_pipe_into_process(
304        process: &mut RUMPipelineProcess,
305        data: &RUMBuffer,
306    ) -> RUMResult<()> {
307            match process.stdin.take() {
308                Some(ref mut stdin) => match stdin.write_all(data.as_slice()) {
309                    Ok(_) => {}
310                    Err(e) => {
311                        return Err(rumtk_format!(
312                            "Failed to pipe data to stdin of process {} because => {}",
313                            process.id(),
314                            e
315                        ))
316                    }
317                },
318                None => {}
319            }
320        Ok(())
321    }
322
323    ///
324    /// Builds an executable pipeline out of a list of [RUMCommand](RUMCommand).
325    ///
326    /// ## Example
327    ///
328    /// ```
329    /// use rumtk_core::strings::{RUMStringConversions, RUMString};
330    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
331    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_get_stdout, pipeline_wait_pipeline};
332    /// use rumtk_core::types::RUMBuffer;
333    ///
334    /// let ls_name = "ls";
335    /// let mut ls_command = RUMCommand::default();
336    /// ls_command.path = RUMString::from(ls_name);
337    /// ls_command.args = vec!["-la".to_string()];
338    ///
339    /// let mut process = pipeline_generate_command(&ls_command, &RUMBuffer::default()).unwrap();
340    /// let result = pipeline_get_stdout(process).unwrap();
341    ///
342    /// assert!(result.len() > 0, "Command generation failed to generate a valid command!");
343    /// ```
344    ///
345    pub fn pipeline_generate_command(command: &RUMCommand, data: &RUMBuffer) -> RUMResult<RUMPipelineProcess> {
346        let mut root = pipeline_create_command(&command);
347        let mut process = pipeline_spawn_process(&mut root)?;
348        pipeline_pipe_into_process(&mut process, data)?;
349        pipeline_close_process_stdin(&mut process);
350
351        Ok(process)
352    }
353
354    ///
355    /// Add buffer at the beginning of pipeline to pipe in. This buffer serves as the initial input of
356    /// a pipe aware program.
357    ///
358    /// ## Example
359    /// ```
360    /// use rumtk_core::core::RUMResult;
361    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMCommandLine};
362    /// use rumtk_core::rumtk_pipeline_run;
363    /// use rumtk_core::strings::{RUMString};
364    /// use rumtk_core::buffers::{buffer_to_string};
365    /// use rumtk_core::types::RUMBuffer;
366    ///
367    /// let data = RUMBuffer::from_static(b"Hello World");
368    /// let wc_name = "wc";
369    /// let mut wc_command = RUMCommand::default();
370    /// wc_command.path = RUMString::from(wc_name);
371    /// let mut pipeline = vec![
372    ///     wc_command
373    /// ];
374    ///
375    /// let processor = || -> RUMResult<RUMBuffer> {rumtk_pipeline_run!(&pipeline, &data)};
376    /// let result_string = buffer_to_string(&processor().unwrap()).unwrap();
377    /// let binding = result_string.as_str().replace('\n', "");
378    /// let result_items: Vec<&str> = binding.split("      ").collect();
379    /// let result = result_items.get(2).unwrap().trim().parse::<i32>().unwrap();
380    ///
381    /// assert_eq!(result, 2, "Data was not piped properly!");
382    /// ```
383    ///
384    pub fn pipeline_add_stdin_data_to_pipeline<'a>(pipeline: &'a mut RUMCommandLine, data: &RUMBuffer) -> &'a RUMCommandLine {
385        match pipeline.get_mut(0) {
386            Some(command) => command.data = Some(data.clone()),
387            None => {
388                return pipeline;
389            }
390        };
391
392        pipeline
393    }
394
395    ///
396    /// Flatten the [RUMCommandLine] structure into a single string representing the pipeline and
397    /// print it or log it.
398    ///
399    fn print_pipeline(pipeline: &RUMCommandLine) {
400        let mut pipeline_components = Vec::<RUMString>::with_capacity(pipeline.len());
401
402        for pipe in pipeline.iter() {
403            pipeline_components.push(rumtk_format!("{} {}", pipe.path, pipe.args.clone().join(" ")));
404        }
405
406        println!("Executing {}", pipeline_components.join(" | "));
407    }
408
409    pub fn pipeline_patch_command_args<'a>(cmd: &'a mut RUMCommand, replacements: &StringReplacementPair) -> RUMResult<&'a RUMCommand> {
410        let mut new_args = RUMCommandArgs::with_capacity(cmd.args.len());
411
412        for arg in cmd.args.iter() {
413            new_args.push(string_format(arg, replacements));
414        }
415
416        cmd.args = new_args;
417
418        Ok(cmd)
419    }
420
421    ///
422    /// Patches the arguments of the first command with the pattern=replacement pairs!
423    ///
424    /// ## Example
425    /// ```
426    /// use rumtk_core::core::RUMResult;
427    /// use rumtk_core::pipelines::pipeline_functions::pipeline_patch_args;
428    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMCommandLine};
429    /// use rumtk_core::rumtk_pipeline_run;
430    /// use rumtk_core::strings::{RUMString};
431    /// use rumtk_core::buffers::{buffer_to_string};
432    /// use rumtk_core::types::RUMBuffer;
433    ///
434    /// let ls_name = "ls";
435    /// let mut ls_command = RUMCommand::default();
436    /// ls_command.path = RUMString::from(ls_name);
437    /// ls_command.args.push(RUMString::from("{options}"));
438    /// let mut pipeline = vec![
439    ///     ls_command
440    /// ];
441    /// pipeline_patch_args(&mut pipeline, &[("{options}", "-la")]);
442    ///
443    /// let processor = || -> RUMResult<RUMBuffer> {rumtk_pipeline_run!(&pipeline)};
444    /// let result_string = buffer_to_string(&processor().unwrap()).unwrap();
445    /// let results: Vec<&str> = result_string.as_str().split("\n").collect();
446    /// let dot_dir = results.get(1).unwrap().chars().last().unwrap();
447    ///
448    /// assert_eq!(dot_dir, '.', "Incorrect options passed!");
449    /// ```
450    ///
451    pub fn pipeline_patch_args<'a>(pipeline: &'a mut RUMCommandLine, replacements: &StringReplacementPair) -> RUMResult<&'a RUMCommandLine> {
452        for mut cmd in pipeline.iter_mut() {
453            pipeline_patch_command_args(&mut cmd, replacements)?;
454        }
455        Ok(pipeline)
456    }
457
458    ///
459    /// Await for pipeline to execute in a async friendly manner. Once the pipeline execution ends,
460    /// consume the pipeline and return the output.
461    ///
462    /// ## Example
463    ///
464    /// ```
465    /// use rumtk_core::strings::RUMString;
466    /// use rumtk_core::types::RUMBuffer;
467    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
468    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_await_pipeline};
469    /// use rumtk_core::{rumtk_resolve_task};
470    ///
471    /// let ls_name = "ls";
472    /// let mut ls_command = RUMCommand::default();
473    /// ls_command.path = RUMString::from(ls_name);
474    ///
475    /// let wc_name = "wc";
476    /// let mut wc_command = RUMCommand::default();
477    /// wc_command.path = RUMString::from(wc_name);
478    ///
479    /// let pipeline = vec![
480    ///     ls_command,
481    ///     wc_command
482    /// ];
483    ///
484    /// let result = rumtk_resolve_task!(pipeline_await_pipeline(&pipeline, &RUMBuffer::default())).unwrap();
485    ///
486    /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
487    /// ```
488    ///
489    pub async fn pipeline_await_pipeline(pipeline: &RUMCommandLine, initial_data: &RUMBuffer) -> RUMPipelineResult {
490        let pipeline_copy = pipeline.clone();
491        let data_copy = initial_data.clone();
492        rumtk_resolve_sync_task!(move || {
493            pipeline_wait_pipeline(&pipeline_copy, &data_copy)
494        })
495    }
496
497    ///
498    /// Await for pipeline to complete execution. Once the pipeline execution ends,
499    /// consume the pipeline and return the output.
500    ///
501    /// ## Example
502    ///
503    /// ```
504    /// use rumtk_core::strings::RUMString;
505    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
506    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_wait_pipeline};
507    /// use rumtk_core::types::RUMBuffer;
508    ///
509    /// let ls_name = "ls";
510    /// let mut ls_command = RUMCommand::default();
511    /// ls_command.path = RUMString::from(ls_name);
512    ///
513    /// let wc_name = "wc";
514    /// let mut wc_command = RUMCommand::default();
515    /// wc_command.path = RUMString::from(wc_name);
516    ///
517    /// let pipeline = vec![
518    ///     ls_command,
519    ///     wc_command
520    /// ];
521    ///
522    /// let result = pipeline_wait_pipeline(&pipeline, &RUMBuffer::default()).unwrap();
523    ///
524    /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
525    /// ```
526    ///
527    pub fn pipeline_wait_pipeline(pipeline: &RUMCommandLine, initial_data: &RUMBuffer) -> RUMPipelineResult {
528        let mut last_data = initial_data.clone();
529
530        // Print pipeline
531        print_pipeline(&pipeline);
532
533        // Now let's visit each process and await their completion!
534        for c in pipeline.iter() {
535            let mut p = pipeline_generate_command(&c, &last_data)?;
536            pipeline_close_process_stdin(&mut p);
537            println!("Waiting on {}", p.id());
538            last_data = pipeline_get_stdout(p)?;
539        }
540
541        Ok(last_data)
542    }
543}
544
545pub mod pipeline_macros {
546    ///
547    /// Creates a pipeline command out of the provided parameters. Parameters include `path`, `args`,
548    /// and `env`. The command has [RUMCommand](super::pipeline_types::RUMCommand).
549    ///
550    /// `env` is a map of type [RUMCommandEnv](super::pipeline_types::RUMCommandEnv) containing a set of
551    /// key value pair strings that we can use to update the process environment.
552    ///
553    /// ## Example
554    ///
555    /// ### Program Only
556    ///
557    /// ```
558    /// use rumtk_core::rumtk_pipeline_command;
559    ///
560    /// let command = rumtk_pipeline_command!("ls");
561    /// ```
562    ///
563    /// ### Program with Piped Data
564    ///
565    /// ```
566    /// use rumtk_core::rumtk_pipeline_command;
567    /// use rumtk_core::types::RUMBuffer;
568    /// use rumtk_core::strings::RUMStringConversions;
569    ///
570    /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default());
571    /// ```
572    ///
573    /// ### Program with Args
574    ///
575    /// ```
576    /// use rumtk_core::rumtk_pipeline_command;
577    /// use rumtk_core::types::RUMBuffer;
578    /// use rumtk_core::strings::RUMStringConversions;
579    ///
580    /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default(), &vec![
581    ///     "-l".to_string()
582    /// ]);
583    /// ```
584    ///
585    #[macro_export]
586    macro_rules! rumtk_pipeline_command {
587        ( $path:expr, $data:expr, $args:expr, $env:expr ) => {{
588            use $crate::pipelines::pipeline_types::RUMCommand;
589
590            RUMCommand::new($path, &Some($data), $args, $env)
591        }};
592        ( $path:expr, $data:expr, $args:expr ) => {{
593            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandEnv};
594
595            RUMCommand::new($path, &Some($data), $args, &RUMCommandEnv::default())
596        }};
597        ( $path:expr, $data:expr ) => {{
598            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
599
600            RUMCommand::new(
601                $path,
602                &Some($data),
603                &RUMCommandArgs::default(),
604                &RUMCommandEnv::default(),
605            )
606        }};
607        ( $path:expr ) => {{
608            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
609            use $crate::types::RUMBuffer;
610
611            RUMCommand::new(
612                $path,
613                &None,
614                &RUMCommandArgs::default(),
615                &RUMCommandEnv::default(),
616            )
617        }};
618    }
619
620    ///
621    /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
622    /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
623    /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
624    ///
625    /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
626    /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
627    ///
628    /// ## Example
629    ///
630    /// ### Simple
631    ///
632    /// ```
633    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_quick_run, rumtk_resolve_task, rumtk_init_threads};
634    /// use rumtk_core::core::{RUMResult};
635    /// use rumtk_core::strings::RUMStringConversions;
636    /// use rumtk_core::types::RUMBuffer;
637    ///
638    /// let f = async || -> RUMResult<()> {
639    ///     let result = rumtk_pipeline_quick_run!(
640    ///         rumtk_pipeline_command!("ls"),
641    ///         rumtk_pipeline_command!("wc")
642    ///     ).unwrap();
643    ///
644    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
645    ///     Ok(())
646    /// };
647    ///
648    /// rumtk_resolve_task!(f()).unwrap();
649    /// ```
650    ///
651    #[macro_export]
652    macro_rules! rumtk_pipeline_quick_run {
653        ( $($command:expr),+ ) => {{
654            use $crate::types::RUMBuffer;
655            use $crate::pipelines::pipeline_functions::{pipeline_wait_pipeline};
656
657            let pipeline = vec![$($command),+];
658
659            pipeline_wait_pipeline(&pipeline, &RUMBuffer::default())
660        }};
661    }
662
663    ///
664    /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
665    /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
666    /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
667    ///
668    /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
669    /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
670    ///
671    /// This is the `async` flavor.
672    ///
673    /// ## Example
674    ///
675    /// ### Simple
676    ///
677    /// ```
678    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_quick_run_async, rumtk_resolve_task, rumtk_init_threads};
679    /// use rumtk_core::core::{RUMResult};
680    /// use rumtk_core::strings::RUMStringConversions;
681    /// use rumtk_core::types::RUMBuffer;
682    ///
683    /// let f = async || -> RUMResult<()> {
684    ///     let result = rumtk_pipeline_quick_run_async!(
685    ///         rumtk_pipeline_command!("ls"),
686    ///         rumtk_pipeline_command!("wc")
687    ///     ).await?;
688    ///
689    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
690    ///     Ok(())
691    /// };
692    ///
693    /// rumtk_resolve_task!(f()).unwrap();
694    /// ```
695    ///
696    #[macro_export]
697    macro_rules! rumtk_pipeline_quick_run_async {
698        ( $($command:expr),+ ) => {{
699            use $crate::types::RUMBuffer;
700            use $crate::pipelines::pipeline_functions::{pipeline_await_pipeline};
701
702            let pipeline = vec![$($command),+];
703
704            pipeline_await_pipeline(&pipeline.clone(), &RUMBuffer::default().clone())
705        }};
706    }
707
708    ///
709    /// This macro is similar to [rumtk_pipeline_quick_run](crate::rumtk_pipeline_quick_run). The difference here
710    /// is that the function takes a pipeline structure ([RUMCommandLine](crate::pipelines::pipeline_types::RUMCommandLine))
711    /// In other words, this macro simply runs an already defined pipeline.
712    /// 
713    /// ## Example
714    ///
715    /// ### Run the pipeline
716    /// ```
717    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
718    /// use rumtk_core::core::{RUMResult};
719    /// use rumtk_core::strings::RUMStringConversions;
720    /// use rumtk_core::types::RUMBuffer;
721    ///
722    /// let f = || -> RUMResult<RUMBuffer> {
723    ///     let pipeline = vec![
724    ///         rumtk_pipeline_command!("ls"),
725    ///         rumtk_pipeline_command!("wc")
726    ///     ];
727    /// 
728    ///     rumtk_pipeline_run!(&pipeline)
729    /// };
730    /// 
731    /// f().unwrap();
732    /// ```
733    ///
734    /// ### Pipe Buffer to Pipeline
735    ///
736    /// ```
737    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run};
738    /// use rumtk_core::core::RUMResult;
739    /// use rumtk_core::types::RUMBuffer;
740    /// use rumtk_core::strings::{string_to_buffer};
741    /// use rumtk_core::buffers::{buffer_to_string};
742    ///
743    /// const data: &str = "Hello World!";
744    /// const expected: &str = "      0       2      12\n";
745    ///
746    ///
747    /// let f = |input: &RUMBuffer| -> RUMResult<RUMBuffer> {
748    ///     let mut pipeline = vec![
749    ///         rumtk_pipeline_command!("wc")
750    ///     ];
751    ///
752    ///     rumtk_pipeline_run!(&pipeline, &input)
753    /// };
754    /// let result = buffer_to_string(&f(&string_to_buffer(data)).unwrap()).unwrap();
755    ///
756    /// assert_eq!(result, expected, "Buffer correctly piped into pipeline!");
757    /// ```
758    ///
759    /// ### Pipe String to Pipeline
760    ///
761    /// ```
762    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run};
763    /// use rumtk_core::core::RUMResult;
764    /// use rumtk_core::strings::{string_to_buffer};
765    /// use rumtk_core::buffers::{buffer_to_string};
766    /// use rumtk_core::types::RUMBuffer;
767    ///
768    /// const data: &str = "Hello World!";
769    /// const expected: &str = "      0       2      12\n";
770    ///
771    ///
772    /// let f = |input: &str| -> RUMResult<RUMBuffer> {
773    ///     let mut pipeline = vec![
774    ///         rumtk_pipeline_command!("wc")
775    ///     ];
776    ///
777    ///     rumtk_pipeline_run!(&pipeline, &string_to_buffer(input))
778    /// };
779    /// let result = buffer_to_string(&f(data).unwrap()).unwrap();
780    ///
781    /// assert_eq!(result, expected, "String correctly piped into pipeline!");
782    /// ```
783    /// 
784    #[macro_export]
785    macro_rules! rumtk_pipeline_run {
786        ( $pipeline:expr ) => {{
787            use $crate::types::RUMBuffer;
788
789            rumtk_pipeline_run!($pipeline, &RUMBuffer::default())
790        }};
791        ( $pipeline:expr, $data:expr ) => {{
792            use $crate::pipelines::pipeline_functions::{pipeline_wait_pipeline};
793
794            pipeline_wait_pipeline($pipeline, $data)
795        }};
796    }
797
798
799    ///
800    /// This macro is similar to [rumtk_pipeline_quick_run_async](crate::rumtk_pipeline_quick_run_async). The difference here
801    /// is that the function takes a pipeline structure ([RUMCommandLine](crate::pipelines::pipeline_types::RUMCommandLine))
802    /// In other words, this macro simply runs an already defined pipeline.
803    ///
804    /// ## Example
805    /// ```
806    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
807    /// use rumtk_core::core::{RUMResult};
808    /// use rumtk_core::strings::RUMStringConversions;
809    /// use rumtk_core::types::RUMBuffer;
810    ///
811    /// let f = async || -> RUMResult<RUMBuffer> {
812    ///     let pipeline = vec![
813    ///         rumtk_pipeline_command!("ls"),
814    ///         rumtk_pipeline_command!("wc")
815    ///     ];
816    ///
817    ///     rumtk_pipeline_run_async!(&pipeline).await
818    /// };
819    ///
820    /// rumtk_resolve_task!(f()).unwrap();
821    /// ```
822    ///
823    /// ### Pipe Buffer to Pipeline
824    ///
825    /// ```
826    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_spawn_task};
827    /// use rumtk_core::core::RUMResult;
828    /// use rumtk_core::types::RUMBuffer;
829    /// use rumtk_core::buffers::{buffer_to_string};
830    /// use rumtk_core::strings::{string_to_buffer};
831    ///
832    /// const data: &str = "Hello World!";
833    /// const expected: &str = "      0       2      12\n";
834    ///
835    ///
836    /// let task = rumtk_spawn_task!(async {
837    ///     let input = RUMBuffer::from("Hello World!");
838    ///     let mut pipeline = vec![
839    ///         rumtk_pipeline_command!("wc")
840    ///     ];
841    ///
842    ///     rumtk_pipeline_run_async!(&pipeline, &input).await
843    /// });
844    ///
845    /// let result = buffer_to_string(&rumtk_resolve_task!(task).unwrap().unwrap()).unwrap();
846    ///
847    /// assert_eq!(result, expected, "Buffer correctly piped into pipeline!");
848    /// ```
849    ///
850    /// ### Pipe String to Pipeline
851    ///
852    /// ```
853    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_spawn_task};
854    /// use rumtk_core::core::RUMResult;
855    /// use rumtk_core::strings::{string_to_buffer};
856    /// use rumtk_core::buffers::{buffer_to_string};
857    /// use rumtk_core::types::RUMBuffer;
858    ///
859    /// const data: &str = "Hello World!";
860    /// const expected: &str = "      0       2      12\n";
861    ///
862    ///
863    /// let task = rumtk_spawn_task!(async {
864    ///     let input = string_to_buffer(data);
865    ///     let mut pipeline = vec![
866    ///         rumtk_pipeline_command!("wc")
867    ///     ];
868    ///
869    ///     rumtk_pipeline_run_async!(&pipeline, &input).await
870    /// });
871    ///
872    /// let result = buffer_to_string(&rumtk_resolve_task!(task).unwrap().unwrap()).unwrap();
873    ///
874    /// assert_eq!(result, expected, "String correctly piped into pipeline!");
875    /// ```
876    ///
877    #[macro_export]
878    macro_rules! rumtk_pipeline_run_async {
879        ( $pipeline:expr ) => {{
880            use $crate::types::RUMBuffer;
881
882            rumtk_pipeline_run_async!($pipeline, &RUMBuffer::default())
883        }};
884        ( $pipeline:expr, $data:expr ) => {{
885            use $crate::pipelines::pipeline_functions::{pipeline_await_pipeline};
886
887            pipeline_await_pipeline($pipeline, $data)
888        }};
889    }
890
891    ///
892    /// Patch the pipeline's arguments with desired dynamic options.
893    ///
894    /// ## Example
895    /// ```
896    /// use rumtk_core::{rumtk_pipeline_patch_args, rumtk_pipeline_command, rumtk_pipeline_run};
897    /// use rumtk_core::core::RUMResult;
898    /// use rumtk_core::buffers::{buffer_to_string};
899    /// use rumtk_core::types::RUMBuffer;
900    /// use rumtk_core::strings::string_to_buffer;
901    ///
902    ///
903    /// let f = || -> RUMResult<RUMBuffer> {
904    ///     let mut pipeline = vec![
905    ///         rumtk_pipeline_command!("ls", RUMBuffer::default(), &vec![
906    ///             "{options}".into()
907    ///         ])
908    ///     ];
909    ///
910    ///     rumtk_pipeline_patch_args!(&mut pipeline, &[("{options}", "-la")]);
911    ///
912    ///     rumtk_pipeline_run!(&pipeline)
913    /// };
914    ///
915    /// let result_string = buffer_to_string(&f().unwrap()).unwrap();
916    /// let results: Vec<&str> = result_string.as_str().split("\n").collect();
917    /// let dot_dir = results.get(1).unwrap().chars().last().unwrap();
918    ///
919    /// assert_eq!(dot_dir, '.', "Incorrect options passed!");
920    /// ```
921    ///
922    #[macro_export]
923    macro_rules! rumtk_pipeline_patch_args {
924        ( $pipeline:expr, $replacements:expr ) => {{
925            use $crate::pipelines::pipeline_functions::{pipeline_patch_args};
926
927            pipeline_patch_args($pipeline, $replacements)
928        }};
929    }
930}