Skip to main content

rumtk_core/
pipelines.rs

1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025  Luis M. Santos, M.D. <lsantos@medicalmasses.com>
5 * Copyright (C) 2025  MedicalMasses L.L.C. <contact@medicalmasses.com>
6 *
7 * This program is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
19 */
20
21pub mod pipeline_types {
22    use crate::base::{RUMResult, RUMVec};
23    use crate::serde::json::{RUMDeJson, RUMSerJson};
24    use crate::strings::{RUMString, RUMStringConversions};
25    use crate::types::{RUMBuffer, RUMHashMap};
26    use std::process::{Child, Command};
27
28    pub static EMPTY_COMMAND_LINE: RUMCommandLine = RUMCommandLine::new();
29
30    pub type RUMCommandArgs = Vec<RUMString>;
31    pub type RUMCommandEnv = RUMHashMap<RUMString, RUMString>;
32    #[derive(RUMSerJson, RUMDeJson, PartialEq, Default, Debug, Clone)]
33    pub struct RUMCommand {
34        pub path: RUMString,
35        #[serde(skip)]
36        pub data: Option<RUMBuffer>,
37        pub args: RUMCommandArgs,
38        pub env: RUMCommandEnv,
39    }
40
41    impl RUMCommand {
42        pub fn new(
43            prog: &str,
44            data: &Option<RUMBuffer>,
45            args: &RUMCommandArgs,
46            env: &RUMCommandEnv,
47        ) -> Self {
48            RUMCommand {
49                path: prog.to_string(),
50                args: args.clone(),
51                env: env.clone(),
52                data: data.clone(),
53            }
54        }
55    }
56
57    pub type RUMCommandLine = RUMVec<RUMCommand>;
58    pub type RUMPipelineCommand = Command;
59    pub type RUMPipelineProcess = Child;
60    pub type RUMPipeline = RUMVec<RUMPipelineProcess>;
61    pub type RUMPipelineResult = RUMResult<RUMBuffer>;
62}
63
64pub mod pipeline_functions {
65    use super::pipeline_types::*;
66    use crate::base::RUMResult;
67    use crate::strings::{rumtk_format, string_format, RUMArrayConversions, RUMString, StringReplacementPair};
68    use std::io::{Read, Write};
69    use std::os::unix::ffi::OsStrExt;
70
71    use crate::rumtk_resolve_sync_task;
72    use crate::types::RUMBuffer;
73    use std::process::{Command, Stdio};
74
75    const DEFAULT_PROCESS_ASYNC_WAIT: f32 = 0.001;
76    const DEFAULT_STDOUT_CHUNK_SIZE: usize = 1024;
77
78    ///
79    /// Given a command of type [RUMCommand](RUMCommand), generate a command instance the Rust
80    /// runtime can use to spawn a process.
81    ///
82    /// ## Example
83    ///
84    /// ```
85    /// use std::any::{Any, TypeId};
86    ///
87    /// use rumtk_core::strings::RUMString;
88    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
89    /// use rumtk_core::pipelines::pipeline_functions::pipeline_create_command;
90    ///
91    /// let command_name = "ls";
92    /// let mut command = RUMCommand::default();
93    /// command.path = RUMString::from(command_name);
94    ///
95    /// let sys_command = pipeline_create_command(&command);
96    ///
97    /// assert_eq!(sys_command.get_program().to_str().unwrap(), command_name, "");
98    ///
99    /// ```
100    ///
101    pub fn pipeline_create_command(command: &RUMCommand) -> RUMPipelineCommand {
102        let mut cmd = Command::new(command.path.as_str());
103
104        for arg in command.args.iter() {
105            cmd.arg(arg);
106        }
107
108        cmd.envs(command.env.iter());
109
110        cmd.stdin(Stdio::piped())
111            .stdout(Stdio::piped())
112            .stderr(Stdio::piped());
113
114        cmd
115    }
116
117    ///
118    /// Spawns a process out of the [RUMPipelineCommand](RUMPipelineCommand).
119    ///
120    /// ## Example
121    ///
122    /// ```
123    /// use std::any::{Any, TypeId};
124    ///
125    /// use rumtk_core::strings::RUMString;
126    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
127    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_spawn_process};
128    ///
129    /// let command_name = "ls";
130    /// let mut command = RUMCommand::default();
131    /// command.path = RUMString::from(command_name);
132    ///
133    /// let mut sys_command = pipeline_create_command(&command);
134    ///
135    /// let mut process = pipeline_spawn_process(&mut sys_command).unwrap();
136    ///
137    /// process.wait();
138    /// ```
139    ///
140    pub fn pipeline_spawn_process(cmd: &mut RUMPipelineCommand) -> RUMResult<RUMPipelineProcess> {
141        match cmd.spawn() {
142            Ok(process) => {
143                println!("Spawned process {} => {} with args {:?}", process.id(), cmd.get_program().as_bytes().to_string()?, cmd.get_args());
144                Ok(process)
145            },
146            Err(e) => Err(rumtk_format!(
147                "Failed to spawn process {:?} because => {}",
148                cmd.get_program(),
149                e
150            )),
151        }
152    }
153
154    ///
155    /// Given a process of type [RUMPipelineProcess](RUMPipelineProcess) and a rhs of type [RUMPipelineCommand](RUMPipelineCommand),
156    /// create a pipe of the lhs's `stdout` into the next command descriptor which is the rhs.
157    ///
158    /// ## Example
159    ///
160    /// ```
161    /// use std::any::{Any, TypeId};
162    /// use std::process::Stdio;
163    ///
164    /// use rumtk_core::strings::RUMString;
165    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
166    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_pipe_processes, pipeline_spawn_process};
167    ///
168    /// let ls_name = "ls";
169    /// let mut ls_command = RUMCommand::default();
170    /// ls_command.path = RUMString::from(ls_name);
171    /// let mut sys_ls_command = pipeline_create_command(&ls_command);
172    ///
173    /// let wc_name = "wc";
174    /// let mut wc_command = RUMCommand::default();
175    /// wc_command.path = RUMString::from(wc_name);
176    /// let mut sys_wc_command = pipeline_create_command(&wc_command);
177    ///
178    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
179    /// pipeline_pipe_processes(&mut sys_ls_process, &mut sys_wc_command).unwrap();
180    /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
181    ///
182    /// sys_ls_process.wait();
183    /// sys_wc_process.wait();
184    /// ```
185    ///
186    pub fn pipeline_pipe_processes(
187        process: &mut RUMPipelineProcess,
188        piped: &mut RUMPipelineCommand,
189    ) -> RUMResult<()> {
190        let process_stdout = Stdio::from(match process.stdout.take() {
191            Some(stdout) => stdout,
192            None => {
193                return Err(rumtk_format!(
194                    "No stdout handle found for process {}.",
195                    process.id()
196                ));
197            }
198        });
199        let _ = piped.stdin(process_stdout);
200        Ok(())
201    }
202
203    ///
204    /// Retrieves the standard output generated by the completed process.
205    ///
206    /// ## Example
207    ///
208    /// ```
209    /// use std::any::{Any, TypeId};
210    /// use std::process::Stdio;
211    ///
212    /// use rumtk_core::strings::RUMString;
213    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
214    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_spawn_process, pipeline_get_stdout, pipeline_pipe_processes};
215    ///
216    /// let ls_name = "ls";
217    /// let mut ls_command = RUMCommand::default();
218    /// ls_command.path = RUMString::from(ls_name);
219    /// let mut sys_ls_command = pipeline_create_command(&ls_command);
220    ///
221    /// let wc_name = "wc";
222    /// let mut wc_command = RUMCommand::default();
223    /// wc_command.path = RUMString::from(wc_name);
224    /// let mut sys_wc_command = pipeline_create_command(&wc_command);
225    ///
226    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
227    /// pipeline_pipe_processes(&mut sys_ls_process, &mut sys_wc_command).unwrap();
228    /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
229    ///
230    /// sys_ls_process.wait();
231    /// sys_wc_process.wait();
232    ///
233    /// let out_data = pipeline_get_stdout(sys_wc_process).unwrap();
234    ///
235    /// assert_eq!(out_data.is_empty(), false, "No output detected... {:?}", &out_data);
236    /// ```
237    ///
238    pub fn pipeline_get_stdout(mut process: RUMPipelineProcess) -> RUMResult<RUMBuffer> {
239        match process.wait_with_output() {
240            Ok(stdout) => Ok(RUMBuffer::from(stdout.stdout.clone())),
241            Err(e) => Err(rumtk_format!(
242                "Issue reading last process output because => {}",
243                e
244            )),
245        }
246    }
247
248    ///
249    /// Closes the `stdin` standard in file for process. Useful to trigger a resolution of the pipeline.
250    ///
251    /// ## Example
252    ///
253    /// ```
254    /// use rumtk_core::pipelines::pipeline_functions::pipeline_close_process_stdin;
255    /// use rumtk_core::strings::RUMString;
256    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
257    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_pipe_into_process, pipeline_spawn_process};
258    /// use rumtk_core::types::RUMBuffer;
259    ///
260    /// let ls_name = "ls";
261    /// let mut ls_command = RUMCommand::default();
262    /// ls_command.path = RUMString::from(ls_name);
263    /// let mut sys_ls_command = pipeline_create_command(&ls_command);
264    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
265    ///
266    /// pipeline_close_process_stdin(&mut sys_ls_process);
267    ///
268    ///
269    /// ```
270    ///
271    pub fn pipeline_close_process_stdin(process: &mut RUMPipelineProcess) {
272        // Do not change into an expect() or such unwrap. We just want to ignore and assume stdin is closed.
273        match process.stdin.take() {
274            Some(stdin) => {
275                drop(stdin);
276            }
277            None => {}
278        };
279    }
280
281    ///
282    /// Pipe data into a process.
283    ///
284    /// ## Example
285    ///
286    /// ```
287    /// use rumtk_core::strings::RUMString;
288    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
289    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_pipe_into_process, pipeline_spawn_process};
290    /// use rumtk_core::types::RUMBuffer;
291    ///
292    /// let ls_name = "ls";
293    /// let mut ls_command = RUMCommand::default();
294    /// ls_command.path = RUMString::from(ls_name);
295    /// let mut sys_ls_command = pipeline_create_command(&ls_command);
296    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
297    /// pipeline_pipe_into_process(&mut sys_ls_process, &RUMBuffer::default()).unwrap();
298    ///
299    /// let out = sys_ls_process.wait_with_output().unwrap();
300    ///
301    /// assert_eq!(out.stdout.is_empty(), false, "Piped command returned an empty buffer? => {:?}", String::from_utf8_lossy(out.stdout.as_slice()))
302    /// ```
303    ///
304    pub fn pipeline_pipe_into_process(
305        process: &mut RUMPipelineProcess,
306        data: &RUMBuffer,
307    ) -> RUMResult<()> {
308            match process.stdin.take() {
309                Some(ref mut stdin) => match stdin.write_all(data.as_slice()) {
310                    Ok(_) => {}
311                    Err(e) => {
312                        return Err(rumtk_format!(
313                            "Failed to pipe data to stdin of process {} because => {}",
314                            process.id(),
315                            e
316                        ))
317                    }
318                },
319                None => {}
320            }
321        Ok(())
322    }
323
324    ///
325    /// Builds an executable pipeline out of a list of [RUMCommand](RUMCommand).
326    ///
327    /// ## Example
328    ///
329    /// ```
330    /// use rumtk_core::strings::{RUMStringConversions, RUMString};
331    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
332    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_get_stdout, pipeline_wait_pipeline};
333    /// use rumtk_core::types::RUMBuffer;
334    ///
335    /// let ls_name = "ls";
336    /// let mut ls_command = RUMCommand::default();
337    /// ls_command.path = RUMString::from(ls_name);
338    /// ls_command.args = vec!["-la".to_string()];
339    ///
340    /// let mut process = pipeline_generate_command(&ls_command, &RUMBuffer::default()).unwrap();
341    /// let result = pipeline_get_stdout(process).unwrap();
342    ///
343    /// assert!(result.len() > 0, "Command generation failed to generate a valid command!");
344    /// ```
345    ///
346    pub fn pipeline_generate_command(command: &RUMCommand, data: &RUMBuffer) -> RUMResult<RUMPipelineProcess> {
347        let mut root = pipeline_create_command(&command);
348        let mut process = pipeline_spawn_process(&mut root)?;
349        pipeline_pipe_into_process(&mut process, data)?;
350        pipeline_close_process_stdin(&mut process);
351
352        Ok(process)
353    }
354
355    ///
356    /// Add buffer at the beginning of pipeline to pipe in. This buffer serves as the initial input of
357    /// a pipe aware program.
358    ///
359    /// ## Example
360    /// ```
361    /// use rumtk_core::base::RUMResult;
362    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMCommandLine};
363    /// use rumtk_core::rumtk_pipeline_run;
364    /// use rumtk_core::strings::{RUMString};
365    /// use rumtk_core::buffers::{buffer_to_string};
366    /// use rumtk_core::types::RUMBuffer;
367    ///
368    /// let data = RUMBuffer::from_static(b"Hello World");
369    /// let wc_name = "wc";
370    /// let mut wc_command = RUMCommand::default();
371    /// wc_command.path = RUMString::from(wc_name);
372    /// let mut pipeline = vec![
373    ///     wc_command
374    /// ];
375    ///
376    /// let processor = || -> RUMResult<RUMBuffer> {rumtk_pipeline_run!(&pipeline, &data)};
377    /// let result_string = buffer_to_string(&processor().unwrap()).unwrap();
378    /// let binding = result_string.as_str().replace('\n', "");
379    /// let result_items: Vec<&str> = binding.split("      ").collect();
380    /// let result = result_items.get(2).unwrap().trim().parse::<i32>().unwrap();
381    ///
382    /// assert_eq!(result, 2, "Data was not piped properly!");
383    /// ```
384    ///
385    pub fn pipeline_add_stdin_data_to_pipeline<'a>(pipeline: &'a mut RUMCommandLine, data: &RUMBuffer) -> &'a RUMCommandLine {
386        match pipeline.get_mut(0) {
387            Some(command) => command.data = Some(data.clone()),
388            None => {
389                return pipeline;
390            }
391        };
392
393        pipeline
394    }
395
396    ///
397    /// Flatten the [RUMCommandLine] structure into a single string representing the pipeline and
398    /// print it or log it.
399    ///
400    fn print_pipeline(pipeline: &RUMCommandLine) {
401        let mut pipeline_components = Vec::<RUMString>::with_capacity(pipeline.len());
402
403        for pipe in pipeline.iter() {
404            pipeline_components.push(rumtk_format!("{} {}", pipe.path, pipe.args.clone().join(" ")));
405        }
406
407        println!("Executing {}", pipeline_components.join(" | "));
408    }
409
410    pub fn pipeline_patch_command_args<'a>(cmd: &'a mut RUMCommand, replacements: &StringReplacementPair) -> RUMResult<&'a RUMCommand> {
411        let mut new_args = RUMCommandArgs::with_capacity(cmd.args.len());
412
413        for arg in cmd.args.iter() {
414            new_args.push(string_format(arg, replacements));
415        }
416
417        cmd.args = new_args;
418
419        Ok(cmd)
420    }
421
422    ///
423    /// Patches the arguments of the first command with the pattern=replacement pairs!
424    ///
425    /// ## Example
426    /// ```
427    /// use rumtk_core::base::RUMResult;
428    /// use rumtk_core::pipelines::pipeline_functions::pipeline_patch_args;
429    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMCommandLine};
430    /// use rumtk_core::rumtk_pipeline_run;
431    /// use rumtk_core::strings::{RUMString};
432    /// use rumtk_core::buffers::{buffer_to_string};
433    /// use rumtk_core::types::RUMBuffer;
434    ///
435    /// let ls_name = "ls";
436    /// let mut ls_command = RUMCommand::default();
437    /// ls_command.path = RUMString::from(ls_name);
438    /// ls_command.args.push(RUMString::from("{options}"));
439    /// let mut pipeline = vec![
440    ///     ls_command
441    /// ];
442    /// pipeline_patch_args(&mut pipeline, &[("{options}", "-la")]);
443    ///
444    /// let processor = || -> RUMResult<RUMBuffer> {rumtk_pipeline_run!(&pipeline)};
445    /// let result_string = buffer_to_string(&processor().unwrap()).unwrap();
446    /// let results: Vec<&str> = result_string.as_str().split("\n").collect();
447    /// let dot_dir = results.get(1).unwrap().chars().last().unwrap();
448    ///
449    /// assert_eq!(dot_dir, '.', "Incorrect options passed!");
450    /// ```
451    ///
452    pub fn pipeline_patch_args<'a>(pipeline: &'a mut RUMCommandLine, replacements: &StringReplacementPair) -> RUMResult<&'a RUMCommandLine> {
453        for mut cmd in pipeline.iter_mut() {
454            pipeline_patch_command_args(&mut cmd, replacements)?;
455        }
456        Ok(pipeline)
457    }
458
459    ///
460    /// Await for pipeline to execute in a async friendly manner. Once the pipeline execution ends,
461    /// consume the pipeline and return the output.
462    ///
463    /// ## Example
464    ///
465    /// ```
466    /// use rumtk_core::strings::RUMString;
467    /// use rumtk_core::types::RUMBuffer;
468    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
469    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_await_pipeline};
470    /// use rumtk_core::{rumtk_resolve_task};
471    ///
472    /// let ls_name = "ls";
473    /// let mut ls_command = RUMCommand::default();
474    /// ls_command.path = RUMString::from(ls_name);
475    ///
476    /// let wc_name = "wc";
477    /// let mut wc_command = RUMCommand::default();
478    /// wc_command.path = RUMString::from(wc_name);
479    ///
480    /// let pipeline = vec![
481    ///     ls_command,
482    ///     wc_command
483    /// ];
484    ///
485    /// let result = rumtk_resolve_task!(pipeline_await_pipeline(&pipeline, &RUMBuffer::default())).unwrap();
486    ///
487    /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
488    /// ```
489    ///
490    pub async fn pipeline_await_pipeline(pipeline: &RUMCommandLine, initial_data: &RUMBuffer) -> RUMPipelineResult {
491        let pipeline_copy = pipeline.clone();
492        let data_copy = initial_data.clone();
493        rumtk_resolve_sync_task!(move || {
494            pipeline_wait_pipeline(&pipeline_copy, &data_copy)
495        })
496    }
497
498    ///
499    /// Await for pipeline to complete execution. Once the pipeline execution ends,
500    /// consume the pipeline and return the output.
501    ///
502    /// ## Example
503    ///
504    /// ```
505    /// use rumtk_core::strings::RUMString;
506    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
507    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_wait_pipeline};
508    /// use rumtk_core::types::RUMBuffer;
509    ///
510    /// let ls_name = "ls";
511    /// let mut ls_command = RUMCommand::default();
512    /// ls_command.path = RUMString::from(ls_name);
513    ///
514    /// let wc_name = "wc";
515    /// let mut wc_command = RUMCommand::default();
516    /// wc_command.path = RUMString::from(wc_name);
517    ///
518    /// let pipeline = vec![
519    ///     ls_command,
520    ///     wc_command
521    /// ];
522    ///
523    /// let result = pipeline_wait_pipeline(&pipeline, &RUMBuffer::default()).unwrap();
524    ///
525    /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
526    /// ```
527    ///
528    pub fn pipeline_wait_pipeline(pipeline: &RUMCommandLine, initial_data: &RUMBuffer) -> RUMPipelineResult {
529        let mut last_data = initial_data.clone();
530
531        // Print pipeline
532        print_pipeline(&pipeline);
533
534        // Now let's visit each process and await their completion!
535        for c in pipeline.iter() {
536            let mut p = pipeline_generate_command(&c, &last_data)?;
537            pipeline_close_process_stdin(&mut p);
538            println!("Waiting on {}", p.id());
539            last_data = pipeline_get_stdout(p)?;
540        }
541
542        Ok(last_data)
543    }
544}
545
546pub mod pipeline_macros {
547    ///
548    /// Creates a pipeline command out of the provided parameters. Parameters include `path`, `args`,
549    /// and `env`. The command has [RUMCommand](super::pipeline_types::RUMCommand).
550    ///
551    /// `env` is a map of type [RUMCommandEnv](super::pipeline_types::RUMCommandEnv) containing a set of
552    /// key value pair strings that we can use to update the process environment.
553    ///
554    /// ## Example
555    ///
556    /// ### Program Only
557    ///
558    /// ```
559    /// use rumtk_core::rumtk_pipeline_command;
560    ///
561    /// let command = rumtk_pipeline_command!("ls");
562    /// ```
563    ///
564    /// ### Program with Piped Data
565    ///
566    /// ```
567    /// use rumtk_core::rumtk_pipeline_command;
568    /// use rumtk_core::types::RUMBuffer;
569    /// use rumtk_core::strings::RUMStringConversions;
570    ///
571    /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default());
572    /// ```
573    ///
574    /// ### Program with Args
575    ///
576    /// ```
577    /// use rumtk_core::rumtk_pipeline_command;
578    /// use rumtk_core::types::RUMBuffer;
579    /// use rumtk_core::strings::RUMStringConversions;
580    ///
581    /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default(), &vec![
582    ///     "-l".to_string()
583    /// ]);
584    /// ```
585    ///
586    #[macro_export]
587    macro_rules! rumtk_pipeline_command {
588        ( $path:expr, $data:expr, $args:expr, $env:expr ) => {{
589            use $crate::pipelines::pipeline_types::RUMCommand;
590
591            RUMCommand::new($path, &Some($data), $args, $env)
592        }};
593        ( $path:expr, $data:expr, $args:expr ) => {{
594            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandEnv};
595
596            RUMCommand::new($path, &Some($data), $args, &RUMCommandEnv::default())
597        }};
598        ( $path:expr, $data:expr ) => {{
599            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
600
601            RUMCommand::new(
602                $path,
603                &Some($data),
604                &RUMCommandArgs::default(),
605                &RUMCommandEnv::default(),
606            )
607        }};
608        ( $path:expr ) => {{
609            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
610            use $crate::types::RUMBuffer;
611
612            RUMCommand::new(
613                $path,
614                &None,
615                &RUMCommandArgs::default(),
616                &RUMCommandEnv::default(),
617            )
618        }};
619    }
620
621    ///
622    /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
623    /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
624    /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
625    ///
626    /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
627    /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
628    ///
629    /// ## Example
630    ///
631    /// ### Simple
632    ///
633    /// ```
634    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_quick_run, rumtk_resolve_task, rumtk_init_threads};
635    /// use rumtk_core::base::{RUMResult};
636    /// use rumtk_core::strings::RUMStringConversions;
637    /// use rumtk_core::types::RUMBuffer;
638    ///
639    /// let f = async || -> RUMResult<()> {
640    ///     let result = rumtk_pipeline_quick_run!(
641    ///         rumtk_pipeline_command!("ls"),
642    ///         rumtk_pipeline_command!("wc")
643    ///     ).unwrap();
644    ///
645    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
646    ///     Ok(())
647    /// };
648    ///
649    /// rumtk_resolve_task!(f()).unwrap();
650    /// ```
651    ///
652    #[macro_export]
653    macro_rules! rumtk_pipeline_quick_run {
654        ( $($command:expr),+ ) => {{
655            use $crate::types::RUMBuffer;
656            use $crate::pipelines::pipeline_functions::{pipeline_wait_pipeline};
657
658            let pipeline = vec![$($command),+];
659
660            pipeline_wait_pipeline(&pipeline, &RUMBuffer::default())
661        }};
662    }
663
664    ///
665    /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
666    /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
667    /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
668    ///
669    /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
670    /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
671    ///
672    /// This is the `async` flavor.
673    ///
674    /// ## Example
675    ///
676    /// ### Simple
677    ///
678    /// ```
679    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_quick_run_async, rumtk_resolve_task, rumtk_init_threads};
680    /// use rumtk_core::base::{RUMResult};
681    /// use rumtk_core::strings::RUMStringConversions;
682    /// use rumtk_core::types::RUMBuffer;
683    ///
684    /// let f = async || -> RUMResult<()> {
685    ///     let result = rumtk_pipeline_quick_run_async!(
686    ///         rumtk_pipeline_command!("ls"),
687    ///         rumtk_pipeline_command!("wc")
688    ///     ).await?;
689    ///
690    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
691    ///     Ok(())
692    /// };
693    ///
694    /// rumtk_resolve_task!(f()).unwrap();
695    /// ```
696    ///
697    #[macro_export]
698    macro_rules! rumtk_pipeline_quick_run_async {
699        ( $($command:expr),+ ) => {{
700            use $crate::types::RUMBuffer;
701            use $crate::pipelines::pipeline_functions::{pipeline_await_pipeline};
702
703            let pipeline = vec![$($command),+];
704
705            pipeline_await_pipeline(&pipeline.clone(), &RUMBuffer::default().clone())
706        }};
707    }
708
709    ///
710    /// This macro is similar to [rumtk_pipeline_quick_run](crate::rumtk_pipeline_quick_run). The difference here
711    /// is that the function takes a pipeline structure ([RUMCommandLine](crate::pipelines::pipeline_types::RUMCommandLine))
712    /// In other words, this macro simply runs an already defined pipeline.
713    /// 
714    /// ## Example
715    ///
716    /// ### Run the pipeline
717    /// ```
718    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
719    /// use rumtk_core::base::{RUMResult};
720    /// use rumtk_core::strings::RUMStringConversions;
721    /// use rumtk_core::types::RUMBuffer;
722    ///
723    /// let f = || -> RUMResult<RUMBuffer> {
724    ///     let pipeline = vec![
725    ///         rumtk_pipeline_command!("ls"),
726    ///         rumtk_pipeline_command!("wc")
727    ///     ];
728    /// 
729    ///     rumtk_pipeline_run!(&pipeline)
730    /// };
731    /// 
732    /// f().unwrap();
733    /// ```
734    ///
735    /// ### Pipe Buffer to Pipeline
736    ///
737    /// ```
738    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run};
739    /// use rumtk_core::base::RUMResult;
740    /// use rumtk_core::types::RUMBuffer;
741    /// use rumtk_core::strings::{string_to_buffer};
742    /// use rumtk_core::buffers::{buffer_to_string};
743    ///
744    /// const data: &str = "Hello World!";
745    /// const expected: &str = "      0       2      12\n";
746    ///
747    ///
748    /// let f = |input: &RUMBuffer| -> RUMResult<RUMBuffer> {
749    ///     let mut pipeline = vec![
750    ///         rumtk_pipeline_command!("wc")
751    ///     ];
752    ///
753    ///     rumtk_pipeline_run!(&pipeline, &input)
754    /// };
755    /// let result = buffer_to_string(&f(&string_to_buffer(data)).unwrap()).unwrap();
756    ///
757    /// assert_eq!(result, expected, "Buffer correctly piped into pipeline!");
758    /// ```
759    ///
760    /// ### Pipe String to Pipeline
761    ///
762    /// ```
763    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run};
764    /// use rumtk_core::base::RUMResult;
765    /// use rumtk_core::strings::{string_to_buffer};
766    /// use rumtk_core::buffers::{buffer_to_string};
767    /// use rumtk_core::types::RUMBuffer;
768    ///
769    /// const data: &str = "Hello World!";
770    /// const expected: &str = "      0       2      12\n";
771    ///
772    ///
773    /// let f = |input: &str| -> RUMResult<RUMBuffer> {
774    ///     let mut pipeline = vec![
775    ///         rumtk_pipeline_command!("wc")
776    ///     ];
777    ///
778    ///     rumtk_pipeline_run!(&pipeline, &string_to_buffer(input))
779    /// };
780    /// let result = buffer_to_string(&f(data).unwrap()).unwrap();
781    ///
782    /// assert_eq!(result, expected, "String correctly piped into pipeline!");
783    /// ```
784    /// 
785    #[macro_export]
786    macro_rules! rumtk_pipeline_run {
787        ( $pipeline:expr ) => {{
788            use $crate::types::RUMBuffer;
789
790            rumtk_pipeline_run!($pipeline, &RUMBuffer::default())
791        }};
792        ( $pipeline:expr, $data:expr ) => {{
793            use $crate::pipelines::pipeline_functions::{pipeline_wait_pipeline};
794
795            pipeline_wait_pipeline($pipeline, $data)
796        }};
797    }
798
799
800    ///
801    /// This macro is similar to [rumtk_pipeline_quick_run_async](crate::rumtk_pipeline_quick_run_async). The difference here
802    /// is that the function takes a pipeline structure ([RUMCommandLine](crate::pipelines::pipeline_types::RUMCommandLine))
803    /// In other words, this macro simply runs an already defined pipeline.
804    ///
805    /// ## Example
806    /// ```
807    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
808    /// use rumtk_core::base::{RUMResult};
809    /// use rumtk_core::strings::RUMStringConversions;
810    /// use rumtk_core::types::RUMBuffer;
811    ///
812    /// let f = async || -> RUMResult<RUMBuffer> {
813    ///     let pipeline = vec![
814    ///         rumtk_pipeline_command!("ls"),
815    ///         rumtk_pipeline_command!("wc")
816    ///     ];
817    ///
818    ///     rumtk_pipeline_run_async!(&pipeline).await
819    /// };
820    ///
821    /// rumtk_resolve_task!(f()).unwrap();
822    /// ```
823    ///
824    /// ### Pipe Buffer to Pipeline
825    ///
826    /// ```
827    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_spawn_task};
828    /// use rumtk_core::base::RUMResult;
829    /// use rumtk_core::types::RUMBuffer;
830    /// use rumtk_core::buffers::{buffer_to_string};
831    /// use rumtk_core::strings::{string_to_buffer};
832    ///
833    /// const data: &str = "Hello World!";
834    /// const expected: &str = "      0       2      12\n";
835    ///
836    ///
837    /// let task = rumtk_spawn_task!(async {
838    ///     let input = RUMBuffer::from("Hello World!");
839    ///     let mut pipeline = vec![
840    ///         rumtk_pipeline_command!("wc")
841    ///     ];
842    ///
843    ///     rumtk_pipeline_run_async!(&pipeline, &input).await
844    /// });
845    ///
846    /// let result = buffer_to_string(&rumtk_resolve_task!(task).unwrap().unwrap()).unwrap();
847    ///
848    /// assert_eq!(result, expected, "Buffer correctly piped into pipeline!");
849    /// ```
850    ///
851    /// ### Pipe String to Pipeline
852    ///
853    /// ```
854    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_spawn_task};
855    /// use rumtk_core::base::RUMResult;
856    /// use rumtk_core::strings::{string_to_buffer};
857    /// use rumtk_core::buffers::{buffer_to_string};
858    /// use rumtk_core::types::RUMBuffer;
859    ///
860    /// const data: &str = "Hello World!";
861    /// const expected: &str = "      0       2      12\n";
862    ///
863    ///
864    /// let task = rumtk_spawn_task!(async {
865    ///     let input = string_to_buffer(data);
866    ///     let mut pipeline = vec![
867    ///         rumtk_pipeline_command!("wc")
868    ///     ];
869    ///
870    ///     rumtk_pipeline_run_async!(&pipeline, &input).await
871    /// });
872    ///
873    /// let result = buffer_to_string(&rumtk_resolve_task!(task).unwrap().unwrap()).unwrap();
874    ///
875    /// assert_eq!(result, expected, "String correctly piped into pipeline!");
876    /// ```
877    ///
878    #[macro_export]
879    macro_rules! rumtk_pipeline_run_async {
880        ( $pipeline:expr ) => {{
881            use $crate::types::RUMBuffer;
882
883            rumtk_pipeline_run_async!($pipeline, &RUMBuffer::default())
884        }};
885        ( $pipeline:expr, $data:expr ) => {{
886            use $crate::pipelines::pipeline_functions::{pipeline_await_pipeline};
887
888            pipeline_await_pipeline($pipeline, $data)
889        }};
890    }
891
892    ///
893    /// Patch the pipeline's arguments with desired dynamic options.
894    ///
895    /// ## Example
896    /// ```
897    /// use rumtk_core::{rumtk_pipeline_patch_args, rumtk_pipeline_command, rumtk_pipeline_run};
898    /// use rumtk_core::base::RUMResult;
899    /// use rumtk_core::buffers::{buffer_to_string};
900    /// use rumtk_core::types::RUMBuffer;
901    /// use rumtk_core::strings::string_to_buffer;
902    ///
903    ///
904    /// let f = || -> RUMResult<RUMBuffer> {
905    ///     let mut pipeline = vec![
906    ///         rumtk_pipeline_command!("ls", RUMBuffer::default(), &vec![
907    ///             "{options}".into()
908    ///         ])
909    ///     ];
910    ///
911    ///     rumtk_pipeline_patch_args!(&mut pipeline, &[("{options}", "-la")]);
912    ///
913    ///     rumtk_pipeline_run!(&pipeline)
914    /// };
915    ///
916    /// let result_string = buffer_to_string(&f().unwrap()).unwrap();
917    /// let results: Vec<&str> = result_string.as_str().split("\n").collect();
918    /// let dot_dir = results.get(1).unwrap().chars().last().unwrap();
919    ///
920    /// assert_eq!(dot_dir, '.', "Incorrect options passed!");
921    /// ```
922    ///
923    #[macro_export]
924    macro_rules! rumtk_pipeline_patch_args {
925        ( $pipeline:expr, $replacements:expr ) => {{
926            use $crate::pipelines::pipeline_functions::{pipeline_patch_args};
927
928            pipeline_patch_args($pipeline, $replacements)
929        }};
930    }
931}