pub mod pipeline_types {
use crate::strings::{RUMString, RUMStringConversions};
use crate::types::{RUMBuffer, RUMHashMap};
use crate::core::{RUMResult, RUMVec};
use std::process::{Child, Command};
/// Ordered list of arguments attached to a `RUMCommand`.
pub type RUMCommandArgs = Vec<RUMString>;
/// Environment variable name/value pairs attached to a `RUMCommand`.
pub type RUMCommandEnv = RUMHashMap<RUMString, RUMString>;
/// Plain-data description of one external program invocation.
///
/// Nothing is executed by this type itself; it only stores what is needed to
/// configure a process later on.
#[derive(Default, Debug, Clone)]
pub struct RUMCommand {
    /// Program name or path to execute.
    pub path: RUMString,
    /// Optional bytes intended for the program's standard input.
    pub data: Option<RUMBuffer>,
    /// Arguments passed to the program, in order.
    pub args: RUMCommandArgs,
    /// Environment variables to set for the program.
    pub env: RUMCommandEnv,
}
impl RUMCommand {
    /// Creates a command description, taking an owned copy of every input.
    ///
    /// # Arguments
    /// * `prog` - program name or path.
    /// * `data` - optional stdin payload; cloned into the new value.
    /// * `args` - program arguments; cloned into the new value.
    /// * `env` - environment variables; cloned into the new value.
    pub fn new(
        prog: &str,
        data: &Option<RUMBuffer>,
        args: &RUMCommandArgs,
        env: &RUMCommandEnv,
    ) -> Self {
        let path = prog.to_rumstring();
        Self {
            path,
            data: data.clone(),
            args: args.clone(),
            env: env.clone(),
        }
    }
}
/// Sequence of commands that make up one pipeline, in execution order.
pub type RUMCommandLine = RUMVec<RUMCommand>;
/// Alias for `std::process::Command`: a configured but not yet spawned pipeline stage.
pub type RUMPipelineCommand = Command;
/// Alias for `std::process::Child`: a spawned pipeline stage.
pub type RUMPipelineProcess = Child;
/// Collection of spawned pipeline stages.
pub type RUMPipeline = RUMVec<RUMPipelineProcess>;
/// Result of running a pipeline: the collected output buffer, or an error.
pub type RUMPipelineResult = RUMResult<RUMBuffer>;
}
pub mod pipeline_functions {
use super::pipeline_types::*;
use crate::core::RUMResult;
use crate::strings::rumtk_format;
use std::io::{Read, Write};
use crate::threading::threading_functions::async_sleep;
use crate::types::RUMBuffer;
use std::process::{Command, Stdio};
/// Delay handed to `async_sleep` between two exit-status polls in `pipeline_await_pipeline`.
/// NOTE(review): presumably expressed in seconds — confirm against `async_sleep`.
const DEFAULT_PROCESS_ASYNC_WAIT: f32 = 0.001;
/// NOTE(review): not referenced by any code in the reviewed portion of this file;
/// presumably a read chunk size in bytes — confirm it is still needed.
const DEFAULT_STDOUT_CHUNK_SIZE: usize = 1024;
/// Converts a `RUMCommand` description into a `Command` that is ready to be spawned.
///
/// The program path, arguments and environment variables are taken from `command`.
/// Standard input, output and error are all configured as pipes.
pub fn pipeline_generate_command(command: &RUMCommand) -> RUMPipelineCommand {
    let mut prepared = Command::new(command.path.as_str());
    prepared
        .args(command.args.iter())
        .envs(command.env.iter())
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped());
    prepared
}
/// Starts the process described by `cmd`.
///
/// # Errors
/// Returns a formatted message naming the program and the underlying I/O error
/// when the process cannot be spawned.
pub fn pipeline_spawn_process(cmd: &mut RUMPipelineCommand) -> RUMResult<RUMPipelineProcess> {
    cmd.spawn().map_err(|e| {
        rumtk_format!(
            "Failed to spawn process {:?} because => {}",
            cmd.get_program(),
            e
        )
    })
}
/// Connects the stdout of an already running `process` to the stdin of the
/// not yet spawned command `piped`.
///
/// The stdout handle is moved out of `process`, so it is no longer available
/// on the `Child` afterwards.
///
/// # Errors
/// Returns an error when `process` has no stdout handle left to take.
pub fn pipeline_pipe_processes(
    process: &mut RUMPipelineProcess,
    piped: &mut RUMPipelineCommand,
) -> RUMResult<()> {
    if let Some(upstream_stdout) = process.stdout.take() {
        piped.stdin(Stdio::from(upstream_stdout));
        Ok(())
    } else {
        Err(rumtk_format!(
            "No stdout handle found for process {}.",
            process.id()
        ))
    }
}
/// Waits for the last process of `pipeline` and returns everything it wrote to stdout.
///
/// Only the last process is waited on here; the remaining processes are dropped
/// together with the consumed `pipeline`. The exit status of the last process is
/// not inspected by this function.
///
/// # Errors
/// * The pipeline is empty.
/// * Waiting on the last process or collecting its output failed.
pub fn pipeline_get_stdout(mut pipeline: RUMPipeline) -> RUMResult<RUMBuffer> {
    // An empty pipeline is reported as an error instead of panicking.
    let last_item = match pipeline.pop() {
        Some(process) => process,
        None => {
            return Err(rumtk_format!(
                "Cannot collect output from an empty pipeline!"
            ))
        }
    };
    match last_item.wait_with_output() {
        // The captured bytes are moved into the buffer; no extra copy is needed.
        Ok(output) => Ok(RUMBuffer::from(output.stdout)),
        Err(e) => Err(rumtk_format!(
            "Issue reading last process output because => {}",
            e
        )),
    }
}
/// Closes the stdin pipe of `process`, if it still holds one.
///
/// The handle is taken out of the `Child` and dropped; calling this again on the
/// same process does nothing.
pub fn pipeline_close_process_stdin(process: &mut RUMPipelineProcess) {
    // Dropping the handle is what closes the pipe.
    drop(process.stdin.take());
}
pub fn pipeline_pipe_into_process(
process: &mut RUMPipelineProcess,
data: &Option<RUMBuffer>,
) -> RUMResult<()> {
match data {
Some(data) => match process.stdin {
Some(ref mut stdin) => match stdin.write_all(&data) {
Ok(_) => {}
Err(e) => {
return Err(rumtk_format!(
"Failed to pipe data to stdin of process because => {}",
e
))
}
},
None => {}
},
None => {}
}
Ok(())
}
pub fn pipeline_generate_pipeline(commands: &RUMCommandLine) -> RUMResult<RUMPipeline> {
let first_command = commands.first().unwrap();
let mut pipeline = vec![];
let mut root = pipeline_generate_command(&first_command);
let mut parent_process = pipeline_spawn_process(&mut root)?;
pipeline_pipe_into_process(&mut parent_process, &mut first_command.data.clone())?;
pipeline.push(parent_process);
for cmd in commands.iter().skip(1) {
let mut new_root = pipeline_generate_command(cmd);
pipeline_pipe_processes(pipeline.last_mut().unwrap(), &mut new_root)?;
parent_process = pipeline_spawn_process(&mut new_root)?;
pipeline.push(parent_process);
}
Ok(pipeline)
}
/// Asynchronously waits for every process in `pipeline` and returns the stdout
/// of the last one.
///
/// The stdin of the first process is closed first. Each process is then polled
/// in order with `try_wait`, sleeping `DEFAULT_PROCESS_ASYNC_WAIT` between polls,
/// until it has exited. Once all have exited successfully, the output of the last
/// process is collected with `pipeline_get_stdout`.
///
/// NOTE(review): the last stage's stdout is only read after every stage has
/// exited. A stage that writes more than the OS pipe buffer may therefore never
/// exit — confirm expected output sizes or drain the output concurrently.
///
/// # Errors
/// * The pipeline is empty.
/// * Polling a process failed.
/// * A process exited with a non-success status.
/// * Collecting the final output failed.
pub async fn pipeline_await_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
    // An empty pipeline is reported as an error instead of panicking.
    match pipeline.first_mut() {
        Some(head) => pipeline_close_process_stdin(head),
        None => return Err(rumtk_format!("Cannot await an empty pipeline!")),
    }
    for p in pipeline.iter_mut() {
        loop {
            match p.try_wait() {
                Ok(Some(code)) if code.success() => break,
                Ok(Some(code)) => {
                    return Err(rumtk_format!(
                        "Process {} exited with non-success code => {}!",
                        p.id(),
                        code
                    ));
                }
                // Still running: yield to the executor before polling again.
                Ok(None) => {
                    async_sleep(DEFAULT_PROCESS_ASYNC_WAIT).await;
                }
                Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
            }
        }
    }
    pipeline_get_stdout(pipeline)
}
pub fn pipeline_wait_pipeline(mut pipeline: RUMPipeline) -> RUMPipelineResult {
pipeline_close_process_stdin(pipeline.first_mut().unwrap());
for p in pipeline.iter_mut() {
match p.wait() {
Ok(code) => {
if !code.success() {
return Err(rumtk_format!(
"Process {} exited with non-success code => {}!",
p.id(),
code
));
}
continue;
}
Err(e) => return Err(rumtk_format!("Issue with process {} => {}", p.id(), e)),
};
}
let result = pipeline_get_stdout(pipeline)?;
Ok(result)
}
}
pub mod pipeline_macros {
/// Builds a `RUMCommand` from one to four positional arguments.
///
/// Accepted forms:
/// * `(path, data, args, env)` - all fields supplied.
/// * `(path, data, args)` - default (empty) environment.
/// * `(path, data)` - default arguments and environment.
/// * `(path)` - no stdin data, default arguments and environment.
///
/// `data` is passed by value and wrapped in `Some`; `args` and `env` are passed
/// through as given and must already be references, as `RUMCommand::new` expects.
#[macro_export]
macro_rules! rumtk_pipeline_command {
    ( $path:expr, $data:expr, $args:expr, $env:expr ) => {{
        use $crate::pipelines::pipeline_types::RUMCommand;
        RUMCommand::new($path, &Some($data), $args, $env)
    }};
    ( $path:expr, $data:expr, $args:expr ) => {{
        use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandEnv};
        RUMCommand::new($path, &Some($data), $args, &RUMCommandEnv::default())
    }};
    ( $path:expr, $data:expr ) => {{
        use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
        RUMCommand::new(
            $path,
            &Some($data),
            &RUMCommandArgs::default(),
            &RUMCommandEnv::default(),
        )
    }};
    ( $path:expr ) => {{
        use $crate::pipelines::pipeline_types::{RUMCommand, RUMCommandArgs, RUMCommandEnv};
        RUMCommand::new(
            $path,
            &None,
            &RUMCommandArgs::default(),
            &RUMCommandEnv::default(),
        )
    }};
}
/// Spawns the given commands as a pipeline and blocks until it has finished.
///
/// Takes one or more comma-separated `RUMCommand` expressions; a trailing comma
/// is accepted. The macro evaluates to the result of `pipeline_wait_pipeline`.
///
/// The expansion applies `?` to the result of `pipeline_generate_pipeline`, so
/// the macro can only be used inside a function whose return type can carry
/// that error.
#[macro_export]
macro_rules! rumtk_pipeline_run {
    ( $($command:expr),+ $(,)? ) => {{
        use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_wait_pipeline};
        let pipeline = pipeline_generate_pipeline(&vec![
            $($command),+
        ])?;
        pipeline_wait_pipeline(pipeline)
    }};
}
/// Spawns the given commands as a pipeline and returns a future that resolves
/// once it has finished.
///
/// Takes one or more comma-separated `RUMCommand` expressions; a trailing comma
/// is accepted. The macro evaluates to the un-awaited call to the async function
/// `pipeline_await_pipeline`; the caller must `.await` it to obtain the result.
///
/// The processes are spawned eagerly, before the future is awaited. The
/// expansion applies `?` to the result of `pipeline_generate_pipeline`, so the
/// macro can only be used inside a function whose return type can carry that
/// error.
#[macro_export]
macro_rules! rumtk_pipeline_run_async {
    ( $($command:expr),+ $(,)? ) => {{
        use $crate::pipelines::pipeline_functions::{pipeline_generate_pipeline, pipeline_await_pipeline};
        let pipeline = pipeline_generate_pipeline(&vec![
            $($command),+
        ])?;
        pipeline_await_pipeline(pipeline)
    }};
}
}