Skip to main content

rumtk_core/
pipelines.rs

1/*
2 *     rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 *     This toolkit aims to be reliable, simple, performant, and standards compliant.
4 *     Copyright (C) 2026  Luis M. Santos, M.D.
5 *
6 *     This program is free software: you can redistribute it and/or modify
7 *     it under the terms of the GNU General Public License as published by
8 *     the Free Software Foundation, either version 3 of the License, or
9 *     (at your option) any later version.
10 *
11 *     This program is distributed in the hope that it will be useful,
12 *     but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *     GNU General Public License for more details.
15 *
16 *     You should have received a copy of the GNU General Public License
17 *     along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
22 * This toolkit aims to be reliable, simple, performant, and standards compliant.
23 * Copyright (C) 2025  Luis M. Santos, M.D.
24 * Copyright (C) 2025  Nick Stephenson
25 * Copyright (C) 2025  Ethan Dixon
26 * Copyright (C) 2025  MedicalMasses L.L.C.
27 *
28 * This library is free software; you can redistribute it and/or
29 * modify it under the terms of the GNU Lesser General Public
30 * License as published by the Free Software Foundation; either
31 * version 2.1 of the License, or (at your option) any later version.
32 *
33 * This library is distributed in the hope that it will be useful,
34 * but WITHOUT ANY WARRANTY; without even the implied warranty of
35 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
36 * Lesser General Public License for more details.
37 *
38 * You should have received a copy of the GNU Lesser General Public
39 * License along with this library; if not, write to the Free Software
40 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
41 */
42
43pub mod pipeline_types {
44    use crate::strings::{RUMString, RUMStringConversions};
45    use crate::types::{RUMBuffer, RUMHashMap};
46
47    use crate::core::{RUMResult, RUMVec};
48    use std::process::{Child, Command};
49
50    pub type RUMCommandArgs = Vec<RUMString>;
51    pub type RUMCommandEnv = RUMHashMap<RUMString, RUMString>;
52    #[derive(Default, Debug, Clone)]
53    pub struct RUMCommand {
54        pub path: RUMString,
55        pub args: RUMCommandArgs,
56        pub env: RUMCommandEnv,
57    }
58
59    impl RUMCommand {
60        pub fn new(prog: &str, args: &RUMCommandArgs, env: &RUMCommandEnv) -> Self {
61            RUMCommand {
62                path: prog.to_rumstring(),
63                args: args.clone(),
64                env: env.clone(),
65            }
66        }
67    }
68
69    pub type RUMCommandLine = RUMVec<RUMCommand>;
70    pub type RUMPipelineCommand = Command;
71    pub type RUMPipelineProcess = Child;
72    pub type RUMPipeline = RUMVec<RUMPipelineProcess>;
73    pub type RUMPipelineResult = RUMResult<RUMBuffer>;
74}
75
76pub mod pipeline_functions {
77    use super::pipeline_types::*;
78    use crate::core::RUMResult;
79    use crate::strings::rumtk_format;
80    use std::io::Read;
81
82    use crate::threading::threading_functions::async_sleep;
83    use crate::types::RUMBuffer;
84    use std::process::{Command, Stdio};
85
    /// Delay handed to `async_sleep` between two `try_wait` polls in `pipeline_await_pipeline`.
    /// NOTE(review): presumably expressed in seconds (i.e. 1 ms) — confirm against `async_sleep`.
    const DEFAULT_PROCESS_ASYNC_WAIT: f32 = 0.001;
    /// NOTE(review): not referenced anywhere in this module; presumably reserved for chunked
    /// stdout reads — confirm it is still needed.
    const DEFAULT_STDOUT_CHUNK_SIZE: usize = 1024;
88
89    ///
90    /// Given a command of type [RUMCommand](RUMCommand), generate a command instance the Rust
91    /// runtime can use to spawn a process.
92    ///
93    /// ## Example
94    ///
95    /// ```
96    /// use std::any::{Any, TypeId};
97    ///
98    /// use rumtk_core::strings::RUMString;
99    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
100    /// use rumtk_core::pipelines::pipeline_functions::pipeline_generate_command;
101    ///
102    /// let command_name = "ls";
103    /// let mut command = RUMCommand::default();
104    /// command.path = RUMString::from(command_name);
105    ///
106    /// let sys_command = pipeline_generate_command(&command);
107    ///
108    /// assert_eq!(sys_command.get_program().to_str().unwrap(), command_name, "");
109    ///
110    /// ```
111    ///
112    pub fn pipeline_generate_command(command: &RUMCommand) -> RUMPipelineCommand {
113        let mut cmd = Command::new(command.path.as_str());
114
115        for arg in command.args.iter() {
116            cmd.arg(arg);
117        }
118
119        cmd.envs(command.env.iter());
120
121        cmd.stdout(Stdio::piped())
122            .stderr(Stdio::piped())
123            .stdin(Stdio::piped());
124
125        cmd
126    }
127
128    ///
129    /// Spawns a process out of the [RUMPipelineCommand](RUMPipelineCommand).
130    ///
131    /// ## Example
132    ///
133    /// ```
134    /// use std::any::{Any, TypeId};
135    ///
136    /// use rumtk_core::strings::RUMString;
137    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
138    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_spawn_process};
139    ///
140    /// let command_name = "ls";
141    /// let mut command = RUMCommand::default();
142    /// command.path = RUMString::from(command_name);
143    ///
144    /// let mut sys_command = pipeline_generate_command(&command);
145    ///
146    /// let mut process = pipeline_spawn_process(&mut sys_command).unwrap();
147    ///
148    /// process.wait();
149    /// ```
150    ///
151    pub fn pipeline_spawn_process(cmd: &mut RUMPipelineCommand) -> RUMResult<RUMPipelineProcess> {
152        match cmd.spawn() {
153            Ok(process) => Ok(process),
154            Err(e) => Err(rumtk_format!(
155                "Failed to spawn process {:?} because => {}",
156                cmd.get_program(),
157                e
158            )),
159        }
160    }
161
162    ///
163    /// Given a process of type [RUMPipelineProcess](RUMPipelineProcess) and a rhs of type [RUMPipelineCommand](RUMPipelineCommand),
164    /// create a pipe of the lhs's `stdout` into the next command descriptor which is the rhs.
165    ///
166    /// ## Example
167    ///
168    /// ```
169    /// use std::any::{Any, TypeId};
170    /// use std::process::Stdio;
171    ///
172    /// use rumtk_core::strings::RUMString;
173    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
174    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_process, pipeline_spawn_process};
175    ///
176    /// let ls_name = "ls";
177    /// let mut ls_command = RUMCommand::default();
178    /// ls_command.path = RUMString::from(ls_name);
179    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
180    ///
181    /// let wc_name = "wc";
182    /// let mut wc_command = RUMCommand::default();
183    /// wc_command.path = RUMString::from(wc_name);
184    /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
185    ///
186    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
187    /// pipeline_pipe_process(&mut sys_ls_process, &mut sys_wc_command).unwrap();
188    /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
189    ///
190    /// sys_ls_process.wait();
191    /// sys_wc_process.wait();
192    /// ```
193    ///
194    pub fn pipeline_pipe_process(
195        process: &mut RUMPipelineProcess,
196        piped: &mut RUMPipelineCommand,
197    ) -> RUMResult<()> {
198        let process_stdout = Stdio::from(match process.stdout.take() {
199            Some(stdout) => stdout,
200            None => {
201                return Err(rumtk_format!(
202                    "No stdout handle found for process {}.",
203                    process.id()
204                ));
205            }
206        });
207        let _ = piped.stdin(process_stdout);
208        Ok(())
209    }
210
211    ///
212    /// Retrieves the standard output generated by the completed process.
213    ///
214    /// ## Example
215    ///
216    /// ```
217    /// use std::any::{Any, TypeId};
218    /// use std::process::Stdio;
219    ///
220    /// use rumtk_core::strings::RUMString;
221    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
222    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_process, pipeline_spawn_process, pipeline_get_stdout};
223    ///
224    /// let ls_name = "ls";
225    /// let mut ls_command = RUMCommand::default();
226    /// ls_command.path = RUMString::from(ls_name);
227    /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
228    ///
229    /// let wc_name = "wc";
230    /// let mut wc_command = RUMCommand::default();
231    /// wc_command.path = RUMString::from(wc_name);
232    /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
233    ///
234    /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
235    /// pipeline_pipe_process(&mut sys_ls_process, &mut sys_wc_command).unwrap();
236    /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
237    ///
238    /// sys_ls_process.wait();
239    /// sys_wc_process.wait();
240    ///
241    /// let mut pipeline = vec![sys_ls_process, sys_wc_process];
242    ///
243    /// let out_data = pipeline_get_stdout(pipeline).unwrap();
244    ///
245    /// assert_eq!(out_data.is_empty(), false, "No output detected... {:?}", &out_data);
246    /// ```
247    ///
248    pub fn pipeline_get_stdout(mut pipeline: RUMPipeline) -> RUMResult<RUMBuffer> {
249        let mut last_item = pipeline.pop().unwrap();
250        match last_item.wait_with_output() {
251            Ok(stdout) => Ok(RUMBuffer::from(stdout.stdout.clone())),
252            Err(e) => Err(rumtk_format!(
253                "Issue reading last process output because => {}",
254                e
255            )),
256        }
257    }
258
259    pub fn pipeline_connect_processes<'a>(
260        root: &'a mut RUMPipelineCommand,
261        piped: &'a mut RUMPipelineCommand,
262    ) -> RUMResult<RUMPipelineProcess> {
263        let mut root_process = pipeline_spawn_process(root)?;
264        pipeline_pipe_process(&mut root_process, piped)?;
265        Ok(root_process)
266    }
267
268    ///
269    /// Builds an executable pipeline out of a list of [RUMCommand](RUMCommand).
270    ///
271    /// ## Example
272    ///
273    /// ```
274    /// use rumtk_core::strings::RUMString;
275    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
276    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline};
277    ///
278    /// let ls_name = "ls";
279    /// let mut ls_command = RUMCommand::default();
280    /// ls_command.path = RUMString::from(ls_name);
281    ///
282    /// let wc_name = "wc";
283    /// let mut wc_command = RUMCommand::default();
284    /// wc_command.path = RUMString::from(wc_name);
285    ///
286    /// let commands = vec![
287    ///     ls_command,
288    ///     wc_command
289    /// ];
290    ///
291    /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
292    ///
293    /// assert_eq!(pipeline.len(), commands.len(), "Pipeline generation returned unexpected number of items!");
294    /// ```
295    ///
296    pub fn pipeline_generate_pipeline(commands: &RUMCommandLine) -> RUMResult<RUMPipeline> {
297        let mut root = pipeline_generate_command(commands.first().unwrap());
298        let mut parent_process;
299
300        // Setup pipeline
301        let mut pipeline = vec![];
302        root.stdin(Stdio::piped()).stdout(Stdio::piped());
303
304        for cmd in commands.iter().skip(1) {
305            let mut new_root = pipeline_generate_command(cmd);
306            parent_process = pipeline_connect_processes(&mut root, &mut new_root)?;
307            pipeline.push(parent_process);
308            root = new_root;
309        }
310
311        pipeline.push(pipeline_spawn_process(&mut root)?);
312
313        Ok(pipeline)
314    }
315
316    ///
317    /// Await for pipeline to execute in a async friendly manner. Once the pipeline execution ends,
318    /// consume the pipeline and return the output.
319    ///
320    /// ## Example
321    ///
322    /// ```
323    /// use rumtk_core::strings::RUMString;
324    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
325    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
326    /// use rumtk_core::{rumtk_resolve_task, rumtk_init_threads};
327    ///
328    /// let ls_name = "ls";
329    /// let mut ls_command = RUMCommand::default();
330    /// ls_command.path = RUMString::from(ls_name);
331    ///
332    /// let wc_name = "wc";
333    /// let mut wc_command = RUMCommand::default();
334    /// wc_command.path = RUMString::from(wc_name);
335    ///
336    /// let commands = vec![
337    ///     ls_command,
338    ///     wc_command
339    /// ];
340    ///
341    /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
342    /// let rt = rumtk_init_threads!(&5);
343    /// let result = rumtk_resolve_task!(rt, pipeline_await_pipeline(pipeline)).unwrap();
344    ///
345    /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
346    /// ```
347    ///
348    pub async fn pipeline_await_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
349        for p in pipeline.iter_mut() {
350            loop {
351                match p.try_wait() {
352                    Ok(code) => match code {
353                        Some(code) => {
354                            if !code.success() {
355                                return Err(rumtk_format!(
356                                    "Process {} exited with non-success code => {}!",
357                                    p.id(),
358                                    code
359                                ));
360                            }
361                            break;
362                        }
363                        None => {
364                            async_sleep(DEFAULT_PROCESS_ASYNC_WAIT).await;
365                            continue;
366                        }
367                    },
368                    Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
369                };
370            }
371        }
372
373        let result = pipeline_get_stdout(pipeline)?;
374        Ok(result)
375    }
376
377    ///
378    /// Await for pipeline to complete execution. Once the pipeline execution ends,
379    /// consume the pipeline and return the output.
380    ///
381    /// ## Example
382    ///
383    /// ```
384    /// use rumtk_core::strings::RUMString;
385    /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
386    /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
387    ///
388    /// let ls_name = "ls";
389    /// let mut ls_command = RUMCommand::default();
390    /// ls_command.path = RUMString::from(ls_name);
391    ///
392    /// let wc_name = "wc";
393    /// let mut wc_command = RUMCommand::default();
394    /// wc_command.path = RUMString::from(wc_name);
395    ///
396    /// let commands = vec![
397    ///     ls_command,
398    ///     wc_command
399    /// ];
400    ///
401    /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
402    /// let result = pipeline_wait_pipeline(pipeline).unwrap();
403    ///
404    /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
405    /// ```
406    ///
407    pub fn pipeline_wait_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
408        for p in pipeline.iter_mut() {
409            match p.wait() {
410                Ok(code) => {
411                    if !code.success() {
412                        return Err(rumtk_format!(
413                            "Process {} exited with non-success code => {}!",
414                            p.id(),
415                            code
416                        ));
417                    }
418                    break;
419                }
420                Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
421            };
422        }
423
424        let result = pipeline_get_stdout(pipeline)?;
425        Ok(result)
426    }
427}
428
429pub mod pipeline_macros {
430
431    ///
432    /// Creates a pipeline command out of the provided parameters. Parameters include `path`, `args`,
433    /// and `env`. The command has [RUMCommand](super::pipeline_types::RUMCommand).
434    ///
435    /// `env` is a map of type [RUMCommandEnv](super::pipeline_types::RUMCommandEnv) containing a set of
436    /// key value pair strings that we can use to update the process environment.
437    ///
438    /// ## Example
439    ///
440    /// ### Program Only
441    ///
442    /// ```
443    /// use rumtk_core::rumtk_pipeline_command;
444    ///
445    /// let command = rumtk_pipeline_command!("ls");
446    /// ```
447    ///
448    /// ### Program with Args
449    ///
450    /// ```
451    /// use rumtk_core::rumtk_pipeline_command;
452    /// use rumtk_core::strings::RUMStringConversions;
453    ///
454    /// let command = rumtk_pipeline_command!("ls", &vec![
455    ///     "-l".to_rumstring()
456    /// ]);
457    /// ```
458    ///
459    #[macro_export]
460    macro_rules! rumtk_pipeline_command {
461        ( $path:expr, $args:expr, $env:expr ) => {{
462            use $crate::pipelines::pipeline_types::RUMCommand;
463
464            RUMCommand::new($path, $args, $env)
465        }};
466        ( $path:expr, $args:expr ) => {{
467            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandEnv};
468
469            RUMCommand::new($path, $args, &RUMCommandEnv::default())
470        }};
471        ( $path:expr ) => {{
472            use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
473
474            RUMCommand::new($path, &RUMCommandArgs::default(), &RUMCommandEnv::default())
475        }};
476    }
477
478    ///
479    /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
480    /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
481    /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
482    ///
483    /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
484    /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
485    ///
486    /// ## Example
487    ///
488    /// ```
489    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
490    /// use rumtk_core::core::RUMResult;
491    /// use rumtk_core::strings::RUMStringConversions;
492    ///
493    /// let f = || -> RUMResult<()> {
494    ///     let result = rumtk_pipeline_run!(
495    ///         rumtk_pipeline_command!("ls"),
496    ///         rumtk_pipeline_command!("wc")
497    ///     )?;
498    ///
499    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
500    ///     Ok(())
501    /// };
502    ///
503    /// f().unwrap();
504    /// ```
505    ///
506    #[macro_export]
507    macro_rules! rumtk_pipeline_run {
508        ( $($command:expr),+ ) => {{
509            use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
510
511            let pipeline = pipeline_generate_pipeline(&vec![
512                $($command),+
513            ])?;
514
515            pipeline_wait_pipeline(pipeline)
516        }};
517    }
518
519    ///
520    /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
521    /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
522    /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
523    ///
524    /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
525    /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
526    ///
527    /// This is the `async` flavor.
528    ///
529    /// ## Example
530    ///
531    /// ```
532    /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
533    /// use rumtk_core::core::RUMResult;
534    /// use rumtk_core::strings::RUMStringConversions;
535    ///
536    /// let f = async || -> RUMResult<()> {
537    ///     let result = rumtk_pipeline_run_async!(
538    ///         rumtk_pipeline_command!("ls"),
539    ///         rumtk_pipeline_command!("wc")
540    ///     )?;
541    ///
542    ///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
543    ///     Ok(())
544    /// };
545    ///
546    /// let rt = rumtk_init_threads!(&5);
547    /// rumtk_resolve_task!(rt, f()).unwrap();
548    /// ```
549    ///
550    #[macro_export]
551    macro_rules! rumtk_pipeline_run_async {
552        ( $($command:expr),+ ) => {{
553            use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
554
555            let pipeline = pipeline_generate_pipeline(&vec![
556                $($command),+
557            ])?;
558
559            pipeline_await_pipeline(pipeline).await
560        }};
561    }
562}