rumtk_core/pipelines.rs
1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2026 Luis M. Santos, M.D.
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20/*
21 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
22 * This toolkit aims to be reliable, simple, performant, and standards compliant.
23 * Copyright (C) 2025 Luis M. Santos, M.D.
24 * Copyright (C) 2025 Nick Stephenson
25 * Copyright (C) 2025 Ethan Dixon
26 * Copyright (C) 2025 MedicalMasses L.L.C.
27 *
28 * This library is free software; you can redistribute it and/or
29 * modify it under the terms of the GNU Lesser General Public
30 * License as published by the Free Software Foundation; either
31 * version 2.1 of the License, or (at your option) any later version.
32 *
33 * This library is distributed in the hope that it will be useful,
34 * but WITHOUT ANY WARRANTY; without even the implied warranty of
35 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
36 * Lesser General Public License for more details.
37 *
38 * You should have received a copy of the GNU Lesser General Public
39 * License along with this library; if not, write to the Free Software
40 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
41 */
42
43pub mod pipeline_types {
44 use crate::strings::{RUMString, RUMStringConversions};
45 use crate::types::{RUMBuffer, RUMHashMap};
46
47 use crate::core::{RUMResult, RUMVec};
48 use std::process::{Child, Command};
49
50 pub type RUMCommandArgs = Vec<RUMString>;
51 pub type RUMCommandEnv = RUMHashMap<RUMString, RUMString>;
52 #[derive(Default, Debug, Clone)]
53 pub struct RUMCommand {
54 pub path: RUMString,
55 pub data: Option<RUMBuffer>,
56 pub args: RUMCommandArgs,
57 pub env: RUMCommandEnv,
58 }
59
60 impl RUMCommand {
61 pub fn new(
62 prog: &str,
63 data: &Option<RUMBuffer>,
64 args: &RUMCommandArgs,
65 env: &RUMCommandEnv,
66 ) -> Self {
67 RUMCommand {
68 path: prog.to_rumstring(),
69 args: args.clone(),
70 env: env.clone(),
71 data: data.clone(),
72 }
73 }
74 }
75
76 pub type RUMCommandLine = RUMVec<RUMCommand>;
77 pub type RUMPipelineCommand = Command;
78 pub type RUMPipelineProcess = Child;
79 pub type RUMPipeline = RUMVec<RUMPipelineProcess>;
80 pub type RUMPipelineResult = RUMResult<RUMBuffer>;
81}
82
83pub mod pipeline_functions {
84 use super::pipeline_types::*;
85 use crate::core::RUMResult;
86 use crate::strings::rumtk_format;
87 use std::io::{Read, Write};
88
89 use crate::threading::threading_functions::async_sleep;
90 use crate::types::RUMBuffer;
91 use std::process::{Command, Stdio};
92
93 const DEFAULT_PROCESS_ASYNC_WAIT: f32 = 0.001;
94 const DEFAULT_STDOUT_CHUNK_SIZE: usize = 1024;
95
96 ///
97 /// Given a command of type [RUMCommand](RUMCommand), generate a command instance the Rust
98 /// runtime can use to spawn a process.
99 ///
100 /// ## Example
101 ///
102 /// ```
103 /// use std::any::{Any, TypeId};
104 ///
105 /// use rumtk_core::strings::RUMString;
106 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
107 /// use rumtk_core::pipelines::pipeline_functions::pipeline_generate_command;
108 ///
109 /// let command_name = "ls";
110 /// let mut command = RUMCommand::default();
111 /// command.path = RUMString::from(command_name);
112 ///
113 /// let sys_command = pipeline_generate_command(&command);
114 ///
115 /// assert_eq!(sys_command.get_program().to_str().unwrap(), command_name, "");
116 ///
117 /// ```
118 ///
119 pub fn pipeline_generate_command(command: &RUMCommand) -> RUMPipelineCommand {
120 let mut cmd = Command::new(command.path.as_str());
121
122 for arg in command.args.iter() {
123 cmd.arg(arg);
124 }
125
126 cmd.envs(command.env.iter());
127
128 cmd.stdin(Stdio::piped())
129 .stdout(Stdio::piped())
130 .stderr(Stdio::piped());
131
132 cmd
133 }
134
135 ///
136 /// Spawns a process out of the [RUMPipelineCommand](RUMPipelineCommand).
137 ///
138 /// ## Example
139 ///
140 /// ```
141 /// use std::any::{Any, TypeId};
142 ///
143 /// use rumtk_core::strings::RUMString;
144 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
145 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_spawn_process};
146 ///
147 /// let command_name = "ls";
148 /// let mut command = RUMCommand::default();
149 /// command.path = RUMString::from(command_name);
150 ///
151 /// let mut sys_command = pipeline_generate_command(&command);
152 ///
153 /// let mut process = pipeline_spawn_process(&mut sys_command).unwrap();
154 ///
155 /// process.wait();
156 /// ```
157 ///
158 pub fn pipeline_spawn_process(cmd: &mut RUMPipelineCommand) -> RUMResult<RUMPipelineProcess> {
159 match cmd.spawn() {
160 Ok(process) => Ok(process),
161 Err(e) => Err(rumtk_format!(
162 "Failed to spawn process {:?} because => {}",
163 cmd.get_program(),
164 e
165 )),
166 }
167 }
168
169 ///
170 /// Given a process of type [RUMPipelineProcess](RUMPipelineProcess) and a rhs of type [RUMPipelineCommand](RUMPipelineCommand),
171 /// create a pipe of the lhs's `stdout` into the next command descriptor which is the rhs.
172 ///
173 /// ## Example
174 ///
175 /// ```
176 /// use std::any::{Any, TypeId};
177 /// use std::process::Stdio;
178 ///
179 /// use rumtk_core::strings::RUMString;
180 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
181 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_processes, pipeline_spawn_process};
182 ///
183 /// let ls_name = "ls";
184 /// let mut ls_command = RUMCommand::default();
185 /// ls_command.path = RUMString::from(ls_name);
186 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
187 ///
188 /// let wc_name = "wc";
189 /// let mut wc_command = RUMCommand::default();
190 /// wc_command.path = RUMString::from(wc_name);
191 /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
192 ///
193 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
194 /// pipeline_pipe_processes(&mut sys_ls_process, &mut sys_wc_command).unwrap();
195 /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
196 ///
197 /// sys_ls_process.wait();
198 /// sys_wc_process.wait();
199 /// ```
200 ///
201 pub fn pipeline_pipe_processes(
202 process: &mut RUMPipelineProcess,
203 piped: &mut RUMPipelineCommand,
204 ) -> RUMResult<()> {
205 let process_stdout = Stdio::from(match process.stdout.take() {
206 Some(stdout) => stdout,
207 None => {
208 return Err(rumtk_format!(
209 "No stdout handle found for process {}.",
210 process.id()
211 ));
212 }
213 });
214 let _ = piped.stdin(process_stdout);
215 Ok(())
216 }
217
218 ///
219 /// Retrieves the standard output generated by the completed process.
220 ///
221 /// ## Example
222 ///
223 /// ```
224 /// use std::any::{Any, TypeId};
225 /// use std::process::Stdio;
226 ///
227 /// use rumtk_core::strings::RUMString;
228 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
229 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_spawn_process, pipeline_get_stdout, pipeline_pipe_processes};
230 ///
231 /// let ls_name = "ls";
232 /// let mut ls_command = RUMCommand::default();
233 /// ls_command.path = RUMString::from(ls_name);
234 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
235 ///
236 /// let wc_name = "wc";
237 /// let mut wc_command = RUMCommand::default();
238 /// wc_command.path = RUMString::from(wc_name);
239 /// let mut sys_wc_command = pipeline_generate_command(&wc_command);
240 ///
241 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
242 /// pipeline_pipe_processes(&mut sys_ls_process, &mut sys_wc_command).unwrap();
243 /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
244 ///
245 /// sys_ls_process.wait();
246 /// sys_wc_process.wait();
247 ///
248 /// let mut pipeline = vec![sys_ls_process, sys_wc_process];
249 ///
250 /// let out_data = pipeline_get_stdout(pipeline).unwrap();
251 ///
252 /// assert_eq!(out_data.is_empty(), false, "No output detected... {:?}", &out_data);
253 /// ```
254 ///
255 pub fn pipeline_get_stdout(mut pipeline: RUMPipeline) -> RUMResult<RUMBuffer> {
256 let mut last_item = pipeline.pop().unwrap();
257 match last_item.wait_with_output() {
258 Ok(stdout) => Ok(RUMBuffer::from(stdout.stdout.clone())),
259 Err(e) => Err(rumtk_format!(
260 "Issue reading last process output because => {}",
261 e
262 )),
263 }
264 }
265
266 ///
267 /// Closes the `stdin` standard in file for process. Useful to trigger a resolution of the pipeline.
268 ///
269 /// ## Example
270 ///
271 /// ```
272 /// use rumtk_core::pipelines::pipeline_functions::pipeline_close_process_stdin;
273 /// use rumtk_core::strings::RUMString;
274 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
275 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_into_process, pipeline_spawn_process};
276 /// use rumtk_core::types::RUMBuffer;
277 ///
278 /// let ls_name = "ls";
279 /// let mut ls_command = RUMCommand::default();
280 /// ls_command.path = RUMString::from(ls_name);
281 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
282 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
283 ///
284 /// pipeline_close_process_stdin(&mut sys_ls_process);
285 ///
286 ///
287 /// ```
288 ///
289 pub fn pipeline_close_process_stdin(process: &mut RUMPipelineProcess) {
290 // Do not change into an expect() or such unwrap. We just want to ignore and assume stdin is closed.
291 match process.stdin.take() {
292 Some(stdin) => {
293 drop(stdin);
294 }
295 None => {}
296 };
297 }
298
299 ///
300 /// Pipe data into a process.
301 ///
302 /// ## Example
303 ///
304 /// ```
305 /// use rumtk_core::strings::RUMString;
306 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
307 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_pipe_into_process, pipeline_spawn_process};
308 /// use rumtk_core::types::RUMBuffer;
309 ///
310 /// let ls_name = "ls";
311 /// let mut ls_command = RUMCommand::default();
312 /// ls_command.path = RUMString::from(ls_name);
313 /// let mut sys_ls_command = pipeline_generate_command(&ls_command);
314 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
315 /// pipeline_pipe_into_process(&mut sys_ls_process, &Some(RUMBuffer::default())).unwrap();
316 ///
317 /// let out = sys_ls_process.wait_with_output().unwrap();
318 ///
319 /// assert_eq!(out.stdout.is_empty(), false, "Piped command returned an empty buffer? => {:?}", String::from_utf8_lossy(out.stdout.as_slice()))
320 /// ```
321 ///
322 pub fn pipeline_pipe_into_process(
323 process: &mut RUMPipelineProcess,
324 data: &Option<RUMBuffer>,
325 ) -> RUMResult<()> {
326 match data {
327 Some(data) => match process.stdin {
328 Some(ref mut stdin) => match stdin.write_all(&data) {
329 Ok(_) => {}
330 Err(e) => {
331 return Err(rumtk_format!(
332 "Failed to pipe data to stdin of process because => {}",
333 e
334 ))
335 }
336 },
337 None => {}
338 },
339 None => {}
340 }
341 Ok(())
342 }
343
344 ///
345 /// Builds an executable pipeline out of a list of [RUMCommand](RUMCommand).
346 ///
347 /// ## Example
348 ///
349 /// ```
350 /// use rumtk_core::strings::RUMString;
351 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
352 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline};
353 ///
354 /// let ls_name = "ls";
355 /// let mut ls_command = RUMCommand::default();
356 /// ls_command.path = RUMString::from(ls_name);
357 ///
358 /// let wc_name = "wc";
359 /// let mut wc_command = RUMCommand::default();
360 /// wc_command.path = RUMString::from(wc_name);
361 ///
362 /// let commands = vec![
363 /// ls_command,
364 /// wc_command
365 /// ];
366 ///
367 /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
368 ///
369 /// assert_eq!(pipeline.len(), commands.len(), "Pipeline generation returned unexpected number of items!");
370 /// ```
371 ///
372 pub fn pipeline_generate_pipeline(commands: &RUMCommandLine) -> RUMResult<RUMPipeline> {
373 let first_command = commands.first().unwrap();
374
375 // Setup pipeline
376 let mut pipeline = vec![];
377
378 //Bootstrap first process in chain
379 let mut root = pipeline_generate_command(&first_command);
380 let mut parent_process = pipeline_spawn_process(&mut root)?;
381 pipeline_pipe_into_process(&mut parent_process, &mut first_command.data.clone())?;
382 pipeline.push(parent_process);
383
384 for cmd in commands.iter().skip(1) {
385 let mut new_root = pipeline_generate_command(cmd);
386 pipeline_pipe_processes(pipeline.last_mut().unwrap(), &mut new_root)?;
387 parent_process = pipeline_spawn_process(&mut new_root)?;
388 pipeline.push(parent_process);
389 }
390
391 Ok(pipeline)
392 }
393
394 ///
395 /// Await for pipeline to execute in a async friendly manner. Once the pipeline execution ends,
396 /// consume the pipeline and return the output.
397 ///
398 /// ## Example
399 ///
400 /// ```
401 /// use rumtk_core::strings::RUMString;
402 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
403 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
404 /// use rumtk_core::{rumtk_resolve_task};
405 ///
406 /// let ls_name = "ls";
407 /// let mut ls_command = RUMCommand::default();
408 /// ls_command.path = RUMString::from(ls_name);
409 ///
410 /// let wc_name = "wc";
411 /// let mut wc_command = RUMCommand::default();
412 /// wc_command.path = RUMString::from(wc_name);
413 ///
414 /// let commands = vec![
415 /// ls_command,
416 /// wc_command
417 /// ];
418 ///
419 /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
420 /// let result = rumtk_resolve_task!(pipeline_await_pipeline(pipeline)).unwrap().unwrap();
421 ///
422 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
423 /// ```
424 ///
425 pub async fn pipeline_await_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
426 // Let's make sure the stdin is closed on the first process to make sure it exits instead of
427 // remain waiting for EOF in the stdin stream.
428 pipeline_close_process_stdin(pipeline.first_mut().unwrap());
429
430 // Now let's visit each process and await their completion!
431 for p in pipeline.iter_mut() {
432 loop {
433 match p.try_wait() {
434 Ok(code) => match code {
435 Some(code) => {
436 if !code.success() {
437 return Err(rumtk_format!(
438 "Process {} exited with non-success code => {}!",
439 p.id(),
440 code
441 ));
442 }
443 break;
444 }
445 None => {
446 async_sleep(DEFAULT_PROCESS_ASYNC_WAIT).await;
447 continue;
448 }
449 },
450 Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
451 };
452 }
453 }
454
455 let result = pipeline_get_stdout(pipeline)?;
456 Ok(result)
457 }
458
459 ///
460 /// Await for pipeline to complete execution. Once the pipeline execution ends,
461 /// consume the pipeline and return the output.
462 ///
463 /// ## Example
464 ///
465 /// ```
466 /// use rumtk_core::strings::RUMString;
467 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
468 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
469 ///
470 /// let ls_name = "ls";
471 /// let mut ls_command = RUMCommand::default();
472 /// ls_command.path = RUMString::from(ls_name);
473 ///
474 /// let wc_name = "wc";
475 /// let mut wc_command = RUMCommand::default();
476 /// wc_command.path = RUMString::from(wc_name);
477 ///
478 /// let commands = vec![
479 /// ls_command,
480 /// wc_command
481 /// ];
482 ///
483 /// let pipeline = pipeline_generate_pipeline(&commands).unwrap();
484 /// let result = pipeline_wait_pipeline(pipeline).unwrap();
485 ///
486 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
487 /// ```
488 ///
489 pub fn pipeline_wait_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
490 // Let's make sure the stdin is closed on the first process to make sure it exits instead of
491 // remain waiting for EOF in the stdin stream.
492 pipeline_close_process_stdin(pipeline.first_mut().unwrap());
493
494 // Now let's visit each process and await their completion!
495 for p in pipeline.iter_mut() {
496 match p.wait() {
497 Ok(code) => {
498 if !code.success() {
499 return Err(rumtk_format!(
500 "Process {} exited with non-success code => {}!",
501 p.id(),
502 code
503 ));
504 }
505 continue;
506 }
507 Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
508 };
509 }
510
511 let result = pipeline_get_stdout(pipeline)?;
512 Ok(result)
513 }
514}
515
516pub mod pipeline_macros {
517 ///
518 /// Creates a pipeline command out of the provided parameters. Parameters include `path`, `args`,
519 /// and `env`. The command has [RUMCommand](super::pipeline_types::RUMCommand).
520 ///
521 /// `env` is a map of type [RUMCommandEnv](super::pipeline_types::RUMCommandEnv) containing a set of
522 /// key value pair strings that we can use to update the process environment.
523 ///
524 /// ## Example
525 ///
526 /// ### Program Only
527 ///
528 /// ```
529 /// use rumtk_core::rumtk_pipeline_command;
530 ///
531 /// let command = rumtk_pipeline_command!("ls");
532 /// ```
533 ///
534 /// ### Program with Piped Data
535 ///
536 /// ```
537 /// use rumtk_core::rumtk_pipeline_command;
538 /// use rumtk_core::types::RUMBuffer;
539 /// use rumtk_core::strings::RUMStringConversions;
540 ///
541 /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default());
542 /// ```
543 ///
544 /// ### Program with Args
545 ///
546 /// ```
547 /// use rumtk_core::rumtk_pipeline_command;
548 /// use rumtk_core::types::RUMBuffer;
549 /// use rumtk_core::strings::RUMStringConversions;
550 ///
551 /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default(), &vec![
552 /// "-l".to_rumstring()
553 /// ]);
554 /// ```
555 ///
556 #[macro_export]
557 macro_rules! rumtk_pipeline_command {
558 ( $path:expr, $data:expr, $args:expr, $env:expr ) => {{
559 use $crate::pipelines::pipeline_types::RUMCommand;
560
561 RUMCommand::new($path, &Some($data), $args, $env)
562 }};
563 ( $path:expr, $data:expr, $args:expr ) => {{
564 use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandEnv};
565
566 RUMCommand::new($path, &Some($data), $args, &RUMCommandEnv::default())
567 }};
568 ( $path:expr, $data:expr ) => {{
569 use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
570
571 RUMCommand::new(
572 $path,
573 &Some($data),
574 &RUMCommandArgs::default(),
575 &RUMCommandEnv::default(),
576 )
577 }};
578 ( $path:expr ) => {{
579 use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
580 use $crate::types::RUMBuffer;
581
582 RUMCommand::new(
583 $path,
584 &None,
585 &RUMCommandArgs::default(),
586 &RUMCommandEnv::default(),
587 )
588 }};
589 }
590
591 ///
592 /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
593 /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
594 /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
595 ///
596 /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
597 /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
598 ///
599 /// ## Example
600 ///
601 /// ### Simple
602 ///
603 /// ```
604 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
605 /// use rumtk_core::core::{RUMResult};
606 /// use rumtk_core::strings::RUMStringConversions;
607 /// use rumtk_core::types::RUMBuffer;
608 ///
609 /// let f = async || -> RUMResult<()> {
610 /// let result = rumtk_pipeline_run!(
611 /// rumtk_pipeline_command!("ls"),
612 /// rumtk_pipeline_command!("wc")
613 /// ).unwrap();
614 ///
615 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
616 /// Ok(())
617 /// };
618 ///
619 /// rumtk_resolve_task!(f()).unwrap();
620 /// ```
621 ///
622 /// ### With Buffer Piped In
623 ///
624 /// ```
625 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
626 /// use rumtk_core::core::{RUMResult, new_random_buffer};
627 /// use rumtk_core::strings::RUMStringConversions;
628 /// use rumtk_core::types::RUMBuffer;
629 ///
630 /// let f = || -> RUMResult<()> {
631 /// let result = rumtk_pipeline_run!(
632 /// rumtk_pipeline_command!("ls", new_random_buffer()),
633 /// rumtk_pipeline_command!("wc")
634 /// )?;
635 ///
636 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
637 /// Ok(())
638 /// };
639 ///
640 /// f().unwrap();
641 /// ```
642 ///
643 #[macro_export]
644 macro_rules! rumtk_pipeline_run {
645 ( $($command:expr),+ ) => {{
646 use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
647
648 let pipeline = pipeline_generate_pipeline(&vec![
649 $($command),+
650 ])?;
651
652 pipeline_wait_pipeline(pipeline)
653 }};
654 }
655
656 ///
657 /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
658 /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
659 /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
660 ///
661 /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
662 /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
663 ///
664 /// This is the `async` flavor.
665 ///
666 /// ## Example
667 ///
668 /// ### Simple
669 ///
670 /// ```
671 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
672 /// use rumtk_core::core::{RUMResult};
673 /// use rumtk_core::strings::RUMStringConversions;
674 /// use rumtk_core::types::RUMBuffer;
675 ///
676 /// let f = async || -> RUMResult<()> {
677 /// let result = rumtk_pipeline_run_async!(
678 /// rumtk_pipeline_command!("ls"),
679 /// rumtk_pipeline_command!("wc")
680 /// ).await?;
681 ///
682 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
683 /// Ok(())
684 /// };
685 ///
686 /// rumtk_resolve_task!(f()).unwrap();
687 /// ```
688 ///
689 /// ### With Buffer Piped In
690 ///
691 /// ```
692 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
693 /// use rumtk_core::core::{RUMResult, new_random_buffer};
694 /// use rumtk_core::strings::RUMStringConversions;
695 /// use rumtk_core::types::RUMBuffer;
696 ///
697 /// let f = async || -> RUMResult<()> {
698 /// let result = rumtk_pipeline_run_async!(
699 /// rumtk_pipeline_command!("wc", new_random_buffer())
700 /// ).await?;
701 ///
702 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
703 /// Ok(())
704 /// };
705 ///
706 /// rumtk_resolve_task!(f()).unwrap();
707 /// ```
708 ///
709 /// ### With Buffer Piped In W/ Return
710 ///
711 /// ```
712 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
713 /// use rumtk_core::core::{RUMResult, new_random_buffer};
714 /// use rumtk_core::strings::{RUMString, RUMStringConversions, RUMArrayConversions};
715 /// use rumtk_core::types::RUMBuffer;
716 ///
717 /// let expected = "1024\n";
718 ///
719 /// let f = async || -> RUMResult<RUMBuffer> {
720 /// let result = rumtk_pipeline_run_async!(
721 /// rumtk_pipeline_command!("wc", new_random_buffer())
722 /// ).await?;
723 ///
724 /// Ok(result)
725 /// };
726 ///
727 /// let result = rumtk_resolve_task!(f()).unwrap().unwrap();
728 /// let string = result.to_vec().to_rumstring();
729 /// let result_buffer_size = string.split(" ").last().unwrap().split(" ").last().unwrap().to_rumstring();
730 ///
731 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
732 /// assert_eq!(&result_buffer_size, expected, "Pipeline returned an unexpected result from command wc! => {:?}\nvs.\n{:?}", &result_buffer_size, &expected);
733 /// ```
734 ///
735 #[macro_export]
736 macro_rules! rumtk_pipeline_run_async {
737 ( $($command:expr),+ ) => {{
738 use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
739
740 let pipeline = pipeline_generate_pipeline(&vec![
741 $($command),+
742 ])?;
743
744 pipeline_await_pipeline(pipeline)
745 }};
746 }
747}