rumtk_core/pipelines.rs
1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2026 Luis M. Santos, M.D.
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
22 * This toolkit aims to be reliable, simple, performant, and standards compliant.
23 * Copyright (C) 2025 Luis M. Santos, M.D.
24 * Copyright (C) 2025 Nick Stephenson
25 * Copyright (C) 2025 Ethan Dixon
26 * Copyright (C) 2025 MedicalMasses L.L.C.
27 *
28 * This library is free software; you can redistribute it and/or
29 * modify it under the terms of the GNU Lesser General Public
30 * License as published by the Free Software Foundation; either
31 * version 2.1 of the License, or (at your option) any later version.
32 *
33 * This library is distributed in the hope that it will be useful,
34 * but WITHOUT ANY WARRANTY; without even the implied warranty of
35 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
36 * Lesser General Public License for more details.
37 *
38 * You should have received a copy of the GNU Lesser General Public
39 * License along with this library; if not, write to the Free Software
40 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
41 */
42
43pub mod pipeline_types {
44 use crate::strings::{RUMString, RUMStringConversions};
45 use crate::types::{RUMBuffer, RUMHashMap};
46
47 use crate::core::{RUMResult, RUMVec};
48 use std::process::{Child, Command};
49
50 pub type RUMCommandArgs = Vec<RUMString>;
51 pub type RUMCommandEnv = RUMHashMap<RUMString, RUMString>;
52 #[derive(Default, Debug, Clone)]
53 pub struct RUMCommand {
54 pub path: RUMString,
55 pub data: Option<RUMBuffer>,
56 pub args: RUMCommandArgs,
57 pub env: RUMCommandEnv,
58 }
59
60 impl RUMCommand {
61 pub fn new(
62 prog: &str,
63 data: &Option<RUMBuffer>,
64 args: &RUMCommandArgs,
65 env: &RUMCommandEnv,
66 ) -> Self {
67 RUMCommand {
68 path: prog.to_rumstring(),
69 args: args.clone(),
70 env: env.clone(),
71 data: data.clone(),
72 }
73 }
74 }
75
76 pub type RUMCommandLine = RUMVec<RUMCommand>;
77 pub type RUMPipelineCommand = Command;
78 pub type RUMPipelineProcess = Child;
79 pub type RUMPipeline = RUMVec<RUMPipelineProcess>;
80 pub type RUMPipelineResult = RUMResult<RUMBuffer>;
81}
82
83pub mod pipeline_functions {
84 use super::pipeline_types::*;
85 use crate::core::RUMResult;
86 use crate::strings::rumtk_format;
87 use std::io::{Read, Write};
88
89 use crate::threading::threading_functions::async_sleep;
90 use crate::types::RUMBuffer;
91 use std::process::{Command, Stdio};
92
93 const DEFAULT_PROCESS_ASYNC_WAIT: f32 = 0.001;
94 const DEFAULT_STDOUT_CHUNK_SIZE: usize = 1024;
95
96 ///
97 /// Given a command of type [RUMCommand](RUMCommand), generate a command instance the Rust
98 /// runtime can use to spawn a process.
99 ///
100 /// ## Example
101 ///
102 /// ```
103 /// use std::any::{Any, TypeId};
104 ///
105 /// use rumtk_core::strings::RUMString;
106 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
107 /// use rumtk_core::pipelines::pipeline_functions::pipeline_generate_command;
108 ///
109 /// let command_name = "ls";
110 /// let mut command = RUMCommand::default();
111 /// command.path = RUMString::from(command_name);
112 ///
113 /// let sys_command = pipeline_generate_command(&command);
114 ///
115 /// assert_eq!(sys_command.get_program().to_str().unwrap(), command_name, "");
116 ///
117 /// ```
118 ///
119 pub fn pipeline_generate_command(command: &RUMCommand) -> RUMPipelineCommand {
120 let mut cmd = Command::new(command.path.as_str());
121
122 for arg in command.args.iter() {
123 cmd.arg(arg);
124 }
125
126 cmd.envs(command.env.iter());
127
128 cmd.stdin(Stdio::piped())
129 .stdout(Stdio::piped())
130 .stderr(Stdio::piped());
131
132 cmd
133 }
134
135 ///
136 /// Spawns a process out of the [RUMPipelineCommand](RUMPipelineCommand).
137 ///
138 /// ## Example
139 ///
140 /// ```
141 /// use std::any::{Any, TypeId};
142 ///
143 /// use rumtk_core::strings::RUMString;
144 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
145 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_spawn_process};
146 ///
147 /// let command_name = "ls";
148 /// let mut command = RUMCommand::default();
149 /// command.path = RUMString::from(command_name);
150 ///
151 /// let mut sys_command = pipeline_generate_command(&command);
152 ///
153 /// let mut process = pipeline_spawn_process(&mut sys_command).unwrap();
154 ///
155 /// process.wait();
156 /// ```
157 ///
158 pub fn pipeline_spawn_process(cmd: &mut RUMPipelineCommand) -> RUMResult<RUMPipelineProcess> {
159 match cmd.spawn() {
160 Ok(process) => Ok(process),
161 Err(e) => Err(rumtk_format!(
162 "Failed to spawn process {:?} because => {}",
163 cmd.get_program(),
164 e
165 )),
166 }
167 }
168
169 ///
170 /// Given a process of type [RUMPipelineProcess](RUMPipelineProcess) and a rhs of type [RUMPipelineCommand](RUMPipelineCommand),
171 /// create a pipe of the lhs's `stdout` into the next command descriptor which is the rhs.
172 ///
173 /// ## Example
174 ///
175 /// ```
176 /// use std::any::{Any, TypeId};
177 /// use std::process::Stdio;
178 ///
179 /// use rumtk_core::strings::RUMString;
180 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
181 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_process, pipeline_spawn_process};
182 ///
183 /// let ls_name = "ls";
184 /// let mut ls_command = RUMCommand::default();
185 /// ls_command.path = RUMString::from(ls_name);
186 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
187 ///
188 /// let wc_name = "wc";
189 /// let mut wc_command = RUMCommand::default();
190 /// wc_command.path = RUMString::from(wc_name);
191 /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
192 ///
193 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
194 /// pipeline_pipe_process(&mut sys_ls_process, &mut sys_wc_command).unwrap();
195 /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
196 ///
197 /// sys_ls_process.wait();
198 /// sys_wc_process.wait();
199 /// ```
200 ///
201 pub fn pipeline_pipe_process(
202 process: &mut RUMPipelineProcess,
203 piped: &mut RUMPipelineCommand,
204 ) -> RUMResult<()> {
205 let process_stdout = Stdio::from(match process.stdout.take() {
206 Some(stdout) => stdout,
207 None => {
208 return Err(rumtk_format!(
209 "No stdout handle found for process {}.",
210 process.id()
211 ));
212 }
213 });
214 let _ = piped.stdin(process_stdout);
215 Ok(())
216 }
217
218 ///
219 /// Retrieves the standard output generated by the completed process.
220 ///
221 /// ## Example
222 ///
223 /// ```
224 /// use std::any::{Any, TypeId};
225 /// use std::process::Stdio;
226 ///
227 /// use rumtk_core::strings::RUMString;
228 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
229 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_process, pipeline_spawn_process, pipeline_get_stdout};
230 ///
231 /// let ls_name = "ls";
232 /// let mut ls_command = RUMCommand::default();
233 /// ls_command.path = RUMString::from(ls_name);
234 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
235 ///
236 /// let wc_name = "wc";
237 /// let mut wc_command = RUMCommand::default();
238 /// wc_command.path = RUMString::from(wc_name);
239 /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
240 ///
241 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
242 /// pipeline_pipe_process(&mut sys_ls_process, &mut sys_wc_command).unwrap();
243 /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
244 ///
245 /// sys_ls_process.wait();
246 /// sys_wc_process.wait();
247 ///
248 /// let mut pipeline = vec![sys_ls_process, sys_wc_process];
249 ///
250 /// let out_data = pipeline_get_stdout(pipeline).unwrap();
251 ///
252 /// assert_eq!(out_data.is_empty(), false, "No output detected... {:?}", &out_data);
253 /// ```
254 ///
255 pub fn pipeline_get_stdout(mut pipeline: RUMPipeline) -> RUMResult<RUMBuffer> {
256 let mut last_item = pipeline.pop().unwrap();
257 match last_item.wait_with_output() {
258 Ok(stdout) => Ok(RUMBuffer::from(stdout.stdout.clone())),
259 Err(e) => Err(rumtk_format!(
260 "Issue reading last process output because => {}",
261 e
262 )),
263 }
264 }
265
266 ///
267 /// Closes the `stdin` standard in file for process. Useful to trigger a resolution of the pipeline.
268 ///
269 /// ## Example
270 ///
271 /// ```
272 /// use rumtk_core::pipelines::pipeline_functions::pipeline_close_process_stdin;
273 /// use rumtk_core::strings::RUMString;
274 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
275 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_into_process, pipeline_spawn_process};
276 /// use rumtk_core::types::RUMBuffer;
277 ///
278 /// let ls_name = "ls";
279 /// let mut ls_command = RUMCommand::default();
280 /// ls_command.path = RUMString::from(ls_name);
281 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
282 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
283 ///
284 /// pipeline_close_process_stdin(&mut sys_ls_process);
285 ///
286 ///
287 /// ```
288 ///
289 pub fn pipeline_close_process_stdin(process: &mut RUMPipelineProcess) {
290 // Do not change into an expect() or such unwrap. We just want to ignore and assume stdin is closed.
291 match process.stdin.take() {
292 Some(stdin) => {
293 drop(stdin);
294 }
295 None => {}
296 };
297 }
298
299 ///
300 /// Pipe data into a process.
301 ///
302 /// ## Example
303 ///
304 /// ```
305 /// use rumtk_core::strings::RUMString;
306 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
307 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_into_process, pipeline_spawn_process};
308 /// use rumtk_core::types::RUMBuffer;
309 ///
310 /// let ls_name = "ls";
311 /// let mut ls_command = RUMCommand::default();
312 /// ls_command.path = RUMString::from(ls_name);
313 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
314 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
315 /// pipeline_pipe_into_process(&mut sys_ls_process, &Some(RUMBuffer::default())).unwrap();
316 ///
317 /// let out = sys_ls_process.wait_with_output().unwrap();
318 ///
319 /// assert_eq!(out.stdout.is_empty(), false, "Piped command returned an empty buffer? => {:?}", String::from_utf8_lossy(out.stdout.as_slice()))
320 /// ```
321 ///
322 pub fn pipeline_pipe_into_process(
323 process: &mut RUMPipelineProcess,
324 data: &Option<RUMBuffer>,
325 ) -> RUMResult<()> {
326 match data {
327 Some(data) => match process.stdin {
328 Some(ref mut stdin) => match stdin.write_all(&data) {
329 Ok(_) => {}
330 Err(e) => {
331 return Err(rumtk_format!(
332 "Failed to pipe data to stdin of process because => {}",
333 e
334 ))
335 }
336 },
337 None => {}
338 },
339 None => {}
340 }
341 Ok(())
342 }
343
344 ///
345 /// Builds an executable pipeline out of a list of [RUMCommand](RUMCommand).
346 ///
347 /// ## Example
348 ///
349 /// ```
350 /// use rumtk_core::strings::RUMString;
351 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
352 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline};
353 ///
354 /// let ls_name = "ls";
355 /// let mut ls_command = RUMCommand::default();
356 /// ls_command.path = RUMString::from(ls_name);
357 ///
358 /// let wc_name = "wc";
359 /// let mut wc_command = RUMCommand::default();
360 /// wc_command.path = RUMString::from(wc_name);
361 ///
362 /// let commands = vec![
363 /// ls_command,
364 /// wc_command
365 /// ];
366 ///
367 /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
368 ///
369 /// assert_eq!(pipeline.len(), commands.len(), "Pipeline generation returned unexpected number of items!");
370 /// ```
371 ///
372 pub fn pipeline_generate_pipeline(commands: &RUMCommandLine) -> RUMResult<RUMPipeline> {
373 let first_command = commands.first().unwrap();
374
375 // Setup pipeline
376 let mut pipeline = vec![];
377
378 //Bootstrap first process in chain
379 let mut root = pipeline_generate_command(&first_command);
380 let mut parent_process = pipeline_spawn_process(&mut root)?;
381 pipeline_pipe_into_process(&mut parent_process, &mut first_command.data.clone())?;
382 pipeline.push(parent_process);
383
384 for cmd in commands.iter().skip(1) {
385 let mut new_root = pipeline_generate_command(cmd);
386 pipeline_pipe_process(pipeline.last_mut().unwrap(), &mut new_root)?;
387 parent_process = pipeline_spawn_process(&mut new_root)?;
388 pipeline.push(parent_process);
389 }
390
391 Ok(pipeline)
392 }
393
394 ///
395 /// Await for pipeline to execute in a async friendly manner. Once the pipeline execution ends,
396 /// consume the pipeline and return the output.
397 ///
398 /// ## Example
399 ///
400 /// ```
401 /// use rumtk_core::strings::RUMString;
402 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
403 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
404 /// use rumtk_core::{rumtk_resolve_task, rumtk_init_threads};
405 ///
406 /// let ls_name = "ls";
407 /// let mut ls_command = RUMCommand::default();
408 /// ls_command.path = RUMString::from(ls_name);
409 ///
410 /// let wc_name = "wc";
411 /// let mut wc_command = RUMCommand::default();
412 /// wc_command.path = RUMString::from(wc_name);
413 ///
414 /// let commands = vec![
415 /// ls_command,
416 /// wc_command
417 /// ];
418 ///
419 /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
420 /// let rt = rumtk_init_threads!(&5);
421 /// let result = rumtk_resolve_task!(rt, pipeline_await_pipeline(pipeline)).unwrap();
422 ///
423 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
424 /// ```
425 ///
426 pub async fn pipeline_await_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
427 // Let's make sure the stdin is closed on the first process to make sure it exits instead of
428 // remain waiting for EOF in the stdin stream.
429 pipeline_close_process_stdin(pipeline.first_mut().unwrap());
430
431 // Now let's visit each process and await their completion!
432 for p in pipeline.iter_mut() {
433 loop {
434 match p.try_wait() {
435 Ok(code) => match code {
436 Some(code) => {
437 if !code.success() {
438 return Err(rumtk_format!(
439 "Process {} exited with non-success code => {}!",
440 p.id(),
441 code
442 ));
443 }
444 break;
445 }
446 None => {
447 async_sleep(DEFAULT_PROCESS_ASYNC_WAIT).await;
448 continue;
449 }
450 },
451 Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
452 };
453 }
454 }
455
456 let result = pipeline_get_stdout(pipeline)?;
457 Ok(result)
458 }
459
460 ///
461 /// Await for pipeline to complete execution. Once the pipeline execution ends,
462 /// consume the pipeline and return the output.
463 ///
464 /// ## Example
465 ///
466 /// ```
467 /// use rumtk_core::strings::RUMString;
468 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
469 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
470 ///
471 /// let ls_name = "ls";
472 /// let mut ls_command = RUMCommand::default();
473 /// ls_command.path = RUMString::from(ls_name);
474 ///
475 /// let wc_name = "wc";
476 /// let mut wc_command = RUMCommand::default();
477 /// wc_command.path = RUMString::from(wc_name);
478 ///
479 /// let commands = vec![
480 /// ls_command,
481 /// wc_command
482 /// ];
483 ///
484 /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
485 /// let result = pipeline_wait_pipeline(pipeline).unwrap();
486 ///
487 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
488 /// ```
489 ///
490 pub fn pipeline_wait_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
491 // Let's make sure the stdin is closed on the first process to make sure it exits instead of
492 // remain waiting for EOF in the stdin stream.
493 pipeline_close_process_stdin(pipeline.first_mut().unwrap());
494
495 // Now let's visit each process and await their completion!
496 for p in pipeline.iter_mut() {
497 match p.wait() {
498 Ok(code) => {
499 if !code.success() {
500 return Err(rumtk_format!(
501 "Process {} exited with non-success code => {}!",
502 p.id(),
503 code
504 ));
505 }
506 break;
507 }
508 Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
509 };
510 }
511
512 let result = pipeline_get_stdout(pipeline)?;
513 Ok(result)
514 }
515}
516
517pub mod pipeline_macros {
518 ///
519 /// Creates a pipeline command out of the provided parameters. Parameters include `path`, `args`,
520 /// and `env`. The command has [RUMCommand](super::pipeline_types::RUMCommand).
521 ///
522 /// `env` is a map of type [RUMCommandEnv](super::pipeline_types::RUMCommandEnv) containing a set of
523 /// key value pair strings that we can use to update the process environment.
524 ///
525 /// ## Example
526 ///
527 /// ### Program Only
528 ///
529 /// ```
530 /// use rumtk_core::rumtk_pipeline_command;
531 ///
532 /// let command = rumtk_pipeline_command!("ls");
533 /// ```
534 ///
535 /// ### Program with Piped Data
536 ///
537 /// ```
538 /// use rumtk_core::rumtk_pipeline_command;
539 /// use rumtk_core::types::RUMBuffer;
540 /// use rumtk_core::strings::RUMStringConversions;
541 ///
542 /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default());
543 /// ```
544 ///
545 /// ### Program with Args
546 ///
547 /// ```
548 /// use rumtk_core::rumtk_pipeline_command;
549 /// use rumtk_core::types::RUMBuffer;
550 /// use rumtk_core::strings::RUMStringConversions;
551 ///
552 /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default(), &vec![
553 /// "-l".to_rumstring()
554 /// ]);
555 /// ```
556 ///
557 #[macro_export]
558 macro_rules! rumtk_pipeline_command {
559 ( $path:expr, $data:expr, $args:expr, $env:expr ) => {{
560 use $crate::pipelines::pipeline_types::RUMCommand;
561
562 RUMCommand::new($path, &Some($data), $args, $env)
563 }};
564 ( $path:expr, $data:expr, $args:expr ) => {{
565 use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandEnv};
566
567 RUMCommand::new($path, &Some($data), $args, &RUMCommandEnv::default())
568 }};
569 ( $path:expr, $data:expr ) => {{
570 use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
571
572 RUMCommand::new(
573 $path,
574 &Some($data),
575 &RUMCommandArgs::default(),
576 &RUMCommandEnv::default(),
577 )
578 }};
579 ( $path:expr ) => {{
580 use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
581 use $crate::types::RUMBuffer;
582
583 RUMCommand::new(
584 $path,
585 &None,
586 &RUMCommandArgs::default(),
587 &RUMCommandEnv::default(),
588 )
589 }};
590 }
591
592 ///
593 /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
594 /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
595 /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
596 ///
597 /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
598 /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
599 ///
600 /// ## Example
601 ///
602 /// ### Simple
603 ///
604 /// ```
605 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
606 /// use rumtk_core::core::{RUMResult};
607 /// use rumtk_core::strings::RUMStringConversions;
608 /// use rumtk_core::types::RUMBuffer;
609 ///
610 /// let f = async || -> RUMResult<()> {
611 /// let result = rumtk_pipeline_run!(
612 /// rumtk_pipeline_command!("ls"),
613 /// rumtk_pipeline_command!("wc")
614 /// ).unwrap();
615 ///
616 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
617 /// Ok(())
618 /// };
619 ///
620 /// let rt = rumtk_init_threads!(&5);
621 /// rumtk_resolve_task!(rt, f()).unwrap();
622 /// ```
623 ///
624 /// ### With Buffer Piped In
625 ///
626 /// ```
627 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
628 /// use rumtk_core::core::{RUMResult, new_random_buffer};
629 /// use rumtk_core::strings::RUMStringConversions;
630 /// use rumtk_core::types::RUMBuffer;
631 ///
632 /// let f = || -> RUMResult<()> {
633 /// let result = rumtk_pipeline_run!(
634 /// rumtk_pipeline_command!("ls", new_random_buffer()),
635 /// rumtk_pipeline_command!("wc")
636 /// )?;
637 ///
638 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
639 /// Ok(())
640 /// };
641 ///
642 /// f().unwrap();
643 /// ```
644 ///
645 #[macro_export]
646 macro_rules! rumtk_pipeline_run {
647 ( $($command:expr),+ ) => {{
648 use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
649
650 let pipeline = pipeline_generate_pipeline(&vec![
651 $($command),+
652 ])?;
653
654 pipeline_wait_pipeline(pipeline)
655 }};
656 }
657
658 ///
659 /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
660 /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
661 /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
662 ///
663 /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
664 /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
665 ///
666 /// This is the `async` flavor.
667 ///
668 /// ## Example
669 ///
670 /// ### Simple
671 ///
672 /// ```
673 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
674 /// use rumtk_core::core::{RUMResult};
675 /// use rumtk_core::strings::RUMStringConversions;
676 /// use rumtk_core::types::RUMBuffer;
677 ///
678 /// let f = async || -> RUMResult<()> {
679 /// let result = rumtk_pipeline_run_async!(
680 /// rumtk_pipeline_command!("ls"),
681 /// rumtk_pipeline_command!("wc")
682 /// ).await?;
683 ///
684 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
685 /// Ok(())
686 /// };
687 ///
688 /// let rt = rumtk_init_threads!(&5);
689 /// rumtk_resolve_task!(rt, f()).unwrap();
690 /// ```
691 ///
692 /// ### With Buffer Piped In
693 ///
694 /// ```
695 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
696 /// use rumtk_core::core::{RUMResult, new_random_buffer};
697 /// use rumtk_core::strings::RUMStringConversions;
698 /// use rumtk_core::types::RUMBuffer;
699 ///
700 /// let f = async || -> RUMResult<()> {
701 /// let result = rumtk_pipeline_run_async!(
702 /// rumtk_pipeline_command!("ls", new_random_buffer()),
703 /// rumtk_pipeline_command!("wc")
704 /// ).await?;
705 ///
706 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
707 /// Ok(())
708 /// };
709 ///
710 /// let rt = rumtk_init_threads!(&5);
711 /// rumtk_resolve_task!(rt, f()).unwrap();
712 /// ```
713 ///
714 #[macro_export]
715 macro_rules! rumtk_pipeline_run_async {
716 ( $($command:expr),+ ) => {{
717 use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
718
719 let pipeline = pipeline_generate_pipeline(&vec![
720 $($command),+
721 ])?;
722
723 pipeline_await_pipeline(pipeline)
724 }};
725 }
726}