rumtk_core/pipelines.rs
1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2026 Luis M. Santos, M.D.
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
22 * This toolkit aims to be reliable, simple, performant, and standards compliant.
23 * Copyright (C) 2025 Luis M. Santos, M.D. <lsantos@medicalmasses.com>
24 * Copyright (C) 2025 MedicalMasses L.L.C. <contact@medicalmasses.com>
25 *
26 * This program is free software: you can redistribute it and/or modify
27 * it under the terms of the GNU General Public License as published by
28 * the Free Software Foundation, either version 3 of the License, or
29 * (at your option) any later version.
30 *
31 * This program is distributed in the hope that it will be useful,
32 * but WITHOUT ANY WARRANTY; without even the implied warranty of
33 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
34 * GNU General Public License for more details.
35 *
36 * You should have received a copy of the GNU General Public License
37 * along with this program. If not, see <https://www.gnu.org/licenses/>.
38 */
39
40pub mod pipeline_types {
41 use crate::strings::{RUMString, RUMStringConversions};
42 use crate::types::{RUMBuffer, RUMHashMap};
43
44 use crate::core::{RUMResult, RUMVec};
45 use std::process::{Child, Command};
46
47 pub type RUMCommandArgs = Vec<RUMString>;
48 pub type RUMCommandEnv = RUMHashMap<RUMString, RUMString>;
49 #[derive(Default, Debug, Clone)]
50 pub struct RUMCommand {
51 pub path: RUMString,
52 pub data: Option<RUMBuffer>,
53 pub args: RUMCommandArgs,
54 pub env: RUMCommandEnv,
55 }
56
57 impl RUMCommand {
58 pub fn new(
59 prog: &str,
60 data: &Option<RUMBuffer>,
61 args: &RUMCommandArgs,
62 env: &RUMCommandEnv,
63 ) -> Self {
64 RUMCommand {
65 path: prog.to_rumstring(),
66 args: args.clone(),
67 env: env.clone(),
68 data: data.clone(),
69 }
70 }
71 }
72
73 pub type RUMCommandLine = RUMVec<RUMCommand>;
74 pub type RUMPipelineCommand = Command;
75 pub type RUMPipelineProcess = Child;
76 pub type RUMPipeline = RUMVec<RUMPipelineProcess>;
77 pub type RUMPipelineResult = RUMResult<RUMBuffer>;
78}
79
80pub mod pipeline_functions {
81 use super::pipeline_types::*;
82 use crate::core::RUMResult;
83 use crate::strings::rumtk_format;
84 use std::io::{Read, Write};
85
86 use crate::threading::threading_functions::async_sleep;
87 use crate::types::RUMBuffer;
88 use std::process::{Command, Stdio};
89
/// Pause handed to `async_sleep` between two `try_wait` polls in `pipeline_await_pipeline`.
/// NOTE(review): presumably expressed in seconds -- confirm against `async_sleep`.
const DEFAULT_PROCESS_ASYNC_WAIT: f32 = 1.0e-3;
/// Chunk size constant for stdout handling. No function in this module references it.
const DEFAULT_STDOUT_CHUNK_SIZE: usize = 1024;
92
93 ///
94 /// Given a command of type [RUMCommand](RUMCommand), generate a command instance the Rust
95 /// runtime can use to spawn a process.
96 ///
97 /// ## Example
98 ///
99 /// ```
100 /// use std::any::{Any, TypeId};
101 ///
102 /// use rumtk_core::strings::RUMString;
103 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
104 /// use rumtk_core::pipelines::pipeline_functions::pipeline_generate_command;
105 ///
106 /// let command_name = "ls";
107 /// let mut command = RUMCommand::default();
108 /// command.path = RUMString::from(command_name);
109 ///
110 /// let sys_command = pipeline_generate_command(&command);
111 ///
112 /// assert_eq!(sys_command.get_program().to_str().unwrap(), command_name, "");
113 ///
114 /// ```
115 ///
116 pub fn pipeline_generate_command(command: &RUMCommand) -> RUMPipelineCommand {
117 let mut cmd = Command::new(command.path.as_str());
118
119 for arg in command.args.iter() {
120 cmd.arg(arg);
121 }
122
123 cmd.envs(command.env.iter());
124
125 cmd.stdin(Stdio::piped())
126 .stdout(Stdio::piped())
127 .stderr(Stdio::piped());
128
129 cmd
130 }
131
132 ///
133 /// Spawns a process out of the [RUMPipelineCommand](RUMPipelineCommand).
134 ///
135 /// ## Example
136 ///
137 /// ```
138 /// use std::any::{Any, TypeId};
139 ///
140 /// use rumtk_core::strings::RUMString;
141 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
142 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_spawn_process};
143 ///
144 /// let command_name = "ls";
145 /// let mut command = RUMCommand::default();
146 /// command.path = RUMString::from(command_name);
147 ///
148 /// let mut sys_command = pipeline_generate_command(&command);
149 ///
150 /// let mut process = pipeline_spawn_process(&mut sys_command).unwrap();
151 ///
152 /// process.wait();
153 /// ```
154 ///
155 pub fn pipeline_spawn_process(cmd: &mut RUMPipelineCommand) -> RUMResult<RUMPipelineProcess> {
156 match cmd.spawn() {
157 Ok(process) => Ok(process),
158 Err(e) => Err(rumtk_format!(
159 "Failed to spawn process {:?} because => {}",
160 cmd.get_program(),
161 e
162 )),
163 }
164 }
165
166 ///
167 /// Given a process of type [RUMPipelineProcess](RUMPipelineProcess) and a rhs of type [RUMPipelineCommand](RUMPipelineCommand),
168 /// create a pipe of the lhs's `stdout` into the next command descriptor which is the rhs.
169 ///
170 /// ## Example
171 ///
172 /// ```
173 /// use std::any::{Any, TypeId};
174 /// use std::process::Stdio;
175 ///
176 /// use rumtk_core::strings::RUMString;
177 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
178 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_processes, pipeline_spawn_process};
179 ///
180 /// let ls_name = "ls";
181 /// let mut ls_command = RUMCommand::default();
182 /// ls_command.path = RUMString::from(ls_name);
183 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
184 ///
185 /// let wc_name = "wc";
186 /// let mut wc_command = RUMCommand::default();
187 /// wc_command.path = RUMString::from(wc_name);
188 /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
189 ///
190 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
191 /// pipeline_pipe_processes(&mut sys_ls_process, &mut sys_wc_command).unwrap();
192 /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
193 ///
194 /// sys_ls_process.wait();
195 /// sys_wc_process.wait();
196 /// ```
197 ///
198 pub fn pipeline_pipe_processes(
199 process: &mut RUMPipelineProcess,
200 piped: &mut RUMPipelineCommand,
201 ) -> RUMResult<()> {
202 let process_stdout = Stdio::from(match process.stdout.take() {
203 Some(stdout) => stdout,
204 None => {
205 return Err(rumtk_format!(
206 "No stdout handle found for process {}.",
207 process.id()
208 ));
209 }
210 });
211 let _ = piped.stdin(process_stdout);
212 Ok(())
213 }
214
215 ///
216 /// Retrieves the standard output generated by the completed process.
217 ///
218 /// ## Example
219 ///
220 /// ```
221 /// use std::any::{Any, TypeId};
222 /// use std::process::Stdio;
223 ///
224 /// use rumtk_core::strings::RUMString;
225 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
226 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_spawn_process, pipeline_get_stdout, pipeline_pipe_processes};
227 ///
228 /// let ls_name = "ls";
229 /// let mut ls_command = RUMCommand::default();
230 /// ls_command.path = RUMString::from(ls_name);
231 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
232 ///
233 /// let wc_name = "wc";
234 /// let mut wc_command = RUMCommand::default();
235 /// wc_command.path = RUMString::from(wc_name);
236 /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
237 ///
238 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
239 /// pipeline_pipe_processes(&mut sys_ls_process, &mut sys_wc_command).unwrap();
240 /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
241 ///
242 /// sys_ls_process.wait();
243 /// sys_wc_process.wait();
244 ///
245 /// let mut pipeline = vec![sys_ls_process, sys_wc_process];
246 ///
247 /// let out_data = pipeline_get_stdout(pipeline).unwrap();
248 ///
249 /// assert_eq!(out_data.is_empty(), false, "No output detected... {:?}", &out_data);
250 /// ```
251 ///
252 pub fn pipeline_get_stdout(mut pipeline: RUMPipeline) -> RUMResult<RUMBuffer> {
253 let mut last_item = pipeline.pop().unwrap();
254 match last_item.wait_with_output() {
255 Ok(stdout) => Ok(RUMBuffer::from(stdout.stdout.clone())),
256 Err(e) => Err(rumtk_format!(
257 "Issue reading last process output because => {}",
258 e
259 )),
260 }
261 }
262
263 ///
264 /// Closes the `stdin` standard in file for process. Useful to trigger a resolution of the pipeline.
265 ///
266 /// ## Example
267 ///
268 /// ```
269 /// use rumtk_core::pipelines::pipeline_functions::pipeline_close_process_stdin;
270 /// use rumtk_core::strings::RUMString;
271 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
272 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_into_process, pipeline_spawn_process};
273 /// use rumtk_core::types::RUMBuffer;
274 ///
275 /// let ls_name = "ls";
276 /// let mut ls_command = RUMCommand::default();
277 /// ls_command.path = RUMString::from(ls_name);
278 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
279 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
280 ///
281 /// pipeline_close_process_stdin(&mut sys_ls_process);
282 ///
283 ///
284 /// ```
285 ///
286 pub fn pipeline_close_process_stdin(process: &mut RUMPipelineProcess) {
287 // Do not change into an expect() or such unwrap. We just want to ignore and assume stdin is closed.
288 match process.stdin.take() {
289 Some(stdin) => {
290 drop(stdin);
291 }
292 None => {}
293 };
294 }
295
296 ///
297 /// Pipe data into a process.
298 ///
299 /// ## Example
300 ///
301 /// ```
302 /// use rumtk_core::strings::RUMString;
303 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
304 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_into_process, pipeline_spawn_process};
305 /// use rumtk_core::types::RUMBuffer;
306 ///
307 /// let ls_name = "ls";
308 /// let mut ls_command = RUMCommand::default();
309 /// ls_command.path = RUMString::from(ls_name);
310 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
311 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
312 /// pipeline_pipe_into_process(&mut sys_ls_process, &Some(RUMBuffer::default())).unwrap();
313 ///
314 /// let out = sys_ls_process.wait_with_output().unwrap();
315 ///
316 /// assert_eq!(out.stdout.is_empty(), false, "Piped command returned an empty buffer? => {:?}", String::from_utf8_lossy(out.stdout.as_slice()))
317 /// ```
318 ///
319 pub fn pipeline_pipe_into_process(
320 process: &mut RUMPipelineProcess,
321 data: &Option<RUMBuffer>,
322 ) -> RUMResult<()> {
323 match data {
324 Some(data) => match process.stdin {
325 Some(ref mut stdin) => match stdin.write_all(&data) {
326 Ok(_) => {}
327 Err(e) => {
328 return Err(rumtk_format!(
329 "Failed to pipe data to stdin of process because => {}",
330 e
331 ))
332 }
333 },
334 None => {}
335 },
336 None => {}
337 }
338 Ok(())
339 }
340
341 ///
342 /// Builds an executable pipeline out of a list of [RUMCommand](RUMCommand).
343 ///
344 /// ## Example
345 ///
346 /// ```
347 /// use rumtk_core::strings::RUMString;
348 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
349 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline};
350 ///
351 /// let ls_name = "ls";
352 /// let mut ls_command = RUMCommand::default();
353 /// ls_command.path = RUMString::from(ls_name);
354 ///
355 /// let wc_name = "wc";
356 /// let mut wc_command = RUMCommand::default();
357 /// wc_command.path = RUMString::from(wc_name);
358 ///
359 /// let commands = vec![
360 /// ls_command,
361 /// wc_command
362 /// ];
363 ///
364 /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
365 ///
366 /// assert_eq!(pipeline.len(), commands.len(), "Pipeline generation returned unexpected number of items!");
367 /// ```
368 ///
369 pub fn pipeline_generate_pipeline(commands: &RUMCommandLine) -> RUMResult<RUMPipeline> {
370 let first_command = commands.first().unwrap();
371
372 // Setup pipeline
373 let mut pipeline = vec![];
374
375 //Bootstrap first process in chain
376 let mut root = pipeline_generate_command(&first_command);
377 let mut parent_process = pipeline_spawn_process(&mut root)?;
378 pipeline_pipe_into_process(&mut parent_process, &mut first_command.data.clone())?;
379 pipeline.push(parent_process);
380
381 for cmd in commands.iter().skip(1) {
382 let mut new_root = pipeline_generate_command(cmd);
383 pipeline_pipe_processes(pipeline.last_mut().unwrap(), &mut new_root)?;
384 parent_process = pipeline_spawn_process(&mut new_root)?;
385 pipeline.push(parent_process);
386 }
387
388 Ok(pipeline)
389 }
390
391 ///
392 /// Await for pipeline to execute in a async friendly manner. Once the pipeline execution ends,
393 /// consume the pipeline and return the output.
394 ///
395 /// ## Example
396 ///
397 /// ```
398 /// use rumtk_core::strings::RUMString;
399 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
400 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
401 /// use rumtk_core::{rumtk_resolve_task};
402 ///
403 /// let ls_name = "ls";
404 /// let mut ls_command = RUMCommand::default();
405 /// ls_command.path = RUMString::from(ls_name);
406 ///
407 /// let wc_name = "wc";
408 /// let mut wc_command = RUMCommand::default();
409 /// wc_command.path = RUMString::from(wc_name);
410 ///
411 /// let commands = vec![
412 /// ls_command,
413 /// wc_command
414 /// ];
415 ///
416 /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
417 /// let result = rumtk_resolve_task!(pipeline_await_pipeline(pipeline)).unwrap();
418 ///
419 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
420 /// ```
421 ///
422 pub async fn pipeline_await_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
423 // Let's make sure the stdin is closed on the first process to make sure it exits instead of
424 // remain waiting for EOF in the stdin stream.
425 pipeline_close_process_stdin(pipeline.first_mut().unwrap());
426
427 // Now let's visit each process and await their completion!
428 for p in pipeline.iter_mut() {
429 loop {
430 match p.try_wait() {
431 Ok(code) => match code {
432 Some(code) => {
433 if !code.success() {
434 return Err(rumtk_format!(
435 "Process {} exited with non-success code => {}!",
436 p.id(),
437 code
438 ));
439 }
440 break;
441 }
442 None => {
443 async_sleep(DEFAULT_PROCESS_ASYNC_WAIT).await;
444 continue;
445 }
446 },
447 Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
448 };
449 }
450 }
451
452 let result = pipeline_get_stdout(pipeline)?;
453 Ok(result)
454 }
455
456 ///
457 /// Await for pipeline to complete execution. Once the pipeline execution ends,
458 /// consume the pipeline and return the output.
459 ///
460 /// ## Example
461 ///
462 /// ```
463 /// use rumtk_core::strings::RUMString;
464 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
465 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
466 ///
467 /// let ls_name = "ls";
468 /// let mut ls_command = RUMCommand::default();
469 /// ls_command.path = RUMString::from(ls_name);
470 ///
471 /// let wc_name = "wc";
472 /// let mut wc_command = RUMCommand::default();
473 /// wc_command.path = RUMString::from(wc_name);
474 ///
475 /// let commands = vec![
476 /// ls_command,
477 /// wc_command
478 /// ];
479 ///
480 /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
481 /// let result = pipeline_wait_pipeline(pipeline).unwrap();
482 ///
483 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
484 /// ```
485 ///
486 pub fn pipeline_wait_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
487 // Let's make sure the stdin is closed on the first process to make sure it exits instead of
488 // remain waiting for EOF in the stdin stream.
489 pipeline_close_process_stdin(pipeline.first_mut().unwrap());
490
491 // Now let's visit each process and await their completion!
492 for p in pipeline.iter_mut() {
493 match p.wait() {
494 Ok(code) => {
495 if !code.success() {
496 return Err(rumtk_format!(
497 "Process {} exited with non-success code => {}!",
498 p.id(),
499 code
500 ));
501 }
502 continue;
503 }
504 Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
505 };
506 }
507
508 let result = pipeline_get_stdout(pipeline)?;
509 Ok(result)
510 }
511}
512
513pub mod pipeline_macros {
///
/// Creates a pipeline command out of the provided parameters. Parameters include `path`, `data`,
/// `args`, and `env`, in that order. Only `path` is required. The command has type
/// [RUMCommand](super::pipeline_types::RUMCommand).
///
/// `data` is a buffer stored in the command as `Some(data)`; when omitted the command carries
/// no data.
///
/// `args` is a reference to a [RUMCommandArgs](super::pipeline_types::RUMCommandArgs) list.
///
/// `env` is a reference to a map of type [RUMCommandEnv](super::pipeline_types::RUMCommandEnv)
/// containing a set of key value pair strings that we can use to update the process environment.
///
/// A trailing comma after the last parameter is accepted.
///
/// ## Example
///
/// ### Program Only
///
/// ```
/// use rumtk_core::rumtk_pipeline_command;
///
/// let command = rumtk_pipeline_command!("ls");
/// ```
///
/// ### Program with Piped Data
///
/// ```
/// use rumtk_core::rumtk_pipeline_command;
/// use rumtk_core::types::RUMBuffer;
/// use rumtk_core::strings::RUMStringConversions;
///
/// let command = rumtk_pipeline_command!("ls", RUMBuffer::default());
/// ```
///
/// ### Program with Args
///
/// ```
/// use rumtk_core::rumtk_pipeline_command;
/// use rumtk_core::types::RUMBuffer;
/// use rumtk_core::strings::RUMStringConversions;
///
/// let command = rumtk_pipeline_command!("ls", RUMBuffer::default(), &vec![
///     "-l".to_rumstring()
/// ]);
/// ```
///
#[macro_export]
macro_rules! rumtk_pipeline_command {
    ( $path:expr, $data:expr, $args:expr, $env:expr $(,)? ) => {{
        $crate::pipelines::pipeline_types::RUMCommand::new($path, &Some($data), $args, $env)
    }};
    ( $path:expr, $data:expr, $args:expr $(,)? ) => {{
        $crate::pipelines::pipeline_types::RUMCommand::new(
            $path,
            &Some($data),
            $args,
            &$crate::pipelines::pipeline_types::RUMCommandEnv::default(),
        )
    }};
    ( $path:expr, $data:expr $(,)? ) => {{
        $crate::pipelines::pipeline_types::RUMCommand::new(
            $path,
            &Some($data),
            &$crate::pipelines::pipeline_types::RUMCommandArgs::default(),
            &$crate::pipelines::pipeline_types::RUMCommandEnv::default(),
        )
    }};
    ( $path:expr $(,)? ) => {{
        // Fully qualified paths are used so the expansion does not inject any `use` item
        // (the previous unused buffer import is gone).
        $crate::pipelines::pipeline_types::RUMCommand::new(
            $path,
            &None,
            &$crate::pipelines::pipeline_types::RUMCommandArgs::default(),
            &$crate::pipelines::pipeline_types::RUMCommandEnv::default(),
        )
    }};
}
587
///
/// Takes one or more [RUMCommand](super::pipeline_types::RUMCommand) values, builds a pipeline
/// out of them and runs it to completion. A pipeline here refers to the Unix style pipeline which
/// is the terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
///
/// The macro evaluates to the result of waiting on the pipeline. Note that a failure while
/// *building* the pipeline is propagated with `?` from inside the macro, so the macro has to be
/// invoked in a function or closure returning a compatible `Result`.
///
/// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
/// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
///
/// ## Example
///
/// ### Simple
///
/// ```
/// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
/// use rumtk_core::core::{RUMResult};
/// use rumtk_core::strings::RUMStringConversions;
/// use rumtk_core::types::RUMBuffer;
///
/// let f = async || -> RUMResult<()> {
///     let result = rumtk_pipeline_run!(
///         rumtk_pipeline_command!("ls"),
///         rumtk_pipeline_command!("wc")
///     ).unwrap();
///
///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
///     Ok(())
/// };
///
/// rumtk_resolve_task!(f()).unwrap();
/// ```
///
/// ### With Buffer Piped In
///
/// ```
/// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
/// use rumtk_core::core::{RUMResult, new_random_buffer};
/// use rumtk_core::strings::RUMStringConversions;
/// use rumtk_core::types::RUMBuffer;
///
/// let f = || -> RUMResult<()> {
///     let result = rumtk_pipeline_run!(
///         rumtk_pipeline_command!("ls", new_random_buffer()),
///         rumtk_pipeline_command!("wc")
///     )?;
///
///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
///     Ok(())
/// };
///
/// f().unwrap();
/// ```
///
#[macro_export]
macro_rules! rumtk_pipeline_run {
    ( $($command:expr),+ ) => {{
        let commands = vec![$($command),+];
        // Building errors leave the calling function early through `?`.
        let pipeline =
            $crate::pipelines::pipeline_functions::pipeline_generate_pipeline(&commands)?;

        $crate::pipelines::pipeline_functions::pipeline_wait_pipeline(pipeline)
    }};
}
652
///
/// Takes one or more [RUMCommand](super::pipeline_types::RUMCommand) values, builds a pipeline
/// out of them and hands back a future that resolves once the pipeline has run. A pipeline here
/// refers to the Unix style pipeline which is the terminal form of a pipeline. The pipeline
/// behaves like it would in the terminal => `ls | wc`.
///
/// The pipeline is built as soon as the macro is evaluated; only the waiting is deferred to
/// the returned future. Note that a failure while *building* the pipeline is propagated with `?`
/// from inside the macro, so the macro has to be invoked in a function or closure returning a
/// compatible `Result`.
///
/// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
/// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
///
/// This is the `async` flavor.
///
/// ## Example
///
/// ### Simple
///
/// ```
/// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
/// use rumtk_core::core::{RUMResult};
/// use rumtk_core::strings::RUMStringConversions;
/// use rumtk_core::types::RUMBuffer;
///
/// let f = async || -> RUMResult<()> {
///     let result = rumtk_pipeline_run_async!(
///         rumtk_pipeline_command!("ls"),
///         rumtk_pipeline_command!("wc")
///     ).await?;
///
///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
///     Ok(())
/// };
///
/// rumtk_resolve_task!(f()).unwrap();
/// ```
///
/// ### With Buffer Piped In
///
/// ```
/// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
/// use rumtk_core::core::{RUMResult, new_random_buffer};
/// use rumtk_core::strings::RUMStringConversions;
/// use rumtk_core::types::RUMBuffer;
///
/// let f = async || -> RUMResult<()> {
///     let result = rumtk_pipeline_run_async!(
///         rumtk_pipeline_command!("wc", new_random_buffer())
///     ).await?;
///
///     assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
///     Ok(())
/// };
///
/// rumtk_resolve_task!(f()).unwrap();
/// ```
///
/// ### With Buffer Piped In W/ Return
///
/// ```
/// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
/// use rumtk_core::core::{RUMResult, new_random_buffer};
/// use rumtk_core::strings::{RUMString, RUMStringConversions, RUMArrayConversions};
/// use rumtk_core::types::RUMBuffer;
///
/// let expected = "1024\n";
///
/// let f = async || -> RUMResult<RUMBuffer> {
///     let result = rumtk_pipeline_run_async!(
///         rumtk_pipeline_command!("wc", new_random_buffer())
///     ).await?;
///
///     Ok(result)
/// };
///
/// let result = rumtk_resolve_task!(f()).unwrap();
/// let string = result.to_vec().to_rumstring();
/// let result_buffer_size = string.split(" ").last().unwrap().split(" ").last().unwrap().to_rumstring();
///
/// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
/// assert_eq!(&result_buffer_size, expected, "Pipeline returned an unexpected result from command wc! => {:?}\nvs.\n{:?}", &result_buffer_size, &expected);
/// ```
///
#[macro_export]
macro_rules! rumtk_pipeline_run_async {
    ( $($command:expr),+ ) => {{
        let commands = vec![$($command),+];
        // Building errors leave the calling function early through `?`.
        let pipeline =
            $crate::pipelines::pipeline_functions::pipeline_generate_pipeline(&commands)?;

        $crate::pipelines::pipeline_functions::pipeline_await_pipeline(pipeline)
    }};
}
744}