rumtk_core/pipelines.rs
1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2026 Luis M. Santos, M.D.
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
22 * This toolkit aims to be reliable, simple, performant, and standards compliant.
23 * Copyright (C) 2025 Luis M. Santos, M.D.
24 * Copyright (C) 2025 Nick Stephenson
25 * Copyright (C) 2025 Ethan Dixon
26 * Copyright (C) 2025 MedicalMasses L.L.C.
27 *
28 * This library is free software; you can redistribute it and/or
29 * modify it under the terms of the GNU Lesser General Public
30 * License as published by the Free Software Foundation; either
31 * version 2.1 of the License, or (at your option) any later version.
32 *
33 * This library is distributed in the hope that it will be useful,
34 * but WITHOUT ANY WARRANTY; without even the implied warranty of
35 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
36 * Lesser General Public License for more details.
37 *
38 * You should have received a copy of the GNU Lesser General Public
39 * License along with this library; if not, write to the Free Software
40 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
41 */
42
43pub mod pipeline_types {
44 use crate::strings::{RUMString, RUMStringConversions};
45 use crate::types::{RUMBuffer, RUMHashMap};
46
47 use crate::core::{RUMResult, RUMVec};
48 use std::process::{Child, Command};
49
50 pub type RUMCommandArgs = Vec<RUMString>;
51 pub type RUMCommandEnv = RUMHashMap<RUMString, RUMString>;
52 #[derive(Default, Debug, Clone)]
53 pub struct RUMCommand {
54 pub path: RUMString,
55 pub args: RUMCommandArgs,
56 pub env: RUMCommandEnv,
57 }
58
59 impl RUMCommand {
60 pub fn new(prog: &str, args: &RUMCommandArgs, env: &RUMCommandEnv) -> Self {
61 RUMCommand {
62 path: prog.to_rumstring(),
63 args: args.clone(),
64 env: env.clone(),
65 }
66 }
67 }
68
69 pub type RUMCommandLine = RUMVec<RUMCommand>;
70 pub type RUMPipelineCommand = Command;
71 pub type RUMPipelineProcess = Child;
72 pub type RUMPipeline = RUMVec<RUMPipelineProcess>;
73 pub type RUMPipelineResult = RUMResult<RUMBuffer>;
74}
75
76pub mod pipeline_functions {
77 use super::pipeline_types::*;
78 use crate::core::RUMResult;
79 use crate::strings::rumtk_format;
80 use std::io::Read;
81
82 use crate::threading::threading_functions::async_sleep;
83 use crate::types::RUMBuffer;
84 use std::process::{Command, Stdio};
85
/// Interval handed to `async_sleep` between two `try_wait` polls in `pipeline_await_pipeline`.
/// NOTE(review): presumably expressed in seconds (i.e. 1 ms) — confirm against `async_sleep`.
const DEFAULT_PROCESS_ASYNC_WAIT: f32 = 0.001;
/// NOTE(review): not referenced in the visible part of this file; presumably the chunk size in
/// bytes intended for incremental stdout reads — confirm before relying on it.
const DEFAULT_STDOUT_CHUNK_SIZE: usize = 1024;
88
89 ///
90 /// Given a command of type [RUMCommand](RUMCommand), generate a command instance the Rust
91 /// runtime can use to spawn a process.
92 ///
93 /// ## Example
94 ///
95 /// ```
96 /// use std::any::{Any, TypeId};
97 ///
98 /// use rumtk_core::strings::RUMString;
99 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
100 /// use rumtk_core::pipelines::pipeline_functions::pipeline_generate_command;
101 ///
102 /// let command_name = "ls";
103 /// let mut command = RUMCommand::default();
104 /// command.path = RUMString::from(command_name);
105 ///
106 /// let sys_command = pipeline_generate_command(&command);
107 ///
108 /// assert_eq!(sys_command.get_program().to_str().unwrap(), command_name, "");
109 ///
110 /// ```
111 ///
112 pub fn pipeline_generate_command(command: &RUMCommand) -> RUMPipelineCommand {
113 let mut cmd = Command::new(command.path.as_str());
114
115 for arg in command.args.iter() {
116 cmd.arg(arg);
117 }
118
119 cmd.envs(command.env.iter());
120
121 cmd.stdout(Stdio::piped())
122 .stderr(Stdio::piped())
123 .stdin(Stdio::piped());
124
125 cmd
126 }
127
128 ///
129 /// Spawns a process out of the [RUMPipelineCommand](RUMPipelineCommand).
130 ///
131 /// ## Example
132 ///
133 /// ```
134 /// use std::any::{Any, TypeId};
135 ///
136 /// use rumtk_core::strings::RUMString;
137 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
138 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_spawn_process};
139 ///
140 /// let command_name = "ls";
141 /// let mut command = RUMCommand::default();
142 /// command.path = RUMString::from(command_name);
143 ///
144 /// let mut sys_command = pipeline_generate_command(&command);
145 ///
146 /// let mut process = pipeline_spawn_process(&mut sys_command).unwrap();
147 ///
148 /// process.wait();
149 /// ```
150 ///
151 pub fn pipeline_spawn_process(cmd: &mut RUMPipelineCommand) -> RUMResult<RUMPipelineProcess> {
152 match cmd.spawn() {
153 Ok(process) => Ok(process),
154 Err(e) => Err(rumtk_format!(
155 "Failed to spawn process {:?} because => {}",
156 cmd.get_program(),
157 e
158 )),
159 }
160 }
161
162 ///
163 /// Given a process of type [RUMPipelineProcess](RUMPipelineProcess) and a rhs of type [RUMPipelineCommand](RUMPipelineCommand),
164 /// create a pipe of the lhs's `stdout` into the next command descriptor which is the rhs.
165 ///
166 /// ## Example
167 ///
168 /// ```
169 /// use std::any::{Any, TypeId};
170 /// use std::process::Stdio;
171 ///
172 /// use rumtk_core::strings::RUMString;
173 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
174 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_process, pipeline_spawn_process};
175 ///
176 /// let ls_name = "ls";
177 /// let mut ls_command = RUMCommand::default();
178 /// ls_command.path = RUMString::from(ls_name);
179 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
180 ///
181 /// let wc_name = "wc";
182 /// let mut wc_command = RUMCommand::default();
183 /// wc_command.path = RUMString::from(wc_name);
184 /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
185 ///
186 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
187 /// pipeline_pipe_process(&mut sys_ls_process, &mut sys_wc_command).unwrap();
188 /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
189 ///
190 /// sys_ls_process.wait();
191 /// sys_wc_process.wait();
192 /// ```
193 ///
194 pub fn pipeline_pipe_process(
195 process: &mut RUMPipelineProcess,
196 piped: &mut RUMPipelineCommand,
197 ) -> RUMResult<()> {
198 let process_stdout = Stdio::from(match process.stdout.take() {
199 Some(stdout) => stdout,
200 None => {
201 return Err(rumtk_format!(
202 "No stdout handle found for process {}.",
203 process.id()
204 ));
205 }
206 });
207 let _ = piped.stdin(process_stdout);
208 Ok(())
209 }
210
211 ///
212 /// Retrieves the standard output generated by the completed process.
213 ///
214 /// ## Example
215 ///
216 /// ```
217 /// use std::any::{Any, TypeId};
218 /// use std::process::Stdio;
219 ///
220 /// use rumtk_core::strings::RUMString;
221 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
222 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_process, pipeline_spawn_process, pipeline_get_stdout};
223 ///
224 /// let ls_name = "ls";
225 /// let mut ls_command = RUMCommand::default();
226 /// ls_command.path = RUMString::from(ls_name);
227 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
228 ///
229 /// let wc_name = "wc";
230 /// let mut wc_command = RUMCommand::default();
231 /// wc_command.path = RUMString::from(wc_name);
232 /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
233 ///
234 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
235 /// pipeline_pipe_process(&mut sys_ls_process, &mut sys_wc_command).unwrap();
236 /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
237 ///
238 /// sys_ls_process.wait();
239 /// sys_wc_process.wait();
240 ///
241 /// let mut pipeline = vec![sys_ls_process, sys_wc_process];
242 ///
243 /// let out_data = pipeline_get_stdout(pipeline).unwrap();
244 ///
245 /// assert_eq!(out_data.is_empty(), false, "No output detected... {:?}", &out_data);
246 /// ```
247 ///
248 pub fn pipeline_get_stdout(mut pipeline: RUMPipeline) -> RUMResult<RUMBuffer> {
249 let mut last_item = pipeline.pop().unwrap();
250 match last_item.wait_with_output() {
251 Ok(stdout) => Ok(RUMBuffer::from(stdout.stdout.clone())),
252 Err(e) => Err(rumtk_format!(
253 "Issue reading last process output because => {}",
254 e
255 )),
256 }
257 }
258
259 pub fn pipeline_connect_processes<'a>(
260 root: &'a mut RUMPipelineCommand,
261 piped: &'a mut RUMPipelineCommand,
262 ) -> RUMResult<RUMPipelineProcess> {
263 let mut root_process = pipeline_spawn_process(root)?;
264 pipeline_pipe_process(&mut root_process, piped)?;
265 Ok(root_process)
266 }
267
268 ///
269 /// Builds an executable pipeline out of a list of [RUMCommand](RUMCommand).
270 ///
271 /// ## Example
272 ///
273 /// ```
274 /// use rumtk_core::strings::RUMString;
275 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
276 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline};
277 ///
278 /// let ls_name = "ls";
279 /// let mut ls_command = RUMCommand::default();
280 /// ls_command.path = RUMString::from(ls_name);
281 ///
282 /// let wc_name = "wc";
283 /// let mut wc_command = RUMCommand::default();
284 /// wc_command.path = RUMString::from(wc_name);
285 ///
286 /// let commands = vec![
287 /// ls_command,
288 /// wc_command
289 /// ];
290 ///
291 /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
292 ///
293 /// assert_eq!(pipeline.len(), commands.len(), "Pipeline generation returned unexpected number of items!");
294 /// ```
295 ///
296 pub fn pipeline_generate_pipeline(commands: &RUMCommandLine) -> RUMResult<RUMPipeline> {
297 let mut root = pipeline_generate_command(commands.first().unwrap());
298 let mut parent_process;
299
300 // Setup pipeline
301 let mut pipeline = vec![];
302 root.stdin(Stdio::piped()).stdout(Stdio::piped());
303
304 for cmd in commands.iter().skip(1) {
305 let mut new_root = pipeline_generate_command(cmd);
306 parent_process = pipeline_connect_processes(&mut root, &mut new_root)?;
307 pipeline.push(parent_process);
308 root = new_root;
309 }
310
311 pipeline.push(pipeline_spawn_process(&mut root)?);
312
313 Ok(pipeline)
314 }
315
316 ///
317 /// Await for pipeline to execute in a async friendly manner. Once the pipeline execution ends,
318 /// consume the pipeline and return the output.
319 ///
320 /// ## Example
321 ///
322 /// ```
323 /// use rumtk_core::strings::RUMString;
324 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
325 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
326 /// use rumtk_core::{rumtk_resolve_task, rumtk_init_threads};
327 ///
328 /// let ls_name = "ls";
329 /// let mut ls_command = RUMCommand::default();
330 /// ls_command.path = RUMString::from(ls_name);
331 ///
332 /// let wc_name = "wc";
333 /// let mut wc_command = RUMCommand::default();
334 /// wc_command.path = RUMString::from(wc_name);
335 ///
336 /// let commands = vec![
337 /// ls_command,
338 /// wc_command
339 /// ];
340 ///
341 /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
342 /// let rt = rumtk_init_threads!(&5);
343 /// let result = rumtk_resolve_task!(rt, pipeline_await_pipeline(pipeline)).unwrap();
344 ///
345 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
346 /// ```
347 ///
348 pub async fn pipeline_await_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
349 for p in pipeline.iter_mut() {
350 loop {
351 match p.try_wait() {
352 Ok(code) => match code {
353 Some(code) => {
354 if !code.success() {
355 return Err(rumtk_format!(
356 "Process {} exited with non-success code => {}!",
357 p.id(),
358 code
359 ));
360 }
361 break;
362 }
363 None => {
364 async_sleep(DEFAULT_PROCESS_ASYNC_WAIT).await;
365 continue;
366 }
367 },
368 Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
369 };
370 }
371 }
372
373 let result = pipeline_get_stdout(pipeline)?;
374 Ok(result)
375 }
376
377 ///
378 /// Await for pipeline to complete execution. Once the pipeline execution ends,
379 /// consume the pipeline and return the output.
380 ///
381 /// ## Example
382 ///
383 /// ```
384 /// use rumtk_core::strings::RUMString;
385 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
386 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
387 ///
388 /// let ls_name = "ls";
389 /// let mut ls_command = RUMCommand::default();
390 /// ls_command.path = RUMString::from(ls_name);
391 ///
392 /// let wc_name = "wc";
393 /// let mut wc_command = RUMCommand::default();
394 /// wc_command.path = RUMString::from(wc_name);
395 ///
396 /// let commands = vec![
397 /// ls_command,
398 /// wc_command
399 /// ];
400 ///
401 /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
402 /// let result = pipeline_wait_pipeline(pipeline).unwrap();
403 ///
404 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
405 /// ```
406 ///
407 pub fn pipeline_wait_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
408 for p in pipeline.iter_mut() {
409 match p.wait() {
410 Ok(code) => {
411 if !code.success() {
412 return Err(rumtk_format!(
413 "Process {} exited with non-success code => {}!",
414 p.id(),
415 code
416 ));
417 }
418 break;
419 }
420 Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
421 };
422 }
423
424 let result = pipeline_get_stdout(pipeline)?;
425 Ok(result)
426 }
427}
428
429pub mod pipeline_macros {
430
///
/// Creates a pipeline command out of the provided parameters. Parameters include `path`, `args`,
/// and `env`. The command has type [RUMCommand](super::pipeline_types::RUMCommand).
///
/// `args` and `env` are optional and are passed by reference. `env` is a map of type
/// [RUMCommandEnv](super::pipeline_types::RUMCommandEnv) containing a set of key value pair
/// strings that we can use to update the process environment. Any omitted parameter falls back
/// to its `Default` value.
///
/// ## Example
///
/// ### Program Only
///
/// ```
/// use rumtk_core::rumtk_pipeline_command;
///
/// let command = rumtk_pipeline_command!("ls");
/// ```
///
/// ### Program with Args
///
/// ```
/// use rumtk_core::rumtk_pipeline_command;
/// use rumtk_core::strings::RUMStringConversions;
///
/// let command = rumtk_pipeline_command!("ls", &vec![
///     "-l".to_rumstring()
/// ]);
/// ```
///
#[macro_export]
macro_rules! rumtk_pipeline_command {
    ( $path:expr ) => {{
        use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};

        let no_args = RUMCommandArgs::default();
        let no_env = RUMCommandEnv::default();
        RUMCommand::new($path, &no_args, &no_env)
    }};
    ( $path:expr, $args:expr ) => {{
        use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandEnv};

        let no_env = RUMCommandEnv::default();
        RUMCommand::new($path, $args, &no_env)
    }};
    ( $path:expr, $args:expr, $env:expr ) => {{
        use $crate::pipelines::pipeline_types::RUMCommand;

        RUMCommand::new($path, $args, $env)
    }};
}
477
///
/// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
/// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
/// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
///
/// The macro evaluates to the result of the whole run: failures while building the pipeline and
/// failures while waiting on it are both reported through the returned value. The macro never
/// returns early from the calling function, so it can be used in any function.
/// A trailing comma after the last command is accepted.
///
/// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
/// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
///
/// ## Example
///
/// ```
/// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::strings::RUMStringConversions;
///
/// let f = || -> RUMResult<()> {
///     let result = rumtk_pipeline_run!(
///         rumtk_pipeline_command!("ls"),
///         rumtk_pipeline_command!("wc")
///     )?;
///
///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
///     Ok(())
/// };
///
/// f().unwrap();
/// ```
///
#[macro_export]
macro_rules! rumtk_pipeline_run {
    ( $($command:expr),+ $(,)? ) => {{
        use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};

        // No `?` inside the expansion: it would return from the caller's function.
        pipeline_generate_pipeline(&vec![
            $($command),+
        ])
        .and_then(pipeline_wait_pipeline)
    }};
}
518
///
/// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
/// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
/// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
///
/// The macro evaluates to the result of the whole run: failures while building the pipeline and
/// failures while awaiting it are both reported through the returned value. The macro never
/// returns early from the calling function or block.
/// A trailing comma after the last command is accepted.
///
/// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
/// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
///
/// This is the `async` flavor. It must be invoked from an `async` context because the expansion
/// contains an `.await`.
///
/// ## Example
///
/// ```
/// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
/// use rumtk_core::core::RUMResult;
/// use rumtk_core::strings::RUMStringConversions;
///
/// let f = async || -> RUMResult<()> {
///     let result = rumtk_pipeline_run_async!(
///         rumtk_pipeline_command!("ls"),
///         rumtk_pipeline_command!("wc")
///     )?;
///
///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
///     Ok(())
/// };
///
/// let rt = rumtk_init_threads!(&5);
/// rumtk_resolve_task!(rt, f()).unwrap();
/// ```
///
#[macro_export]
macro_rules! rumtk_pipeline_run_async {
    ( $($command:expr),+ $(,)? ) => {{
        use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};

        // Built in its own statement so the temporary command list is dropped before the
        // `.await` below.
        let generated = pipeline_generate_pipeline(&vec![
            $($command),+
        ]);

        // No `?` inside the expansion: it would return from the caller's function.
        match generated {
            Ok(pipeline) => pipeline_await_pipeline(pipeline).await,
            Err(e) => Err(e),
        }
    }};
}
562}