rumtk_core/pipelines.rs
1/*
2 * rumtk attempts to implement HL7 and medical protocols for interoperability in medicine.
3 * This toolkit aims to be reliable, simple, performant, and standards compliant.
4 * Copyright (C) 2025 Luis M. Santos, M.D. <lsantos@medicalmasses.com>
5 * Copyright (C) 2025 MedicalMasses L.L.C. <contact@medicalmasses.com>
6 *
7 * This program is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 */
20
21pub mod pipeline_types {
22 use crate::core::{RUMResult, RUMVec};
23 use crate::strings::{RUMString, RUMStringConversions};
24 use crate::types::{RUMBuffer, RUMDeserialize, RUMHashMap, RUMSerialize};
25 use std::process::{Child, Command};
26
27 pub static EMPTY_COMMAND_LINE: RUMCommandLine = RUMCommandLine::new();
28
29 pub type RUMCommandArgs = Vec<RUMString>;
30 pub type RUMCommandEnv = RUMHashMap<RUMString, RUMString>;
31 #[derive(RUMSerialize, RUMDeserialize, PartialEq, Default, Debug, Clone)]
32 pub struct RUMCommand {
33 pub path: RUMString,
34 #[serde(skip)]
35 pub data: Option<RUMBuffer>,
36 pub args: RUMCommandArgs,
37 pub env: RUMCommandEnv,
38 }
39
40 impl RUMCommand {
41 pub fn new(
42 prog: &str,
43 data: &Option<RUMBuffer>,
44 args: &RUMCommandArgs,
45 env: &RUMCommandEnv,
46 ) -> Self {
47 RUMCommand {
48 path: prog.to_string(),
49 args: args.clone(),
50 env: env.clone(),
51 data: data.clone(),
52 }
53 }
54 }
55
56 pub type RUMCommandLine = RUMVec<RUMCommand>;
57 pub type RUMPipelineCommand = Command;
58 pub type RUMPipelineProcess = Child;
59 pub type RUMPipeline = RUMVec<RUMPipelineProcess>;
60 pub type RUMPipelineResult = RUMResult<RUMBuffer>;
61}
62
63pub mod pipeline_functions {
64 use super::pipeline_types::*;
65 use crate::core::RUMResult;
66 use crate::strings::{rumtk_format, string_format, RUMArrayConversions, RUMString, StringReplacementPair};
67 use std::io::{Read, Write};
68 use std::os::unix::ffi::OsStrExt;
69
70 use crate::rumtk_resolve_sync_task;
71 use crate::types::RUMBuffer;
72 use std::process::{Command, Stdio};
73
// NOTE(review): neither constant below is referenced by the functions visible in this module;
// confirm whether they are still needed.

/// Default wait interval for asynchronous process handling — presumably in seconds; TODO confirm.
const DEFAULT_PROCESS_ASYNC_WAIT: f32 = 0.001;
/// Default chunk size for reading process `stdout` — presumably in bytes; TODO confirm.
const DEFAULT_STDOUT_CHUNK_SIZE: usize = 1024;
76
77 ///
78 /// Given a command of type [RUMCommand](RUMCommand), generate a command instance the Rust
79 /// runtime can use to spawn a process.
80 ///
81 /// ## Example
82 ///
83 /// ```
84 /// use std::any::{Any, TypeId};
85 ///
86 /// use rumtk_core::strings::RUMString;
87 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
88 /// use rumtk_core::pipelines::pipeline_functions::pipeline_create_command;
89 ///
90 /// let command_name = "ls";
91 /// let mut command = RUMCommand::default();
92 /// command.path = RUMString::from(command_name);
93 ///
94 /// let sys_command = pipeline_create_command(&command);
95 ///
96 /// assert_eq!(sys_command.get_program().to_str().unwrap(), command_name, "");
97 ///
98 /// ```
99 ///
100 pub fn pipeline_create_command(command: &RUMCommand) -> RUMPipelineCommand {
101 let mut cmd = Command::new(command.path.as_str());
102
103 for arg in command.args.iter() {
104 cmd.arg(arg);
105 }
106
107 cmd.envs(command.env.iter());
108
109 cmd.stdin(Stdio::piped())
110 .stdout(Stdio::piped())
111 .stderr(Stdio::piped());
112
113 cmd
114 }
115
116 ///
117 /// Spawns a process out of the [RUMPipelineCommand](RUMPipelineCommand).
118 ///
119 /// ## Example
120 ///
121 /// ```
122 /// use std::any::{Any, TypeId};
123 ///
124 /// use rumtk_core::strings::RUMString;
125 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
126 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_spawn_process};
127 ///
128 /// let command_name = "ls";
129 /// let mut command = RUMCommand::default();
130 /// command.path = RUMString::from(command_name);
131 ///
132 /// let mut sys_command = pipeline_create_command(&command);
133 ///
134 /// let mut process = pipeline_spawn_process(&mut sys_command).unwrap();
135 ///
136 /// process.wait();
137 /// ```
138 ///
139 pub fn pipeline_spawn_process(cmd: &mut RUMPipelineCommand) -> RUMResult<RUMPipelineProcess> {
140 match cmd.spawn() {
141 Ok(process) => {
142 println!("Spawned process {} => {} with args {:?}", process.id(), cmd.get_program().as_bytes().to_string()?, cmd.get_args());
143 Ok(process)
144 },
145 Err(e) => Err(rumtk_format!(
146 "Failed to spawn process {:?} because => {}",
147 cmd.get_program(),
148 e
149 )),
150 }
151 }
152
153 ///
154 /// Given a process of type [RUMPipelineProcess](RUMPipelineProcess) and a rhs of type [RUMPipelineCommand](RUMPipelineCommand),
155 /// create a pipe of the lhs's `stdout` into the next command descriptor which is the rhs.
156 ///
157 /// ## Example
158 ///
159 /// ```
160 /// use std::any::{Any, TypeId};
161 /// use std::process::Stdio;
162 ///
163 /// use rumtk_core::strings::RUMString;
164 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
165 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_pipe_processes, pipeline_spawn_process};
166 ///
167 /// let ls_name = "ls";
168 /// let mut ls_command = RUMCommand::default();
169 /// ls_command.path = RUMString::from(ls_name);
170 /// let mut sys_ls_command = pipeline_create_command(&ls_command);
171 ///
172 /// let wc_name = "wc";
173 /// let mut wc_command = RUMCommand::default();
174 /// wc_command.path = RUMString::from(wc_name);
175 /// let mut sys_wc_command = pipeline_create_command(&wc_command);
176 ///
177 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
178 /// pipeline_pipe_processes(&mut sys_ls_process, &mut sys_wc_command).unwrap();
179 /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
180 ///
181 /// sys_ls_process.wait();
182 /// sys_wc_process.wait();
183 /// ```
184 ///
185 pub fn pipeline_pipe_processes(
186 process: &mut RUMPipelineProcess,
187 piped: &mut RUMPipelineCommand,
188 ) -> RUMResult<()> {
189 let process_stdout = Stdio::from(match process.stdout.take() {
190 Some(stdout) => stdout,
191 None => {
192 return Err(rumtk_format!(
193 "No stdout handle found for process {}.",
194 process.id()
195 ));
196 }
197 });
198 let _ = piped.stdin(process_stdout);
199 Ok(())
200 }
201
202 ///
203 /// Retrieves the standard output generated by the completed process.
204 ///
205 /// ## Example
206 ///
207 /// ```
208 /// use std::any::{Any, TypeId};
209 /// use std::process::Stdio;
210 ///
211 /// use rumtk_core::strings::RUMString;
212 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
213 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_spawn_process, pipeline_get_stdout, pipeline_pipe_processes};
214 ///
215 /// let ls_name = "ls";
216 /// let mut ls_command = RUMCommand::default();
217 /// ls_command.path = RUMString::from(ls_name);
218 /// let mut sys_ls_command = pipeline_create_command(&ls_command);
219 ///
220 /// let wc_name = "wc";
221 /// let mut wc_command = RUMCommand::default();
222 /// wc_command.path = RUMString::from(wc_name);
223 /// let mut sys_wc_command = pipeline_create_command(&wc_command);
224 ///
225 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
226 /// pipeline_pipe_processes(&mut sys_ls_process, &mut sys_wc_command).unwrap();
227 /// let mut sys_wc_process = pipeline_spawn_process(&mut sys_wc_command).unwrap();
228 ///
229 /// sys_ls_process.wait();
230 /// sys_wc_process.wait();
231 ///
232 /// let out_data = pipeline_get_stdout(sys_wc_process).unwrap();
233 ///
234 /// assert_eq!(out_data.is_empty(), false, "No output detected... {:?}", &out_data);
235 /// ```
236 ///
237 pub fn pipeline_get_stdout(mut process: RUMPipelineProcess) -> RUMResult<RUMBuffer> {
238 match process.wait_with_output() {
239 Ok(stdout) => Ok(RUMBuffer::from(stdout.stdout.clone())),
240 Err(e) => Err(rumtk_format!(
241 "Issue reading last process output because => {}",
242 e
243 )),
244 }
245 }
246
247 ///
/// Closes the standard input (`stdin`) handle of the process. Useful to signal the end of input and trigger a resolution of the pipeline.
249 ///
250 /// ## Example
251 ///
252 /// ```
253 /// use rumtk_core::pipelines::pipeline_functions::pipeline_close_process_stdin;
254 /// use rumtk_core::strings::RUMString;
255 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
256 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_pipe_into_process, pipeline_spawn_process};
257 /// use rumtk_core::types::RUMBuffer;
258 ///
259 /// let ls_name = "ls";
260 /// let mut ls_command = RUMCommand::default();
261 /// ls_command.path = RUMString::from(ls_name);
262 /// let mut sys_ls_command = pipeline_create_command(&ls_command);
263 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
264 ///
265 /// pipeline_close_process_stdin(&mut sys_ls_process);
266 ///
267 ///
268 /// ```
269 ///
270 pub fn pipeline_close_process_stdin(process: &mut RUMPipelineProcess) {
271 // Do not change into an expect() or such unwrap. We just want to ignore and assume stdin is closed.
272 match process.stdin.take() {
273 Some(stdin) => {
274 drop(stdin);
275 }
276 None => {}
277 };
278 }
279
280 ///
281 /// Pipe data into a process.
282 ///
283 /// ## Example
284 ///
285 /// ```
286 /// use rumtk_core::strings::RUMString;
287 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMPipelineCommand};
288 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_create_command, pipeline_pipe_into_process, pipeline_spawn_process};
289 /// use rumtk_core::types::RUMBuffer;
290 ///
291 /// let ls_name = "ls";
292 /// let mut ls_command = RUMCommand::default();
293 /// ls_command.path = RUMString::from(ls_name);
294 /// let mut sys_ls_command = pipeline_create_command(&ls_command);
295 /// let mut sys_ls_process = pipeline_spawn_process(&mut sys_ls_command).unwrap();
296 /// pipeline_pipe_into_process(&mut sys_ls_process, &RUMBuffer::default()).unwrap();
297 ///
298 /// let out = sys_ls_process.wait_with_output().unwrap();
299 ///
300 /// assert_eq!(out.stdout.is_empty(), false, "Piped command returned an empty buffer? => {:?}", String::from_utf8_lossy(out.stdout.as_slice()))
301 /// ```
302 ///
303 pub fn pipeline_pipe_into_process(
304 process: &mut RUMPipelineProcess,
305 data: &RUMBuffer,
306 ) -> RUMResult<()> {
307 match process.stdin.take() {
308 Some(ref mut stdin) => match stdin.write_all(data.as_slice()) {
309 Ok(_) => {}
310 Err(e) => {
311 return Err(rumtk_format!(
312 "Failed to pipe data to stdin of process {} because => {}",
313 process.id(),
314 e
315 ))
316 }
317 },
318 None => {}
319 }
320 Ok(())
321 }
322
323 ///
/// Spawns a single [RUMCommand](RUMCommand) as a process, pipes `data` into its `stdin`, and then closes `stdin`.
325 ///
326 /// ## Example
327 ///
328 /// ```
329 /// use rumtk_core::strings::{RUMStringConversions, RUMString};
330 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
331 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_generate_command, pipeline_get_stdout, pipeline_wait_pipeline};
332 /// use rumtk_core::types::RUMBuffer;
333 ///
334 /// let ls_name = "ls";
335 /// let mut ls_command = RUMCommand::default();
336 /// ls_command.path = RUMString::from(ls_name);
337 /// ls_command.args = vec!["-la".to_string()];
338 ///
339 /// let mut process = pipeline_generate_command(&ls_command, &RUMBuffer::default()).unwrap();
340 /// let result = pipeline_get_stdout(process).unwrap();
341 ///
342 /// assert!(result.len() > 0, "Command generation failed to generate a valid command!");
343 /// ```
344 ///
345 pub fn pipeline_generate_command(command: &RUMCommand, data: &RUMBuffer) -> RUMResult<RUMPipelineProcess> {
346 let mut root = pipeline_create_command(&command);
347 let mut process = pipeline_spawn_process(&mut root)?;
348 pipeline_pipe_into_process(&mut process, data)?;
349 pipeline_close_process_stdin(&mut process);
350
351 Ok(process)
352 }
353
354 ///
355 /// Add buffer at the beginning of pipeline to pipe in. This buffer serves as the initial input of
356 /// a pipe aware program.
357 ///
358 /// ## Example
359 /// ```
360 /// use rumtk_core::core::RUMResult;
361 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMCommandLine};
362 /// use rumtk_core::rumtk_pipeline_run;
363 /// use rumtk_core::strings::{RUMString};
364 /// use rumtk_core::buffers::{buffer_to_string};
365 /// use rumtk_core::types::RUMBuffer;
366 ///
367 /// let data = RUMBuffer::from_static(b"Hello World");
368 /// let wc_name = "wc";
369 /// let mut wc_command = RUMCommand::default();
370 /// wc_command.path = RUMString::from(wc_name);
371 /// let mut pipeline = vec![
372 /// wc_command
373 /// ];
374 ///
375 /// let processor = || -> RUMResult<RUMBuffer> {rumtk_pipeline_run!(&pipeline, &data)};
376 /// let result_string = buffer_to_string(&processor().unwrap()).unwrap();
377 /// let binding = result_string.as_str().replace('\n', "");
378 /// let result_items: Vec<&str> = binding.split(" ").collect();
379 /// let result = result_items.get(2).unwrap().trim().parse::<i32>().unwrap();
380 ///
381 /// assert_eq!(result, 2, "Data was not piped properly!");
382 /// ```
383 ///
384 pub fn pipeline_add_stdin_data_to_pipeline<'a>(pipeline: &'a mut RUMCommandLine, data: &RUMBuffer) -> &'a RUMCommandLine {
385 match pipeline.get_mut(0) {
386 Some(command) => command.data = Some(data.clone()),
387 None => {
388 return pipeline;
389 }
390 };
391
392 pipeline
393 }
394
395 ///
396 /// Flatten the [RUMCommandLine] structure into a single string representing the pipeline and
397 /// print it or log it.
398 ///
399 fn print_pipeline(pipeline: &RUMCommandLine) {
400 let mut pipeline_components = Vec::<RUMString>::with_capacity(pipeline.len());
401
402 for pipe in pipeline.iter() {
403 pipeline_components.push(rumtk_format!("{} {}", pipe.path, pipe.args.clone().join(" ")));
404 }
405
406 println!("Executing {}", pipeline_components.join(" | "));
407 }
408
409 pub fn pipeline_patch_command_args<'a>(cmd: &'a mut RUMCommand, replacements: &StringReplacementPair) -> RUMResult<&'a RUMCommand> {
410 let mut new_args = RUMCommandArgs::with_capacity(cmd.args.len());
411
412 for arg in cmd.args.iter() {
413 new_args.push(string_format(arg, replacements));
414 }
415
416 cmd.args = new_args;
417
418 Ok(cmd)
419 }
420
421 ///
/// Patches the arguments of every command in the pipeline with the pattern=replacement pairs!
423 ///
424 /// ## Example
425 /// ```
426 /// use rumtk_core::core::RUMResult;
427 /// use rumtk_core::pipelines::pipeline_functions::pipeline_patch_args;
428 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand, RUMCommandLine};
429 /// use rumtk_core::rumtk_pipeline_run;
430 /// use rumtk_core::strings::{RUMString};
431 /// use rumtk_core::buffers::{buffer_to_string};
432 /// use rumtk_core::types::RUMBuffer;
433 ///
434 /// let ls_name = "ls";
435 /// let mut ls_command = RUMCommand::default();
436 /// ls_command.path = RUMString::from(ls_name);
437 /// ls_command.args.push(RUMString::from("{options}"));
438 /// let mut pipeline = vec![
439 /// ls_command
440 /// ];
441 /// pipeline_patch_args(&mut pipeline, &[("{options}", "-la")]);
442 ///
443 /// let processor = || -> RUMResult<RUMBuffer> {rumtk_pipeline_run!(&pipeline)};
444 /// let result_string = buffer_to_string(&processor().unwrap()).unwrap();
445 /// let results: Vec<&str> = result_string.as_str().split("\n").collect();
446 /// let dot_dir = results.get(1).unwrap().chars().last().unwrap();
447 ///
448 /// assert_eq!(dot_dir, '.', "Incorrect options passed!");
449 /// ```
450 ///
451 pub fn pipeline_patch_args<'a>(pipeline: &'a mut RUMCommandLine, replacements: &StringReplacementPair) -> RUMResult<&'a RUMCommandLine> {
452 for mut cmd in pipeline.iter_mut() {
453 pipeline_patch_command_args(&mut cmd, replacements)?;
454 }
455 Ok(pipeline)
456 }
457
458 ///
/// Waits for the pipeline to execute in an async-friendly manner. Once the pipeline execution ends,
/// return the output of the last command (the pipeline itself is only borrowed).
461 ///
462 /// ## Example
463 ///
464 /// ```
465 /// use rumtk_core::strings::RUMString;
466 /// use rumtk_core::types::RUMBuffer;
467 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
468 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_await_pipeline};
469 /// use rumtk_core::{rumtk_resolve_task};
470 ///
471 /// let ls_name = "ls";
472 /// let mut ls_command = RUMCommand::default();
473 /// ls_command.path = RUMString::from(ls_name);
474 ///
475 /// let wc_name = "wc";
476 /// let mut wc_command = RUMCommand::default();
477 /// wc_command.path = RUMString::from(wc_name);
478 ///
479 /// let pipeline = vec![
480 /// ls_command,
481 /// wc_command
482 /// ];
483 ///
484 /// let result = rumtk_resolve_task!(pipeline_await_pipeline(&pipeline, &RUMBuffer::default())).unwrap();
485 ///
486 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
487 /// ```
488 ///
489 pub async fn pipeline_await_pipeline(pipeline: &RUMCommandLine, initial_data: &RUMBuffer) -> RUMPipelineResult {
490 let pipeline_copy = pipeline.clone();
491 let data_copy = initial_data.clone();
492 rumtk_resolve_sync_task!(move || {
493 pipeline_wait_pipeline(&pipeline_copy, &data_copy)
494 })
495 }
496
497 ///
/// Waits for the pipeline to complete execution. Once the pipeline execution ends,
/// return the output of the last command (the pipeline itself is only borrowed).
500 ///
501 /// ## Example
502 ///
503 /// ```
504 /// use rumtk_core::strings::RUMString;
505 /// use rumtk_core::pipelines::pipeline_types::{RUMCommand};
506 /// use rumtk_core::pipelines::pipeline_functions::{pipeline_wait_pipeline};
507 /// use rumtk_core::types::RUMBuffer;
508 ///
509 /// let ls_name = "ls";
510 /// let mut ls_command = RUMCommand::default();
511 /// ls_command.path = RUMString::from(ls_name);
512 ///
513 /// let wc_name = "wc";
514 /// let mut wc_command = RUMCommand::default();
515 /// wc_command.path = RUMString::from(wc_name);
516 ///
517 /// let pipeline = vec![
518 /// ls_command,
519 /// wc_command
520 /// ];
521 ///
522 /// let result = pipeline_wait_pipeline(&pipeline, &RUMBuffer::default()).unwrap();
523 ///
524 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
525 /// ```
526 ///
527 pub fn pipeline_wait_pipeline(pipeline: &RUMCommandLine, initial_data: &RUMBuffer) -> RUMPipelineResult {
528 let mut last_data = initial_data.clone();
529
530 // Print pipeline
531 print_pipeline(&pipeline);
532
533 // Now let's visit each process and await their completion!
534 for c in pipeline.iter() {
535 let mut p = pipeline_generate_command(&c, &last_data)?;
536 pipeline_close_process_stdin(&mut p);
537 println!("Waiting on {}", p.id());
538 last_data = pipeline_get_stdout(p)?;
539 }
540
541 Ok(last_data)
542 }
543}
544
545pub mod pipeline_macros {
546 ///
/// Creates a pipeline command out of the provided parameters. Parameters include `path`, `data`,
/// `args`, and `env`. The command has type [RUMCommand](super::pipeline_types::RUMCommand).
549 ///
550 /// `env` is a map of type [RUMCommandEnv](super::pipeline_types::RUMCommandEnv) containing a set of
551 /// key value pair strings that we can use to update the process environment.
552 ///
553 /// ## Example
554 ///
555 /// ### Program Only
556 ///
557 /// ```
558 /// use rumtk_core::rumtk_pipeline_command;
559 ///
560 /// let command = rumtk_pipeline_command!("ls");
561 /// ```
562 ///
563 /// ### Program with Piped Data
564 ///
565 /// ```
566 /// use rumtk_core::rumtk_pipeline_command;
567 /// use rumtk_core::types::RUMBuffer;
568 /// use rumtk_core::strings::RUMStringConversions;
569 ///
570 /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default());
571 /// ```
572 ///
573 /// ### Program with Args
574 ///
575 /// ```
576 /// use rumtk_core::rumtk_pipeline_command;
577 /// use rumtk_core::types::RUMBuffer;
578 /// use rumtk_core::strings::RUMStringConversions;
579 ///
580 /// let command = rumtk_pipeline_command!("ls", RUMBuffer::default(), &vec![
581 /// "-l".to_string()
582 /// ]);
583 /// ```
584 ///
/// ## Notes
///
/// Accepted forms are `(path)`, `(path, data)`, `(path, data, args)` and
/// `(path, data, args, env)`. `args` and `env` are passed by reference. Whenever `data` is given
/// it is stored as `Some(data)`; the path-only form stores no data. Omitted `args` and `env`
/// fall back to their default (empty) values.
///
#[macro_export]
macro_rules! rumtk_pipeline_command {
    // Full form: every shorter form with data ends up here.
    ( $path:expr, $data:expr, $args:expr, $env:expr ) => {{
        use $crate::pipelines::pipeline_types::RUMCommand;

        RUMCommand::new($path, &Some($data), $args, $env)
    }};
    ( $path:expr, $data:expr, $args:expr ) => {{
        use $crate::pipelines::pipeline_types::RUMCommandEnv;

        $crate::rumtk_pipeline_command!($path, $data, $args, &RUMCommandEnv::default())
    }};
    ( $path:expr, $data:expr ) => {{
        use $crate::pipelines::pipeline_types::{RUMCommandArgs, RUMCommandEnv};

        $crate::rumtk_pipeline_command!(
            $path,
            $data,
            &RUMCommandArgs::default(),
            &RUMCommandEnv::default()
        )
    }};
    // Path only: no data is attached, so this form cannot go through the full form above.
    ( $path:expr ) => {{
        use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};

        RUMCommand::new(
            $path,
            &None,
            &RUMCommandArgs::default(),
            &RUMCommandEnv::default(),
        )
    }};
}
619
620 ///
621 /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
622 /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
623 /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
624 ///
625 /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
626 /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
627 ///
628 /// ## Example
629 ///
630 /// ### Simple
631 ///
632 /// ```
633 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_quick_run, rumtk_resolve_task, rumtk_init_threads};
634 /// use rumtk_core::core::{RUMResult};
635 /// use rumtk_core::strings::RUMStringConversions;
636 /// use rumtk_core::types::RUMBuffer;
637 ///
638 /// let f = async || -> RUMResult<()> {
639 /// let result = rumtk_pipeline_quick_run!(
640 /// rumtk_pipeline_command!("ls"),
641 /// rumtk_pipeline_command!("wc")
642 /// ).unwrap();
643 ///
644 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
645 /// Ok(())
646 /// };
647 ///
648 /// rumtk_resolve_task!(f()).unwrap();
649 /// ```
650 ///
/// ## Notes
///
/// The commands are collected into a vector and executed right away through
/// `pipeline_wait_pipeline` with an empty default buffer as the initial input. The macro
/// evaluates to the result of that call.
///
#[macro_export]
macro_rules! rumtk_pipeline_quick_run {
    ( $($command:expr),+ ) => {{
        use $crate::pipelines::pipeline_functions::{pipeline_wait_pipeline};
        use $crate::types::RUMBuffer;

        let commands = vec![$($command),+];
        let no_input = RUMBuffer::default();

        pipeline_wait_pipeline(&commands, &no_input)
    }};
}
662
663 ///
664 /// Given a series of [RUMCommand](super::pipeline_types::RUMCommand) passed to this macro, prepare
665 /// and execute the commands in a pipeline. A pipeline here refers to the Unix style pipeline which is the
666 /// terminal form of a pipeline. The pipeline behaves like it would in the terminal => `ls | wc`.
667 ///
668 /// See [this article](https://cscie26.dce.harvard.edu/~dce-lib113/reference/unix/unix2.html)
669 /// and the [Unix Philosophy](https://cscie2x.dce.harvard.edu/hw/ch01s06.html) to learn more!
670 ///
671 /// This is the `async` flavor.
672 ///
673 /// ## Example
674 ///
675 /// ### Simple
676 ///
677 /// ```
678 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_quick_run_async, rumtk_resolve_task, rumtk_init_threads};
679 /// use rumtk_core::core::{RUMResult};
680 /// use rumtk_core::strings::RUMStringConversions;
681 /// use rumtk_core::types::RUMBuffer;
682 ///
683 /// let f = async || -> RUMResult<()> {
684 /// let result = rumtk_pipeline_quick_run_async!(
685 /// rumtk_pipeline_command!("ls"),
686 /// rumtk_pipeline_command!("wc")
687 /// ).await?;
688 ///
689 /// assert_eq!(result.is_empty(), false, "Pipeline returned no buffer from command wc! => {:?}", &result);
690 /// Ok(())
691 /// };
692 ///
693 /// rumtk_resolve_task!(f()).unwrap();
694 /// ```
695 ///
/// ## Notes
///
/// The commands are evaluated immediately and collected into a vector. The macro evaluates to a
/// future that owns that vector, so the future may be awaited in place or stored and awaited
/// later. Nothing is executed until the future is awaited.
///
#[macro_export]
macro_rules! rumtk_pipeline_quick_run_async {
    ( $($command:expr),+ ) => {{
        use $crate::types::RUMBuffer;
        use $crate::pipelines::pipeline_functions::{pipeline_await_pipeline};

        let pipeline = vec![$($command),+];

        // Move the pipeline into the future instead of borrowing short lived copies of it: the
        // future then never outlives the data it refers to, and no extra clone is needed.
        async move { pipeline_await_pipeline(&pipeline, &RUMBuffer::default()).await }
    }};
}
707
708 ///
709 /// This macro is similar to [rumtk_pipeline_quick_run](crate::rumtk_pipeline_quick_run). The difference here
/// is that this macro takes a pipeline structure ([RUMCommandLine](crate::pipelines::pipeline_types::RUMCommandLine)).
711 /// In other words, this macro simply runs an already defined pipeline.
712 ///
713 /// ## Example
714 ///
715 /// ### Run the pipeline
716 /// ```
717 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run, rumtk_resolve_task, rumtk_init_threads};
718 /// use rumtk_core::core::{RUMResult};
719 /// use rumtk_core::strings::RUMStringConversions;
720 /// use rumtk_core::types::RUMBuffer;
721 ///
722 /// let f = || -> RUMResult<RUMBuffer> {
723 /// let pipeline = vec![
724 /// rumtk_pipeline_command!("ls"),
725 /// rumtk_pipeline_command!("wc")
726 /// ];
727 ///
728 /// rumtk_pipeline_run!(&pipeline)
729 /// };
730 ///
731 /// f().unwrap();
732 /// ```
733 ///
734 /// ### Pipe Buffer to Pipeline
735 ///
736 /// ```
737 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run};
738 /// use rumtk_core::core::RUMResult;
739 /// use rumtk_core::types::RUMBuffer;
740 /// use rumtk_core::strings::{string_to_buffer};
741 /// use rumtk_core::buffers::{buffer_to_string};
742 ///
743 /// const data: &str = "Hello World!";
744 /// const expected: &str = " 0 2 12\n";
745 ///
746 ///
747 /// let f = |input: &RUMBuffer| -> RUMResult<RUMBuffer> {
748 /// let mut pipeline = vec![
749 /// rumtk_pipeline_command!("wc")
750 /// ];
751 ///
752 /// rumtk_pipeline_run!(&pipeline, &input)
753 /// };
754 /// let result = buffer_to_string(&f(&string_to_buffer(data)).unwrap()).unwrap();
755 ///
756 /// assert_eq!(result, expected, "Buffer correctly piped into pipeline!");
757 /// ```
758 ///
759 /// ### Pipe String to Pipeline
760 ///
761 /// ```
762 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run};
763 /// use rumtk_core::core::RUMResult;
764 /// use rumtk_core::strings::{string_to_buffer};
765 /// use rumtk_core::buffers::{buffer_to_string};
766 /// use rumtk_core::types::RUMBuffer;
767 ///
768 /// const data: &str = "Hello World!";
769 /// const expected: &str = " 0 2 12\n";
770 ///
771 ///
772 /// let f = |input: &str| -> RUMResult<RUMBuffer> {
773 /// let mut pipeline = vec![
774 /// rumtk_pipeline_command!("wc")
775 /// ];
776 ///
777 /// rumtk_pipeline_run!(&pipeline, &string_to_buffer(input))
778 /// };
779 /// let result = buffer_to_string(&f(data).unwrap()).unwrap();
780 ///
781 /// assert_eq!(result, expected, "String correctly piped into pipeline!");
782 /// ```
783 ///
/// ## Notes
///
/// Accepted forms are `(pipeline)` and `(pipeline, data)`, both taking references. The
/// single-argument form forwards to the two-argument form with an empty default buffer. The
/// macro evaluates to the result of `pipeline_wait_pipeline`.
///
#[macro_export]
macro_rules! rumtk_pipeline_run {
    ( $pipeline:expr ) => {{
        use $crate::types::RUMBuffer;

        // Go through `$crate::` so the inner call resolves even when the caller invoked this
        // macro by path without importing it.
        $crate::rumtk_pipeline_run!($pipeline, &RUMBuffer::default())
    }};
    ( $pipeline:expr, $data:expr ) => {{
        use $crate::pipelines::pipeline_functions::{pipeline_wait_pipeline};

        pipeline_wait_pipeline($pipeline, $data)
    }};
}
797
798
799 ///
800 /// This macro is similar to [rumtk_pipeline_quick_run_async](crate::rumtk_pipeline_quick_run_async). The difference here
/// is that this macro takes a pipeline structure ([RUMCommandLine](crate::pipelines::pipeline_types::RUMCommandLine)).
802 /// In other words, this macro simply runs an already defined pipeline.
803 ///
804 /// ## Example
805 /// ```
806 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_init_threads};
807 /// use rumtk_core::core::{RUMResult};
808 /// use rumtk_core::strings::RUMStringConversions;
809 /// use rumtk_core::types::RUMBuffer;
810 ///
811 /// let f = async || -> RUMResult<RUMBuffer> {
812 /// let pipeline = vec![
813 /// rumtk_pipeline_command!("ls"),
814 /// rumtk_pipeline_command!("wc")
815 /// ];
816 ///
817 /// rumtk_pipeline_run_async!(&pipeline).await
818 /// };
819 ///
820 /// rumtk_resolve_task!(f()).unwrap();
821 /// ```
822 ///
823 /// ### Pipe Buffer to Pipeline
824 ///
825 /// ```
826 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_spawn_task};
827 /// use rumtk_core::core::RUMResult;
828 /// use rumtk_core::types::RUMBuffer;
829 /// use rumtk_core::buffers::{buffer_to_string};
830 /// use rumtk_core::strings::{string_to_buffer};
831 ///
832 /// const data: &str = "Hello World!";
833 /// const expected: &str = " 0 2 12\n";
834 ///
835 ///
836 /// let task = rumtk_spawn_task!(async {
837 /// let input = RUMBuffer::from("Hello World!");
838 /// let mut pipeline = vec![
839 /// rumtk_pipeline_command!("wc")
840 /// ];
841 ///
842 /// rumtk_pipeline_run_async!(&pipeline, &input).await
843 /// });
844 ///
845 /// let result = buffer_to_string(&rumtk_resolve_task!(task).unwrap().unwrap()).unwrap();
846 ///
847 /// assert_eq!(result, expected, "Buffer correctly piped into pipeline!");
848 /// ```
849 ///
850 /// ### Pipe String to Pipeline
851 ///
852 /// ```
853 /// use rumtk_core::{rumtk_pipeline_command, rumtk_pipeline_run_async, rumtk_resolve_task, rumtk_spawn_task};
854 /// use rumtk_core::core::RUMResult;
855 /// use rumtk_core::strings::{string_to_buffer};
856 /// use rumtk_core::buffers::{buffer_to_string};
857 /// use rumtk_core::types::RUMBuffer;
858 ///
859 /// const data: &str = "Hello World!";
860 /// const expected: &str = " 0 2 12\n";
861 ///
862 ///
863 /// let task = rumtk_spawn_task!(async {
864 /// let input = string_to_buffer(data);
865 /// let mut pipeline = vec![
866 /// rumtk_pipeline_command!("wc")
867 /// ];
868 ///
869 /// rumtk_pipeline_run_async!(&pipeline, &input).await
870 /// });
871 ///
872 /// let result = buffer_to_string(&rumtk_resolve_task!(task).unwrap().unwrap()).unwrap();
873 ///
874 /// assert_eq!(result, expected, "String correctly piped into pipeline!");
875 /// ```
876 ///
/// ## Notes
///
/// Accepted forms are `(pipeline)` and `(pipeline, data)`, both taking references. The
/// single-argument form forwards to the two-argument form with an empty default buffer. The
/// macro evaluates to the future returned by `pipeline_await_pipeline`; that future borrows its
/// arguments, so it is meant to be awaited in the same expression.
///
#[macro_export]
macro_rules! rumtk_pipeline_run_async {
    ( $pipeline:expr ) => {{
        use $crate::types::RUMBuffer;

        // Go through `$crate::` so the inner call resolves even when the caller invoked this
        // macro by path without importing it.
        $crate::rumtk_pipeline_run_async!($pipeline, &RUMBuffer::default())
    }};
    ( $pipeline:expr, $data:expr ) => {{
        use $crate::pipelines::pipeline_functions::{pipeline_await_pipeline};

        pipeline_await_pipeline($pipeline, $data)
    }};
}
890
891 ///
892 /// Patch the pipeline's arguments with desired dynamic options.
893 ///
894 /// ## Example
895 /// ```
896 /// use rumtk_core::{rumtk_pipeline_patch_args, rumtk_pipeline_command, rumtk_pipeline_run};
897 /// use rumtk_core::core::RUMResult;
898 /// use rumtk_core::buffers::{buffer_to_string};
899 /// use rumtk_core::types::RUMBuffer;
900 /// use rumtk_core::strings::string_to_buffer;
901 ///
902 ///
903 /// let f = || -> RUMResult<RUMBuffer> {
904 /// let mut pipeline = vec![
905 /// rumtk_pipeline_command!("ls", RUMBuffer::default(), &vec![
906 /// "{options}".into()
907 /// ])
908 /// ];
909 ///
910 /// rumtk_pipeline_patch_args!(&mut pipeline, &[("{options}", "-la")]);
911 ///
912 /// rumtk_pipeline_run!(&pipeline)
913 /// };
914 ///
915 /// let result_string = buffer_to_string(&f().unwrap()).unwrap();
916 /// let results: Vec<&str> = result_string.as_str().split("\n").collect();
917 /// let dot_dir = results.get(1).unwrap().chars().last().unwrap();
918 ///
919 /// assert_eq!(dot_dir, '.', "Incorrect options passed!");
920 /// ```
921 ///
/// ## Notes
///
/// Thin wrapper: both arguments are forwarded unchanged to
/// [pipeline_patch_args](crate::pipelines::pipeline_functions::pipeline_patch_args) and the macro
/// evaluates to the result of that call.
///
#[macro_export]
macro_rules! rumtk_pipeline_patch_args {
    ( $pipeline:expr, $replacements:expr ) => {{
        // Imported inside the block so the name does not leak into the caller's scope.
        use $crate::pipelines::pipeline_functions::{pipeline_patch_args};

        pipeline_patch_args($pipeline, $replacements)
    }};
}
930}