rumtk_core/pipelines.rs
1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2026 Luis M. Santos, M.D.
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
22 * This toolkit aims to be reliable, simple, performant, and standards compliant.
23 * Copyright (C) 2025 Luis M. Santos, M.D.
24 * Copyright (C) 2025 Nick Stephenson
25 * Copyright (C) 2025 Ethan Dixon
26 * Copyright (C) 2025 MedicalMasses L.L.C.
27 *
28 * This library is free software; you can redistribute it and/or
29 * modify it under the terms of the GNU Lesser General Public
30 * License as published by the Free Software Foundation; either
31 * version 2.1 of the License, or (at your option) any later version.
32 *
33 * This library is distributed in the hope that it will be useful,
34 * but WITHOUT ANY WARRANTY; without even the implied warranty of
35 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
36 * Lesser General Public License for more details.
37 *
38 * You should have received a copy of the GNU Lesser General Public
39 * License along with this library; if not, write to the Free Software
40 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
41 */
42
43pub mod pipeline_types {
44 use crate::strings::{RUMString, RUMStringConversions};
45 use crate::types::{RUMBuffer, RUMHashMap};
46
47 use crate::core::{RUMResult, RUMVec};
48 use std::process::{Child, Command};
49
50 pub type RUMCommandArgs = Vec<RUMString>;
51 pub type RUMCommandEnv = RUMHashMap<RUMString, RUMString>;
52 #[derive(Default, Debug, Clone)]
53 pub struct RUMCommand {
54 pub path: RUMString,
55 pub data: Option<RUMBuffer>,
56 pub args: RUMCommandArgs,
57 pub env: RUMCommandEnv,
58 }
59
60 impl RUMCommand {
61 pub fn new(
62 prog: &str,
63 data: &Option<RUMBuffer>,
64 args: &RUMCommandArgs,
65 env: &RUMCommandEnv,
66 ) -> Self {
67 RUMCommand {
68 path: prog.to_rumstring(),
69 args: args.clone(),
70 env: env.clone(),
71 data: data.clone(),
72 }
73 }
74 }
75
76 pub type RUMCommandLine = RUMVec<RUMCommand>;
77 pub type RUMPipelineCommand = Command;
78 pub type RUMPipelineProcess = Child;
79 pub type RUMPipeline = RUMVec<RUMPipelineProcess>;
80 pub type RUMPipelineResult = RUMResult<RUMBuffer>;
81}
82
83pub mod pipeline_functions {
84 use super::pipeline_types::*;
85 use crate::core::RUMResult;
86 use crate::strings::rumtk_format;
87 use std::io::{Read, Write};
88
89 use crate::threading::threading_functions::async_sleep;
90 use crate::types::RUMBuffer;
91 use std::process::{Command, Stdio};
92
    /// Pause handed to `async_sleep` between exit-status polls in `pipeline_await_pipeline`.
    /// NOTE(review): presumably expressed in seconds — confirm against `async_sleep`.
    const DEFAULT_PROCESS_ASYNC_WAIT: f32 = 0.001;
    /// NOTE(review): not referenced by any code visible in this file; going by the name it is
    /// presumably a read chunk size (in bytes) for process output — confirm or remove.
    const DEFAULT_STDOUT_CHUNK_SIZE: usize = 1024;
95
96 ///
97 /// Given a command of type [RUMCommand](RUMCommand), generate a command instance the Rust
98 /// runtime can use to spawn a process.
99 ///
100 /// ## Example
101 ///
102 /// ```
103 /// use std::any::{Any, TypeId};
104 ///
105 /// use rumtk_core::strings::RUMString;
106 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
107 /// use rumtk_core::pipelines::pipeline_functions::pipeline_generate_command;
108 ///
109 /// let command_name = "ls";
110 /// let mut command = RUMCommand::default();
111 /// command.path = RUMString::from(command_name);
112 ///
113 /// let sys_command = pipeline_generate_command(&command);
114 ///
115 /// assert_eq!(sys_command.get_program().to_str().unwrap(), command_name, "");
116 ///
117 /// ```
118 ///
119 pub fn pipeline_generate_command(command: &RUMCommand) -> RUMPipelineCommand {
120 let mut cmd = Command::new(command.path.as_str());
121
122 for arg in command.args.iter() {
123 cmd.arg(arg);
124 }
125
126 cmd.envs(command.env.iter());
127
128 cmd.stdout(Stdio::piped())
129 .stderr(Stdio::piped())
130 .stdin(Stdio::piped());
131
132 cmd
133 }
134
135 ///
136 /// Spawns a process out of the [RUMPipelineCommand](RUMPipelineCommand).
137 ///
138 /// ## Example
139 ///
140 /// ```
141 /// use std::any::{Any, TypeId};
142 ///
143 /// use rumtk_core::strings::RUMString;
144 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
145 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_spawn_process};
146 ///
147 /// let command_name = "ls";
148 /// let mut command = RUMCommand::default();
149 /// command.path = RUMString::from(command_name);
150 ///
151 /// let mut sys_command = pipeline_generate_command(&command);
152 ///
153 /// let mut process = pipeline_spawn_process(&mut sys_command).unwrap();
154 ///
155 /// process.wait();
156 /// ```
157 ///
158 pub fn pipeline_spawn_process(cmd: &mut RUMPipelineCommand) -> RUMResult<RUMPipelineProcess> {
159 match cmd.spawn() {
160 Ok(process) => Ok(process),
161 Err(e) => Err(rumtk_format!(
162 "Failed to spawn process {:?} because => {}",
163 cmd.get_program(),
164 e
165 )),
166 }
167 }
168
169 ///
170 /// Given a process of type [RUMPipelineProcess](RUMPipelineProcess) and a rhs of type [RUMPipelineCommand](RUMPipelineCommand),
171 /// create a pipe of the lhs's `stdout` into the next command descriptor which is the rhs.
172 ///
173 /// ## Example
174 ///
175 /// ```
176 /// use std::any::{Any, TypeId};
177 /// use std::process::Stdio;
178 ///
179 /// use rumtk_core::strings::RUMString;
180 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
181 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_process, pipeline_spawn_process};
182 ///
183 /// let ls_name = "ls";
184 /// let mut ls_command = RUMCommand::default();
185 /// ls_command.path = RUMString::from(ls_name);
186 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
187 ///
188 /// let wc_name = "wc";
189 /// let mut wc_command = RUMCommand::default();
190 /// wc_command.path = RUMString::from(wc_name);
191 /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
192 ///
193 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
194 /// pipeline_pipe_process(&mut sys_ls_process, &mut sys_wc_command).unwrap();
195 /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
196 ///
197 /// sys_ls_process.wait();
198 /// sys_wc_process.wait();
199 /// ```
200 ///
201 pub fn pipeline_pipe_process(
202 process: &mut RUMPipelineProcess,
203 piped: &mut RUMPipelineCommand,
204 ) -> RUMResult<()> {
205 let process_stdout = Stdio::from(match process.stdout.take() {
206 Some(stdout) => stdout,
207 None => {
208 return Err(rumtk_format!(
209 "No stdout handle found for process {}.",
210 process.id()
211 ));
212 }
213 });
214 let _ = piped.stdin(process_stdout);
215 Ok(())
216 }
217
218 ///
219 /// Retrieves the standard output generated by the completed process.
220 ///
221 /// ## Example
222 ///
223 /// ```
224 /// use std::any::{Any, TypeId};
225 /// use std::process::Stdio;
226 ///
227 /// use rumtk_core::strings::RUMString;
228 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
229 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_process, pipeline_spawn_process, pipeline_get_stdout};
230 ///
231 /// let ls_name = "ls";
232 /// let mut ls_command = RUMCommand::default();
233 /// ls_command.path = RUMString::from(ls_name);
234 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
235 ///
236 /// let wc_name = "wc";
237 /// let mut wc_command = RUMCommand::default();
238 /// wc_command.path = RUMString::from(wc_name);
239 /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
240 ///
241 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
242 /// pipeline_pipe_process(&mut sys_ls_process, &mut sys_wc_command).unwrap();
243 /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
244 ///
245 /// sys_ls_process.wait();
246 /// sys_wc_process.wait();
247 ///
248 /// let mut pipeline = vec![sys_ls_process, sys_wc_process];
249 ///
250 /// let out_data = pipeline_get_stdout(pipeline).unwrap();
251 ///
252 /// assert_eq!(out_data.is_empty(), false, "No output detected... {:?}", &out_data);
253 /// ```
254 ///
255 pub fn pipeline_get_stdout(mut pipeline: RUMPipeline) -> RUMResult<RUMBuffer> {
256 let mut last_item = pipeline.pop().unwrap();
257 match last_item.wait_with_output() {
258 Ok(stdout) => Ok(RUMBuffer::from(stdout.stdout.clone())),
259 Err(e) => Err(rumtk_format!(
260 "Issue reading last process output because => {}",
261 e
262 )),
263 }
264 }
265
266 ///
267 /// Pipe data into a process.
268 ///
269 /// ## Example
270 ///
271 /// ```
272 /// use rumtk_core::strings::RUMString;
273 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
274 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_into_process, pipeline_spawn_process};
275 /// use rumtk_core::types::RUMBuffer;
276 ///
277 /// let ls_name = "ls";
278 /// let mut ls_command = RUMCommand::default();
279 /// ls_command.path = RUMString::from(ls_name);
280 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
281 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
282 /// pipeline_pipe_into_process(&mut sys_ls_process, &Some(RUMBuffer::default())).unwrap();
283 ///
284 /// let out = sys_ls_process.wait_with_output().unwrap();
285 ///
286 /// assert_eq!(out.stdout.is_empty(), false, "Piped command returned an empty buffer? => {:?}", String::from_utf8_lossy(out.stdout.as_slice()))
287 /// ```
288 ///
289 pub fn pipeline_pipe_into_process(
290 process: &mut RUMPipelineProcess,
291 data: &Option<RUMBuffer>,
292 ) -> RUMResult<()> {
293 match data {
294 Some(data) => match process.stdin {
295 Some(ref mut stdin) => match stdin.write_all(&data) {
296 Ok(_) => {}
297 Err(e) => {
298 return Err(rumtk_format!(
299 "Failed to pipe data to stdin of process because => {}",
300 e
301 ))
302 }
303 },
304 None => {}
305 },
306 None => {}
307 }
308 Ok(())
309 }
310
311 pub fn pipeline_connect_processes<'a>(
312 root: &'a mut RUMPipelineCommand,
313 piped: &'a mut RUMPipelineCommand,
314 data: &'a Option<RUMBuffer>,
315 ) -> RUMResult<RUMPipelineProcess> {
316 let mut root_process = pipeline_spawn_process(root)?;
317 pipeline_pipe_process(&mut root_process, piped)?;
318
319 pipeline_pipe_into_process(&mut root_process, data)?;
320
321 Ok(root_process)
322 }
323
324 ///
325 /// Builds an executable pipeline out of a list of [RUMCommand](RUMCommand).
326 ///
327 /// ## Example
328 ///
329 /// ```
330 /// use rumtk_core::strings::RUMString;
331 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
332 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline};
333 ///
334 /// let ls_name = "ls";
335 /// let mut ls_command = RUMCommand::default();
336 /// ls_command.path = RUMString::from(ls_name);
337 ///
338 /// let wc_name = "wc";
339 /// let mut wc_command = RUMCommand::default();
340 /// wc_command.path = RUMString::from(wc_name);
341 ///
342 /// let commands = vec![
343 /// ls_command,
344 /// wc_command
345 /// ];
346 ///
347 /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
348 ///
349 /// assert_eq!(pipeline.len(), commands.len(), "Pipeline generation returned unexpected number of items!");
350 /// ```
351 ///
352 pub fn pipeline_generate_pipeline(commands: &RUMCommandLine) -> RUMResult<RUMPipeline> {
353 let first_command = commands.first().unwrap();
354 let mut root = pipeline_generate_command(&first_command);
355 let mut data = first_command.data.clone();
356 let mut parent_process;
357
358 // Setup pipeline
359 let mut pipeline = vec![];
360 root.stdin(Stdio::piped()).stdout(Stdio::piped());
361
362 for cmd in commands.iter().skip(1) {
363 let mut new_root = pipeline_generate_command(cmd);
364 parent_process = pipeline_connect_processes(&mut root, &mut new_root, &data)?;
365 pipeline.push(parent_process);
366 root = new_root;
367 data = None;
368 }
369
370 pipeline.push(pipeline_spawn_process(&mut root)?);
371
372 Ok(pipeline)
373 }
374
375 ///
376 /// Await for pipeline to execute in a async friendly manner. Once the pipeline execution ends,
377 /// consume the pipeline and return the output.
378 ///
379 /// ## Example
380 ///
381 /// ```
382 /// use rumtk_core::strings::RUMString;
383 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
384 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
385 /// use rumtk_core::{rumtk_resolve_task, rumtk_init_threads};
386 ///
387 /// let ls_name = "ls";
388 /// let mut ls_command = RUMCommand::default();
389 /// ls_command.path = RUMString::from(ls_name);
390 ///
391 /// let wc_name = "wc";
392 /// let mut wc_command = RUMCommand::default();
393 /// wc_command.path = RUMString::from(wc_name);
394 ///
395 /// let commands = vec![
396 /// ls_command,
397 /// wc_command
398 /// ];
399 ///
400 /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
401 /// let rt = rumtk_init_threads!(&5);
402 /// let result = rumtk_resolve_task!(rt, pipeline_await_pipeline(pipeline)).unwrap();
403 ///
404 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
405 /// ```
406 ///
407 pub async fn pipeline_await_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
408 for p in pipeline.iter_mut() {
409 loop {
410 match p.try_wait() {
411 Ok(code) => match code {
412 Some(code) => {
413 if !code.success() {
414 return Err(rumtk_format!(
415 "Process {} exited with non-success code => {}!",
416 p.id(),
417 code
418 ));
419 }
420 break;
421 }
422 None => {
423 async_sleep(DEFAULT_PROCESS_ASYNC_WAIT).await;
424 continue;
425 }
426 },
427 Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
428 };
429 }
430 }
431
432 let result = pipeline_get_stdout(pipeline)?;
433 Ok(result)
434 }
435
436 ///
437 /// Await for pipeline to complete execution. Once the pipeline execution ends,
438 /// consume the pipeline and return the output.
439 ///
440 /// ## Example
441 ///
442 /// ```
443 /// use rumtk_core::strings::RUMString;
444 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
445 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
446 ///
447 /// let ls_name = "ls";
448 /// let mut ls_command = RUMCommand::default();
449 /// ls_command.path = RUMString::from(ls_name);
450 ///
451 /// let wc_name = "wc";
452 /// let mut wc_command = RUMCommand::default();
453 /// wc_command.path = RUMString::from(wc_name);
454 ///
455 /// let commands = vec![
456 /// ls_command,
457 /// wc_command
458 /// ];
459 ///
460 /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
461 /// let result = pipeline_wait_pipeline(pipeline).unwrap();
462 ///
463 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
464 /// ```
465 ///
466 pub fn pipeline_wait_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
467 for p in pipeline.iter_mut() {
468 match p.wait() {
469 Ok(code) => {
470 if !code.success() {
471 return Err(rumtk_format!(
472 "Process {} exited with non-success code => {}!",
473 p.id(),
474 code
475 ));
476 }
477 break;
478 }
479 Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
480 };
481 }
482
483 let result = pipeline_get_stdout(pipeline)?;
484 Ok(result)
485 }
486}
487
488pub mod pipeline_macros {
489 ///
490 /// Creates a pipeline command out of the provided parameters. Parameters include `path`, `args`,
491 /// and `env`. The command has [RUMCommand](super::pipeline_types::RUMCommand).
492 ///
493 /// `env` is a map of type [RUMCommandEnv](super::pipeline_types::RUMCommandEnv) containing a set of
494 /// key value pair strings that we can use to update the process environment.
495 ///
496 /// ## Example
497 ///
498 /// ### Program Only
499 ///
500 /// ```
501 /// use rumtk_core::rumtk_pipeline_command;
502 ///
503 /// let command = rumtk_pipeline_command!("ls");
504 /// ```
505 ///
506 /// ### Program with Piped Data
507 ///
508 /// ```
509 /// use rumtk_core::rumtk_pipeline_command;
510 /// use rumtk_core::types::RUMBuffer;
511 /// use rumtk_core::strings::RUMStringConversions;
512 ///
513 /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default());
514 /// ```
515 ///
516 /// ### Program with Args
517 ///
518 /// ```
519 /// use rumtk_core::rumtk_pipeline_command;
520 /// use rumtk_core::types::RUMBuffer;
521 /// use rumtk_core::strings::RUMStringConversions;
522 ///
523 /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default(), &vec![
524 /// "-l".to_rumstring()
525 /// ]);
526 /// ```
527 ///
528 #[macro_export]
529 macro_rules! rumtk_pipeline_command {
530 ( $path:expr, $data:expr, $args:expr, $env:expr ) => {{
531 use $crate::pipelines::pipeline_types::RUMCommand;
532
533 RUMCommand::new($path, &Some($data), $args, $env)
534 }};
535 ( $path:expr, $data:expr, $args:expr ) => {{
536 use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandEnv};
537
538 RUMCommand::new($path, &Some($data), $args, &RUMCommandEnv::default())
539 }};
540 ( $path:expr, $data:expr ) => {{
541 use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
542
543 RUMCommand::new(
544 $path,
545 &Some($data),
546 &RUMCommandArgs::default(),
547 &RUMCommandEnv::default(),
548 )
549 }};
550 ( $path:expr ) => {{
551 use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
552 use $crate::types::RUMBuffer;
553
554 RUMCommand::new(
555 $path,
556 &None,
557 &RUMCommandArgs::default(),
558 &RUMCommandEnv::default(),
559 )
560 }};
561 }
562
563 ///
564 /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
565 /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
566 /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
567 ///
568 /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
569 /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
570 ///
571 /// ## Example
572 ///
573 /// ```
574 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
575 /// use rumtk_core::core::RUMResult;
576 /// use rumtk_core::strings::RUMStringConversions;
577 /// use rumtk_core::types::RUMBuffer;
578 ///
579 /// let f = || -> RUMResult<()> {
580 /// let result = rumtk_pipeline_run!(
581 /// rumtk_pipeline_command!("ls", RUMBuffer::default()),
582 /// rumtk_pipeline_command!("wc")
583 /// )?;
584 ///
585 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
586 /// Ok(())
587 /// };
588 ///
589 /// f().unwrap();
590 /// ```
591 ///
592 #[macro_export]
593 macro_rules! rumtk_pipeline_run {
594 ( $($command:expr),+ ) => {{
595 use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
596
597 let pipeline = pipeline_generate_pipeline(&vec![
598 $($command),+
599 ])?;
600
601 pipeline_wait_pipeline(pipeline)
602 }};
603 }
604
605 ///
606 /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
607 /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
608 /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
609 ///
610 /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
611 /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
612 ///
613 /// This is the `async` flavor.
614 ///
615 /// ## Example
616 ///
617 /// ```
618 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
619 /// use rumtk_core::core::RUMResult;
620 /// use rumtk_core::strings::RUMStringConversions;
621 /// use rumtk_core::types::RUMBuffer;
622 ///
623 /// let f = async || -> RUMResult<()> {
624 /// let result = rumtk_pipeline_run_async!(
625 /// rumtk_pipeline_command!("ls", RUMBuffer::default()),
626 /// rumtk_pipeline_command!("wc")
627 /// )?;
628 ///
629 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
630 /// Ok(())
631 /// };
632 ///
633 /// let rt = rumtk_init_threads!(&5);
634 /// rumtk_resolve_task!(rt, f()).unwrap();
635 /// ```
636 ///
637 #[macro_export]
638 macro_rules! rumtk_pipeline_run_async {
639 ( $($command:expr),+ ) => {{
640 use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
641
642 let pipeline = pipeline_generate_pipeline(&vec![
643 $($command),+
644 ])?;
645
646 pipeline_await_pipeline(pipeline).await
647 }};
648 }
649}