rumtk_core/pipelines.rs
1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025 Luis M. Santos, M.D. <lsantos@medicalmasses.com>
5 * Copyright (C) 2025 MedicalMasses L.L.C. <contact@medicalmasses.com>
6 *
7 * This program is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 */
20
21pub mod pipeline_types {
22 use crate::base::{RUMResult, RUMVec};
23 use crate::serde::json::{RUMDeJson, RUMSerJson};
24 use crate::strings::{RUMString, RUMStringConversions};
25 use crate::types::{RUMBuffer, RUMHashMap};
26 use std::process::{Child, Command};
27
28 pub static EMPTY_COMMAND_LINE: RUMCommandLine = RUMCommandLine::new();
29
30 pub type RUMCommandArgs = Vec<RUMString>;
31 pub type RUMCommandEnv = RUMHashMap<RUMString, RUMString>;
32 #[derive(RUMSerJson, RUMDeJson, PartialEq, Default, Debug, Clone)]
33 pub struct RUMCommand {
34 pub path: RUMString,
35 #[serde(skip)]
36 pub data: Option<RUMBuffer>,
37 pub args: RUMCommandArgs,
38 pub env: RUMCommandEnv,
39 }
40
41 impl RUMCommand {
42 pub fn new(
43 prog: &str,
44 data: &Option<RUMBuffer>,
45 args: &RUMCommandArgs,
46 env: &RUMCommandEnv,
47 ) -> Self {
48 RUMCommand {
49 path: prog.to_string(),
50 args: args.clone(),
51 env: env.clone(),
52 data: data.clone(),
53 }
54 }
55 }
56
57 pub type RUMCommandLine = RUMVec<RUMCommand>;
58 pub type RUMPipelineCommand = Command;
59 pub type RUMPipelineProcess = Child;
60 pub type RUMPipeline = RUMVec<RUMPipelineProcess>;
61 pub type RUMPipelineResult = RUMResult<RUMBuffer>;
62}
63
64pub mod pipeline_functions {
65 use super::pipeline_types::*;
66 use crate::base::RUMResult;
67 use crate::strings::{rumtk_format, string_format, RUMArrayConversions, RUMString, StringReplacementPair};
68 use std::io::{Read, Write};
69 use std::os::unix::ffi::OsStrExt;
70
71 use crate::rumtk_resolve_sync_task;
72 use crate::types::RUMBuffer;
73 use std::process::{Command, Stdio};
74
// NOTE(review): neither constant below is referenced by any function visible in this module.
// The names suggest a polling interval and a read chunk size; units are not established here,
// so confirm before relying on them.
const DEFAULT_PROCESS_ASYNC_WAIT: f32 = 0.001;
const DEFAULT_STDOUT_CHUNK_SIZE: usize = 1024;
77
78 ///
79 /// Given a command of type [RUMCommand](RUMCommand), generate a command instance the Rust
80 /// runtime can use to spawn a process.
81 ///
82 /// ## Example
83 ///
84 /// ```
85 /// use std::any::{Any, TypeId};
86 ///
87 /// use rumtk_core::strings::RUMString;
88 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
89 /// use rumtk_core::pipelines::pipeline_functions::pipeline_create_command;
90 ///
91 /// let command_name = "ls";
92 /// let mut command = RUMCommand::default();
93 /// command.path = RUMString::from(command_name);
94 ///
95 /// let sys_command = pipeline_create_command(&command);
96 ///
97 /// assert_eq!(sys_command.get_program().to_str().unwrap(), command_name, "");
98 ///
99 /// ```
100 ///
101 pub fn pipeline_create_command(command: &RUMCommand) -> RUMPipelineCommand {
102 let mut cmd = Command::new(command.path.as_str());
103
104 for arg in command.args.iter() {
105 cmd.arg(arg);
106 }
107
108 cmd.envs(command.env.iter());
109
110 cmd.stdin(Stdio::piped())
111 .stdout(Stdio::piped())
112 .stderr(Stdio::piped());
113
114 cmd
115 }
116
117 ///
118 /// Spawns a process out of the [RUMPipelineCommand](RUMPipelineCommand).
119 ///
120 /// ## Example
121 ///
122 /// ```
123 /// use std::any::{Any, TypeId};
124 ///
125 /// use rumtk_core::strings::RUMString;
126 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
127 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_spawn_process};
128 ///
129 /// let command_name = "ls";
130 /// let mut command = RUMCommand::default();
131 /// command.path = RUMString::from(command_name);
132 ///
133 /// let mut sys_command = pipeline_create_command(&command);
134 ///
135 /// let mut process = pipeline_spawn_process(&mut sys_command).unwrap();
136 ///
137 /// process.wait();
138 /// ```
139 ///
140 pub fn pipeline_spawn_process(cmd: &mut RUMPipelineCommand) -> RUMResult<RUMPipelineProcess> {
141 match cmd.spawn() {
142 Ok(process) => {
143 println!("Spawned process {} => {} with args {:?}", process.id(), cmd.get_program().as_bytes().to_string()?, cmd.get_args());
144 Ok(process)
145 },
146 Err(e) => Err(rumtk_format!(
147 "Failed to spawn process {:?} because => {}",
148 cmd.get_program(),
149 e
150 )),
151 }
152 }
153
154 ///
155 /// Given a process of type [RUMPipelineProcess](RUMPipelineProcess) and a rhs of type [RUMPipelineCommand](RUMPipelineCommand),
156 /// create a pipe of the lhs's `stdout` into the next command descriptor which is the rhs.
157 ///
158 /// ## Example
159 ///
160 /// ```
161 /// use std::any::{Any, TypeId};
162 /// use std::process::Stdio;
163 ///
164 /// use rumtk_core::strings::RUMString;
165 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
166 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_pipe_processes, pipeline_spawn_process};
167 ///
168 /// let ls_name = "ls";
169 /// let mut ls_command = RUMCommand::default();
170 /// ls_command.path = RUMString::from(ls_name);
171 /// let mut sys_ls_command = pipeline_create_command(&ls_command);
172 ///
173 /// let wc_name = "wc";
174 /// let mut wc_command = RUMCommand::default();
175 /// wc_command.path = RUMString::from(wc_name);
176 /// let mut sys_wc_command = pipeline_create_command(&wc_command);
177 ///
178 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
179 /// pipeline_pipe_processes(&mut sys_ls_process, &mut sys_wc_command).unwrap();
180 /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
181 ///
182 /// sys_ls_process.wait();
183 /// sys_wc_process.wait();
184 /// ```
185 ///
186 pub fn pipeline_pipe_processes(
187 process: &mut RUMPipelineProcess,
188 piped: &mut RUMPipelineCommand,
189 ) -> RUMResult<()> {
190 let process_stdout = Stdio::from(match process.stdout.take() {
191 Some(stdout) => stdout,
192 None => {
193 return Err(rumtk_format!(
194 "No stdout handle found for process {}.",
195 process.id()
196 ));
197 }
198 });
199 let _ = piped.stdin(process_stdout);
200 Ok(())
201 }
202
203 ///
204 /// Retrieves the standard output generated by the completed process.
205 ///
206 /// ## Example
207 ///
208 /// ```
209 /// use std::any::{Any, TypeId};
210 /// use std::process::Stdio;
211 ///
212 /// use rumtk_core::strings::RUMString;
213 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
214 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_spawn_process, pipeline_get_stdout, pipeline_pipe_processes};
215 ///
216 /// let ls_name = "ls";
217 /// let mut ls_command = RUMCommand::default();
218 /// ls_command.path = RUMString::from(ls_name);
219 /// let mut sys_ls_command = pipeline_create_command(&ls_command);
220 ///
221 /// let wc_name = "wc";
222 /// let mut wc_command = RUMCommand::default();
223 /// wc_command.path = RUMString::from(wc_name);
224 /// let mut sys_wc_command = pipeline_create_command(&wc_command);
225 ///
226 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
227 /// pipeline_pipe_processes(&mut sys_ls_process, &mut sys_wc_command).unwrap();
228 /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
229 ///
230 /// sys_ls_process.wait();
231 /// sys_wc_process.wait();
232 ///
233 /// let out_data = pipeline_get_stdout(sys_wc_process).unwrap();
234 ///
235 /// assert_eq!(out_data.is_empty(), false, "No output detected... {:?}", &out_data);
236 /// ```
237 ///
238 pub fn pipeline_get_stdout(mut process: RUMPipelineProcess) -> RUMResult<RUMBuffer> {
239 match process.wait_with_output() {
240 Ok(stdout) => Ok(RUMBuffer::from(stdout.stdout.clone())),
241 Err(e) => Err(rumtk_format!(
242 "Issue reading last process output because => {}",
243 e
244 )),
245 }
246 }
247
248 ///
/// Closes the standard input (`stdin`) handle of the process. Useful to trigger a resolution of the pipeline.
250 ///
251 /// ## Example
252 ///
253 /// ```
254 /// use rumtk_core::pipelines::pipeline_functions::pipeline_close_process_stdin;
255 /// use rumtk_core::strings::RUMString;
256 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
257 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_pipe_into_process, pipeline_spawn_process};
258 /// use rumtk_core::types::RUMBuffer;
259 ///
260 /// let ls_name = "ls";
261 /// let mut ls_command = RUMCommand::default();
262 /// ls_command.path = RUMString::from(ls_name);
263 /// let mut sys_ls_command = pipeline_create_command(&ls_command);
264 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
265 ///
266 /// pipeline_close_process_stdin(&mut sys_ls_process);
267 ///
268 ///
269 /// ```
270 ///
271 pub fn pipeline_close_process_stdin(process: &mut RUMPipelineProcess) {
272 // Do not change into an expect() or such unwrap. We just want to ignore and assume stdin is closed.
273 match process.stdin.take() {
274 Some(stdin) => {
275 drop(stdin);
276 }
277 None => {}
278 };
279 }
280
281 ///
282 /// Pipe data into a process.
283 ///
284 /// ## Example
285 ///
286 /// ```
287 /// use rumtk_core::strings::RUMString;
288 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
289 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_pipe_into_process, pipeline_spawn_process};
290 /// use rumtk_core::types::RUMBuffer;
291 ///
292 /// let ls_name = "ls";
293 /// let mut ls_command = RUMCommand::default();
294 /// ls_command.path = RUMString::from(ls_name);
295 /// let mut sys_ls_command = pipeline_create_command(&ls_command);
296 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
297 /// pipeline_pipe_into_process(&mut sys_ls_process, &RUMBuffer::default()).unwrap();
298 ///
299 /// let out = sys_ls_process.wait_with_output().unwrap();
300 ///
301 /// assert_eq!(out.stdout.is_empty(), false, "Piped command returned an empty buffer? => {:?}", String::from_utf8_lossy(out.stdout.as_slice()))
302 /// ```
303 ///
304 pub fn pipeline_pipe_into_process(
305 process: &mut RUMPipelineProcess,
306 data: &RUMBuffer,
307 ) -> RUMResult<()> {
308 match process.stdin.take() {
309 Some(ref mut stdin) => match stdin.write_all(data.as_slice()) {
310 Ok(_) => {}
311 Err(e) => {
312 return Err(rumtk_format!(
313 "Failed to pipe data to stdin of process {} because => {}",
314 process.id(),
315 e
316 ))
317 }
318 },
319 None => {}
320 }
321 Ok(())
322 }
323
324 ///
325 /// Builds an executable pipeline out of a list of [RUMCommand](RUMCommand).
326 ///
327 /// ## Example
328 ///
329 /// ```
330 /// use rumtk_core::strings::{RUMStringConversions, RUMString};
331 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
332 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_get_stdout, pipeline_wait_pipeline};
333 /// use rumtk_core::types::RUMBuffer;
334 ///
335 /// let ls_name = "ls";
336 /// let mut ls_command = RUMCommand::default();
337 /// ls_command.path = RUMString::from(ls_name);
338 /// ls_command.args = vec!["-la".to_string()];
339 ///
340 /// let mut process = pipeline_generate_command(&ls_command, &RUMBuffer::default()).unwrap();
341 /// let result = pipeline_get_stdout(process).unwrap();
342 ///
343 /// assert!(result.len() > 0, "Command generation failed to generate a valid command!");
344 /// ```
345 ///
346 pub fn pipeline_generate_command(command: &RUMCommand, data: &RUMBuffer) -> RUMResult<RUMPipelineProcess> {
347 let mut root = pipeline_create_command(&command);
348 let mut process = pipeline_spawn_process(&mut root)?;
349 pipeline_pipe_into_process(&mut process, data)?;
350 pipeline_close_process_stdin(&mut process);
351
352 Ok(process)
353 }
354
355 ///
356 /// Add buffer at the beginning of pipeline to pipe in. This buffer serves as the initial input of
357 /// a pipe aware program.
358 ///
359 /// ## Example
360 /// ```
361 /// use rumtk_core::base::RUMResult;
362 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMCommandLine};
363 /// use rumtk_core::rumtk_pipeline_run;
364 /// use rumtk_core::strings::{RUMString};
365 /// use rumtk_core::buffers::{buffer_to_string};
366 /// use rumtk_core::types::RUMBuffer;
367 ///
368 /// let data = RUMBuffer::from_static(b"Hello World");
369 /// let wc_name = "wc";
370 /// let mut wc_command = RUMCommand::default();
371 /// wc_command.path = RUMString::from(wc_name);
372 /// let mut pipeline = vec![
373 /// wc_command
374 /// ];
375 ///
376 /// let processor = || -> RUMResult<RUMBuffer> {rumtk_pipeline_run!(&pipeline, &data)};
377 /// let result_string = buffer_to_string(&processor().unwrap()).unwrap();
378 /// let binding = result_string.as_str().replace('\n', "");
379 /// let result_items: Vec<&str> = binding.split(" ").collect();
380 /// let result = result_items.get(2).unwrap().trim().parse::<i32>().unwrap();
381 ///
382 /// assert_eq!(result, 2, "Data was not piped properly!");
383 /// ```
384 ///
385 pub fn pipeline_add_stdin_data_to_pipeline<'a>(pipeline: &'a mut RUMCommandLine, data: &RUMBuffer) -> &'a RUMCommandLine {
386 match pipeline.get_mut(0) {
387 Some(command) => command.data = Some(data.clone()),
388 None => {
389 return pipeline;
390 }
391 };
392
393 pipeline
394 }
395
396 ///
397 /// Flatten the [RUMCommandLine] structure into a single string representing the pipeline and
398 /// print it or log it.
399 ///
400 fn print_pipeline(pipeline: &RUMCommandLine) {
401 let mut pipeline_components = Vec::<RUMString>::with_capacity(pipeline.len());
402
403 for pipe in pipeline.iter() {
404 pipeline_components.push(rumtk_format!("{} {}", pipe.path, pipe.args.clone().join(" ")));
405 }
406
407 println!("Executing {}", pipeline_components.join(" | "));
408 }
409
410 pub fn pipeline_patch_command_args<'a>(cmd: &'a mut RUMCommand, replacements: &StringReplacementPair) -> RUMResult<&'a RUMCommand> {
411 let mut new_args = RUMCommandArgs::with_capacity(cmd.args.len());
412
413 for arg in cmd.args.iter() {
414 new_args.push(string_format(arg, replacements));
415 }
416
417 cmd.args = new_args;
418
419 Ok(cmd)
420 }
421
422 ///
/// Patches the arguments of every command in the pipeline with the pattern=replacement pairs!
424 ///
425 /// ## Example
426 /// ```
427 /// use rumtk_core::base::RUMResult;
428 /// use rumtk_core::pipelines::pipeline_functions::pipeline_patch_args;
429 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMCommandLine};
430 /// use rumtk_core::rumtk_pipeline_run;
431 /// use rumtk_core::strings::{RUMString};
432 /// use rumtk_core::buffers::{buffer_to_string};
433 /// use rumtk_core::types::RUMBuffer;
434 ///
435 /// let ls_name = "ls";
436 /// let mut ls_command = RUMCommand::default();
437 /// ls_command.path = RUMString::from(ls_name);
438 /// ls_command.args.push(RUMString::from("{options}"));
439 /// let mut pipeline = vec![
440 /// ls_command
441 /// ];
442 /// pipeline_patch_args(&mut pipeline, &[("{options}", "-la")]);
443 ///
444 /// let processor = || -> RUMResult<RUMBuffer> {rumtk_pipeline_run!(&pipeline)};
445 /// let result_string = buffer_to_string(&processor().unwrap()).unwrap();
446 /// let results: Vec<&str> = result_string.as_str().split("\n").collect();
447 /// let dot_dir = results.get(1).unwrap().chars().last().unwrap();
448 ///
449 /// assert_eq!(dot_dir, '.', "Incorrect options passed!");
450 /// ```
451 ///
452 pub fn pipeline_patch_args<'a>(pipeline: &'a mut RUMCommandLine, replacements: &StringReplacementPair) -> RUMResult<&'a RUMCommandLine> {
453 for mut cmd in pipeline.iter_mut() {
454 pipeline_patch_command_args(&mut cmd, replacements)?;
455 }
456 Ok(pipeline)
457 }
458
459 ///
/// Awaits the execution of the pipeline in an async-friendly manner. Once the pipeline execution ends,
461 /// consume the pipeline and return the output.
462 ///
463 /// ## Example
464 ///
465 /// ```
466 /// use rumtk_core::strings::RUMString;
467 /// use rumtk_core::types::RUMBuffer;
468 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
469 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_await_pipeline};
470 /// use rumtk_core::{rumtk_resolve_task};
471 ///
472 /// let ls_name = "ls";
473 /// let mut ls_command = RUMCommand::default();
474 /// ls_command.path = RUMString::from(ls_name);
475 ///
476 /// let wc_name = "wc";
477 /// let mut wc_command = RUMCommand::default();
478 /// wc_command.path = RUMString::from(wc_name);
479 ///
480 /// let pipeline = vec![
481 /// ls_command,
482 /// wc_command
483 /// ];
484 ///
485 /// let result = rumtk_resolve_task!(pipeline_await_pipeline(&pipeline, &RUMBuffer::default())).unwrap();
486 ///
487 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
488 /// ```
489 ///
490 pub async fn pipeline_await_pipeline(pipeline: &RUMCommandLine, initial_data: &RUMBuffer) -> RUMPipelineResult {
491 let pipeline_copy = pipeline.clone();
492 let data_copy = initial_data.clone();
493 rumtk_resolve_sync_task!(move || {
494 pipeline_wait_pipeline(&pipeline_copy, &data_copy)
495 })
496 }
497
498 ///
/// Waits for the pipeline to complete execution. Once the pipeline execution ends,
500 /// consume the pipeline and return the output.
501 ///
502 /// ## Example
503 ///
504 /// ```
505 /// use rumtk_core::strings::RUMString;
506 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
507 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_wait_pipeline};
508 /// use rumtk_core::types::RUMBuffer;
509 ///
510 /// let ls_name = "ls";
511 /// let mut ls_command = RUMCommand::default();
512 /// ls_command.path = RUMString::from(ls_name);
513 ///
514 /// let wc_name = "wc";
515 /// let mut wc_command = RUMCommand::default();
516 /// wc_command.path = RUMString::from(wc_name);
517 ///
518 /// let pipeline = vec![
519 /// ls_command,
520 /// wc_command
521 /// ];
522 ///
523 /// let result = pipeline_wait_pipeline(&pipeline, &RUMBuffer::default()).unwrap();
524 ///
525 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
526 /// ```
527 ///
528 pub fn pipeline_wait_pipeline(pipeline: &RUMCommandLine, initial_data: &RUMBuffer) -> RUMPipelineResult {
529 let mut last_data = initial_data.clone();
530
531 // Print pipeline
532 print_pipeline(&pipeline);
533
534 // Now let's visit each process and await their completion!
535 for c in pipeline.iter() {
536 let mut p = pipeline_generate_command(&c, &last_data)?;
537 pipeline_close_process_stdin(&mut p);
538 println!("Waiting on {}", p.id());
539 last_data = pipeline_get_stdout(p)?;
540 }
541
542 Ok(last_data)
543 }
544}
545
546pub mod pipeline_macros {
547 ///
/// Creates a pipeline command out of the provided parameters. Parameters include `path`, `data`, `args`,
/// and `env`. The command has type [RUMCommand](super::pipeline_types::RUMCommand).
550 ///
551 /// `env` is a map of type [RUMCommandEnv](super::pipeline_types::RUMCommandEnv) containing a set of
552 /// key value pair strings that we can use to update the process environment.
553 ///
554 /// ## Example
555 ///
556 /// ### Program Only
557 ///
558 /// ```
559 /// use rumtk_core::rumtk_pipeline_command;
560 ///
561 /// let command = rumtk_pipeline_command!("ls");
562 /// ```
563 ///
564 /// ### Program with Piped Data
565 ///
566 /// ```
567 /// use rumtk_core::rumtk_pipeline_command;
568 /// use rumtk_core::types::RUMBuffer;
569 /// use rumtk_core::strings::RUMStringConversions;
570 ///
571 /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default());
572 /// ```
573 ///
574 /// ### Program with Args
575 ///
576 /// ```
577 /// use rumtk_core::rumtk_pipeline_command;
578 /// use rumtk_core::types::RUMBuffer;
579 /// use rumtk_core::strings::RUMStringConversions;
580 ///
581 /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default(), &vec![
582 /// "-l".to_string()
583 /// ]);
584 /// ```
585 ///
/// ## Notes
///
/// Arms are selected by argument count: `(path)`, `(path, data)`, `(path, data, args)` and
/// `(path, data, args, env)`. Omitted `args` and `env` fall back to their `Default` values.
/// The single argument form stores no data (`None`) in the command.
///
#[macro_export]
macro_rules! rumtk_pipeline_command {
    ( $path:expr, $data:expr, $args:expr, $env:expr ) => {{
        use $crate::pipelines::pipeline_types::RUMCommand;

        RUMCommand::new($path, &Some($data), $args, $env)
    }};
    ( $path:expr, $data:expr, $args:expr ) => {{
        use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandEnv};

        RUMCommand::new($path, &Some($data), $args, &RUMCommandEnv::default())
    }};
    ( $path:expr, $data:expr ) => {{
        use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};

        RUMCommand::new(
            $path,
            &Some($data),
            &RUMCommandArgs::default(),
            &RUMCommandEnv::default(),
        )
    }};
    ( $path:expr ) => {{
        // No buffer type is imported here: this arm passes `None` and never names it.
        use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};

        RUMCommand::new(
            $path,
            &None,
            &RUMCommandArgs::default(),
            &RUMCommandEnv::default(),
        )
    }};
}
620
621 ///
622 /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
623 /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
624 /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
625 ///
626 /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
627 /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
628 ///
629 /// ## Example
630 ///
631 /// ### Simple
632 ///
633 /// ```
634 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_quick_run, rumtk_resolve_task, rumtk_init_threads};
635 /// use rumtk_core::base::{RUMResult};
636 /// use rumtk_core::strings::RUMStringConversions;
637 /// use rumtk_core::types::RUMBuffer;
638 ///
639 /// let f = async || -> RUMResult<()> {
640 /// let result = rumtk_pipeline_quick_run!(
641 /// rumtk_pipeline_command!("ls"),
642 /// rumtk_pipeline_command!("wc")
643 /// ).unwrap();
644 ///
645 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
646 /// Ok(())
647 /// };
648 ///
649 /// rumtk_resolve_task!(f()).unwrap();
650 /// ```
651 ///
/// ## Notes
///
/// The commands are collected into a `vec!` in the order given and executed with an empty
/// (`Default`) initial buffer. The macro evaluates to the result of
/// [pipeline_wait_pipeline](crate::pipelines::pipeline_functions::pipeline_wait_pipeline).
///
#[macro_export]
macro_rules! rumtk_pipeline_quick_run {
    ( $($command:expr),+ ) => {{
        let commands = vec![$($command),+];

        $crate::pipelines::pipeline_functions::pipeline_wait_pipeline(
            &commands,
            &$crate::types::RUMBuffer::default(),
        )
    }};
}
663
664 ///
665 /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
666 /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
667 /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
668 ///
669 /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
670 /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
671 ///
672 /// This is the `async` flavor.
673 ///
674 /// ## Example
675 ///
676 /// ### Simple
677 ///
678 /// ```
679 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_quick_run_async, rumtk_resolve_task, rumtk_init_threads};
680 /// use rumtk_core::base::{RUMResult};
681 /// use rumtk_core::strings::RUMStringConversions;
682 /// use rumtk_core::types::RUMBuffer;
683 ///
684 /// let f = async || -> RUMResult<()> {
685 /// let result = rumtk_pipeline_quick_run_async!(
686 /// rumtk_pipeline_command!("ls"),
687 /// rumtk_pipeline_command!("wc")
688 /// ).await?;
689 ///
690 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
691 /// Ok(())
692 /// };
693 ///
694 /// rumtk_resolve_task!(f()).unwrap();
695 /// ```
696 ///
/// ## Notes
///
/// The macro evaluates to a future that owns the pipeline it was built from. The future can be
/// awaited directly or stored in a variable and awaited later. The commands themselves are
/// evaluated immediately, at the macro call site.
///
#[macro_export]
macro_rules! rumtk_pipeline_quick_run_async {
    ( $($command:expr),+ ) => {{
        use $crate::types::RUMBuffer;
        use $crate::pipelines::pipeline_functions::{pipeline_await_pipeline};

        let pipeline = vec![$($command),+];

        // Move the pipeline into the future instead of borrowing clones that only live as
        // temporaries of this block; the result then does not depend on temporary lifetime
        // rules at the call site.
        async move {
            let initial_data = RUMBuffer::default();
            pipeline_await_pipeline(&pipeline, &initial_data).await
        }
    }};
}
708
709 ///
710 /// This macro is similar to [rumtk_pipeline_quick_run](crate::rumtk_pipeline_quick_run). The difference here
/// is that this macro takes a pipeline structure ([RUMCommandLine](crate::pipelines::pipeline_types::RUMCommandLine)).
/// In other words, this macro simply runs an already defined pipeline.
713 ///
714 /// ## Example
715 ///
716 /// ### Run the pipeline
717 /// ```
718 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
719 /// use rumtk_core::base::{RUMResult};
720 /// use rumtk_core::strings::RUMStringConversions;
721 /// use rumtk_core::types::RUMBuffer;
722 ///
723 /// let f = || -> RUMResult<RUMBuffer> {
724 /// let pipeline = vec![
725 /// rumtk_pipeline_command!("ls"),
726 /// rumtk_pipeline_command!("wc")
727 /// ];
728 ///
729 /// rumtk_pipeline_run!(&pipeline)
730 /// };
731 ///
732 /// f().unwrap();
733 /// ```
734 ///
735 /// ### Pipe Buffer to Pipeline
736 ///
737 /// ```
738 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run};
739 /// use rumtk_core::base::RUMResult;
740 /// use rumtk_core::types::RUMBuffer;
741 /// use rumtk_core::strings::{string_to_buffer};
742 /// use rumtk_core::buffers::{buffer_to_string};
743 ///
744 /// const data: &str = "Hello World!";
745 /// const expected: &str = " 0 2 12\n";
746 ///
747 ///
748 /// let f = |input: &RUMBuffer| -> RUMResult<RUMBuffer> {
749 /// let mut pipeline = vec![
750 /// rumtk_pipeline_command!("wc")
751 /// ];
752 ///
753 /// rumtk_pipeline_run!(&pipeline, &input)
754 /// };
755 /// let result = buffer_to_string(&f(&string_to_buffer(data)).unwrap()).unwrap();
756 ///
757 /// assert_eq!(result, expected, "Buffer correctly piped into pipeline!");
758 /// ```
759 ///
760 /// ### Pipe String to Pipeline
761 ///
762 /// ```
763 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run};
764 /// use rumtk_core::base::RUMResult;
765 /// use rumtk_core::strings::{string_to_buffer};
766 /// use rumtk_core::buffers::{buffer_to_string};
767 /// use rumtk_core::types::RUMBuffer;
768 ///
769 /// const data: &str = "Hello World!";
770 /// const expected: &str = " 0 2 12\n";
771 ///
772 ///
773 /// let f = |input: &str| -> RUMResult<RUMBuffer> {
774 /// let mut pipeline = vec![
775 /// rumtk_pipeline_command!("wc")
776 /// ];
777 ///
778 /// rumtk_pipeline_run!(&pipeline, &string_to_buffer(input))
779 /// };
780 /// let result = buffer_to_string(&f(data).unwrap()).unwrap();
781 ///
782 /// assert_eq!(result, expected, "String correctly piped into pipeline!");
783 /// ```
784 ///
/// ## Notes
///
/// The single argument form runs the pipeline with an empty (`Default`) initial buffer. The
/// two argument form passes the given buffer reference as the initial input.
///
#[macro_export]
macro_rules! rumtk_pipeline_run {
    ( $pipeline:expr ) => {{
        use $crate::types::RUMBuffer;

        // Qualify the recursive call with `$crate` so it resolves even when the caller invoked
        // this macro by path without importing it by name.
        $crate::rumtk_pipeline_run!($pipeline, &RUMBuffer::default())
    }};
    ( $pipeline:expr, $data:expr ) => {{
        use $crate::pipelines::pipeline_functions::{pipeline_wait_pipeline};

        pipeline_wait_pipeline($pipeline, $data)
    }};
}
798
799
800 ///
801 /// This macro is similar to [rumtk_pipeline_quick_run_async](crate::rumtk_pipeline_quick_run_async). The difference here
802 /// is that the function takes a pipeline structure ([RUMCommandLine](crate::pipelines::pipeline_types::RUMCommandLine))
803 /// In other words, this macro simply runs an already defined pipeline.
804 ///
805 /// ## Example
806 /// ```
807 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
808 /// use rumtk_core::base::{RUMResult};
809 /// use rumtk_core::strings::RUMStringConversions;
810 /// use rumtk_core::types::RUMBuffer;
811 ///
812 /// let f = async || -> RUMResult<RUMBuffer> {
813 /// let pipeline = vec![
814 /// rumtk_pipeline_command!("ls"),
815 /// rumtk_pipeline_command!("wc")
816 /// ];
817 ///
818 /// rumtk_pipeline_run_async!(&pipeline).await
819 /// };
820 ///
821 /// rumtk_resolve_task!(f()).unwrap();
822 /// ```
823 ///
824 /// ### Pipe Buffer to Pipeline
825 ///
826 /// ```
827 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_spawn_task};
828 /// use rumtk_core::base::RUMResult;
829 /// use rumtk_core::types::RUMBuffer;
830 /// use rumtk_core::buffers::{buffer_to_string};
831 /// use rumtk_core::strings::{string_to_buffer};
832 ///
833 /// const data: &str = "Hello World!";
834 /// const expected: &str = " 0 2 12\n";
835 ///
836 ///
837 /// let task = rumtk_spawn_task!(async {
838 /// let input = RUMBuffer::from("Hello World!");
839 /// let mut pipeline = vec![
840 /// rumtk_pipeline_command!("wc")
841 /// ];
842 ///
843 /// rumtk_pipeline_run_async!(&pipeline, &input).await
844 /// });
845 ///
846 /// let result = buffer_to_string(&rumtk_resolve_task!(task).unwrap().unwrap()).unwrap();
847 ///
848 /// assert_eq!(result, expected, "Buffer correctly piped into pipeline!");
849 /// ```
850 ///
851 /// ### Pipe String to Pipeline
852 ///
853 /// ```
854 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_spawn_task};
855 /// use rumtk_core::base::RUMResult;
856 /// use rumtk_core::strings::{string_to_buffer};
857 /// use rumtk_core::buffers::{buffer_to_string};
858 /// use rumtk_core::types::RUMBuffer;
859 ///
860 /// const data: &str = "Hello World!";
861 /// const expected: &str = " 0 2 12\n";
862 ///
863 ///
864 /// let task = rumtk_spawn_task!(async {
865 /// let input = string_to_buffer(data);
866 /// let mut pipeline = vec![
867 /// rumtk_pipeline_command!("wc")
868 /// ];
869 ///
870 /// rumtk_pipeline_run_async!(&pipeline, &input).await
871 /// });
872 ///
873 /// let result = buffer_to_string(&rumtk_resolve_task!(task).unwrap().unwrap()).unwrap();
874 ///
875 /// assert_eq!(result, expected, "String correctly piped into pipeline!");
876 /// ```
877 ///
/// ## Notes
///
/// The single argument form runs the pipeline with an empty (`Default`) initial buffer. The
/// two argument form passes the given buffer reference as the initial input. The macro
/// evaluates to the future returned by
/// [pipeline_await_pipeline](crate::pipelines::pipeline_functions::pipeline_await_pipeline).
/// That future borrows the arguments, so await it within the same statement.
///
#[macro_export]
macro_rules! rumtk_pipeline_run_async {
    ( $pipeline:expr ) => {{
        use $crate::types::RUMBuffer;

        // Qualify the recursive call with `$crate` so it resolves even when the caller invoked
        // this macro by path without importing it by name.
        $crate::rumtk_pipeline_run_async!($pipeline, &RUMBuffer::default())
    }};
    ( $pipeline:expr, $data:expr ) => {{
        use $crate::pipelines::pipeline_functions::{pipeline_await_pipeline};

        pipeline_await_pipeline($pipeline, $data)
    }};
}
891
892 ///
893 /// Patch the pipeline's arguments with desired dynamic options.
894 ///
895 /// ## Example
896 /// ```
897 /// use rumtk_core::{rumtk_pipeline_patch_args, rumtk_pipeline_command, rumtk_pipeline_run};
898 /// use rumtk_core::base::RUMResult;
899 /// use rumtk_core::buffers::{buffer_to_string};
900 /// use rumtk_core::types::RUMBuffer;
901 /// use rumtk_core::strings::string_to_buffer;
902 ///
903 ///
904 /// let f = || -> RUMResult<RUMBuffer> {
905 /// let mut pipeline = vec![
906 /// rumtk_pipeline_command!("ls", RUMBuffer::default(), &vec![
907 /// "{options}".into()
908 /// ])
909 /// ];
910 ///
911 /// rumtk_pipeline_patch_args!(&mut pipeline, &[("{options}", "-la")]);
912 ///
913 /// rumtk_pipeline_run!(&pipeline)
914 /// };
915 ///
916 /// let result_string = buffer_to_string(&f().unwrap()).unwrap();
917 /// let results: Vec<&str> = result_string.as_str().split("\n").collect();
918 /// let dot_dir = results.get(1).unwrap().chars().last().unwrap();
919 ///
920 /// assert_eq!(dot_dir, '.', "Incorrect options passed!");
921 /// ```
922 ///
/// ## Notes
///
/// Thin wrapper that forwards both arguments to
/// [pipeline_patch_args](crate::pipelines::pipeline_functions::pipeline_patch_args) and
/// evaluates to its result.
///
#[macro_export]
macro_rules! rumtk_pipeline_patch_args {
    ( $pipeline:expr, $replacements:expr ) => {{
        $crate::pipelines::pipeline_functions::pipeline_patch_args($pipeline, $replacements)
    }};
}
931}