pub mod errors;
mod jobs;
mod man;
mod redirection;
use std::env;
use std::fs::{self, create_dir_all};
use std::io::{self, BufRead, BufReader, BufWriter, Write};
use std::num::ParseIntError;
use std::path::{Path, PathBuf};
use std::process::exit;
use std::time::Duration;
use arrayvec::ArrayVec;
use permutate::Permutator;
use tokenizer::Token;
use num_cpus;
use self::errors::ParseErr;
pub use self::errors::FileErr;
/// Parser state that decides how the next command-line word is interpreted.
#[derive(PartialEq)]
enum Mode {
    /// Still reading the program's own flags (words starting with `-`).
    Arguments,
    /// Reading the words that make up the command template.
    Command,
    /// Reading literal inputs that follow `:::`.
    Inputs,
    /// Reading literal inputs that follow `:::+` (merged pairwise onto the current list).
    InputsAppend,
    /// Reading file names that follow `::::`; each file's lines become inputs.
    Files,
    /// Reading file names that follow `::::+` (lines merged pairwise onto the current list).
    FilesAppend,
}
// Bit masks stored in `Args::flags`. Each constant occupies exactly one bit.

/// Set when no command template was supplied, so every input is itself a command.
pub const INPUTS_ARE_COMMANDS: u16 = 0x0001;
/// Set by `-p` / `--pipe`.
pub const PIPE_IS_ENABLED: u16 = 0x0002;
/// Not set anywhere in this module; presumably decided elsewhere (TODO confirm).
pub const SHELL_ENABLED: u16 = 0x0004;
/// Set by `-s`, `--quiet`, `--silent`, and implied by `--eta`.
pub const QUIET_MODE: u16 = 0x0008;
/// Set by `-v` / `--verbose`.
pub const VERBOSE_MODE: u16 = 0x0010;
/// Not set anywhere in this module; presumably records shell detection (TODO confirm).
pub const DASH_EXISTS: u16 = 0x0020;
/// Set by `--dry-run`, and implied by `--shellquote`.
pub const DRY_RUN: u16 = 0x0040;
/// Set by `--shellquote`.
pub const SHELL_QUOTE: u16 = 0x0080;
/// Set by `--eta`.
pub const ETA: u16 = 0x0100;
/// Set by `--joblog <file>`.
pub const JOBLOG: u16 = 0x0200;
/// Set by `--joblog-8601`.
pub const JOBLOG_8601: u16 = 0x0400;
/// Not set anywhere in this module; presumably records shell detection (TODO confirm).
pub const ION_EXISTS: u16 = 0x0800;
/// Settings collected from the command line by `Args::parse`.
pub struct Args {
/// Bit set built from the `u16` flag constants declared in this module
/// (`QUIET_MODE`, `DRY_RUN`, ...).
pub flags: u16,
/// Number of jobs to run at once; defaults to `num_cpus::get()` and is
/// overridden by a non-zero `-j` / `--jobs` value.
pub ncores: usize,
/// Initialised to 0; not written by the parsing code in this module.
pub ninputs: usize,
/// Value of `--mem-free` after unit-suffix expansion; 0 when not given.
pub memory: u64,
/// Value of `--delay` (given in seconds on the command line); zero when not given.
pub delay: Duration,
/// Value of `--timeout` (given in seconds on the command line); zero when not given.
pub timeout: Duration,
/// Fixed-capacity token buffer; left empty by this module — presumably filled
/// by the command tokenizer elsewhere (TODO confirm).
pub arguments: ArrayVec<[Token; 128]>,
/// File name given to `--joblog`, if any.
pub joblog: Option<String>,
/// Initialised to `None`; NOTE(review): `--tmpdir` updates the caller's
/// `base_path` in `parse` rather than this field — confirm who sets it.
pub tempdir: Option<PathBuf>,
}
impl Args {
pub fn new() -> Args {
Args {
ncores: num_cpus::get(),
flags: 0,
arguments: ArrayVec::new(),
ninputs: 0,
memory: 0,
delay: Duration::from_millis(0),
timeout: Duration::from_millis(0),
joblog: None,
tempdir: None,
}
}
pub fn parse(&mut self, comm: &mut String, arguments: &[String], base_path: &mut PathBuf)
-> Result<usize, ParseErr>
{
let mut lists: Vec<Vec<String>> = Vec::new();
let mut current_inputs: Vec<String> = Vec::with_capacity(1024);
let mut max_args = 0;
let mut number_of_arguments = 0;
let mut quote_enabled = false;
if env::args().len() > 1 {
let (mut mode, mut index) = match arguments[1].as_str() {
":::" | ":::+" => (Mode::Inputs, 2),
"::::" | "::::+" => (Mode::Files, 2),
_ => (Mode::Arguments, 1)
};
let mut shebang = false;
if let Mode::Arguments = mode {
while let Some(argument) = arguments.get(index) {
index += 1;
let mut char_iter = argument.bytes();
if char_iter.next().unwrap() == b'-' {
let character = char_iter.next().ok_or_else(|| ParseErr::InvalidArgument(index-1))?;
if character == b'j' {
let val = parse_jobs(argument, arguments.get(index), &mut index)?;
if val != 0 { self.ncores = val; }
} else if character == b'n' {
max_args = parse_max_args(argument, arguments.get(index), &mut index)?;
} else if character != b'-' {
for character in argument[1..].bytes() {
match character {
b'h' => {
println!("{}", man::MAN_PAGE);
exit(0);
},
b'p' => self.flags |= PIPE_IS_ENABLED,
b'q' => quote_enabled = true,
b's' => self.flags |= QUIET_MODE,
b'v' => self.flags |= VERBOSE_MODE,
_ => {
let stderr = io::stderr();
let _ = writeln!(stderr.lock(), "parallel: unsupported argument: '-{}'", character as char);
}
}
}
} else {
match &argument[2..] {
"delay" => {
let val = arguments.get(index).ok_or(ParseErr::DelayNoValue)?;
let seconds = val.parse::<f64>().map_err(|_| ParseErr::DelayNaN(index))?;
self.delay = Duration::from_millis((seconds * 1000f64) as u64);
index += 1;
},
"dry-run" => self.flags |= DRY_RUN,
"eta" => self.flags |= ETA + QUIET_MODE,
"help" => {
println!("{}", man::MAN_PAGE);
exit(0);
},
"joblog" => {
let file = arguments.get(index).ok_or(ParseErr::JoblogNoValue)?;
self.joblog = Some(file.to_owned());
index += 1;
self.flags |= JOBLOG;
},
"joblog-8601" => self.flags |= JOBLOG_8601,
"jobs" => {
let val = jobs::parse(arguments.get(index).ok_or(ParseErr::JobsNoValue)?)?;
if val != 0 { self.ncores = val; }
index += 1;
},
"num-cpu-cores" => {
println!("{}", num_cpus::get());
exit(0);
},
"max-args" => {
let val = arguments.get(index).ok_or(ParseErr::MaxArgsNoValue)?;
max_args = val.parse::<usize>().map_err(|_| ParseErr::MaxArgsNaN(index))?;
index += 1;
},
"mem-free" => {
let val = arguments.get(index).ok_or(ParseErr::MemNoValue)?;
self.memory = parse_memory(val).map_err(|_| ParseErr::MemInvalid(index))?;
index += 1;
},
"pipe" => self.flags |= PIPE_IS_ENABLED,
"quiet" | "silent" => self.flags |= QUIET_MODE,
"quote" => quote_enabled = true,
"shellquote" => self.flags |= DRY_RUN + SHELL_QUOTE,
"timeout" => {
let val = arguments.get(index).ok_or(ParseErr::TimeoutNoValue)?;
let seconds = val.parse::<f64>().map_err(|_| ParseErr::TimeoutNaN(index))?;
self.timeout = Duration::from_millis((seconds * 1000f64) as u64);
index += 1;
},
"verbose" => self.flags |= VERBOSE_MODE,
"version" => {
println!("MIT/Rust Parallel {}", env!("CARGO_PKG_VERSION"));
exit(0);
},
"tmpdir" | "tempdir" => {
*base_path = PathBuf::from(arguments.get(index).ok_or(ParseErr::WorkDirNoValue)?);
index += 1;
if let Err(why) = create_dir_all(base_path.as_path()) {
let stderr = io::stderr();
let stderr = &mut stderr.lock();
let _ = writeln!(stderr, "parallel: unable to create tempdir {:?}: {}", base_path.as_path(), why);
exit(1);
}
}
_ if &argument[2..9] == "shebang" => {
shebang = true;
comm.push_str(&argument[10..]);
break
},
_ => {
let stderr = io::stderr();
let _ = writeln!(stderr.lock(), "parallel: unsupported argument: '{}'", argument);
}
}
}
} else {
match argument.as_str() {
":::" => mode = Mode::Inputs,
"::::" => mode = Mode::Files,
_ => {
if quote_enabled {
comm.push_str("e_command(argument));
} else {
comm.push_str(argument);
}
mode = Mode::Command;
}
}
break
}
}
}
if comm.is_empty() {
self.flags |= INPUTS_ARE_COMMANDS;
} else {
if !quote_enabled { check_command(comm.as_str())?; }
}
if let Some(path) = redirection::input_was_redirected() {
file_parse(&mut current_inputs, path.to_str().ok_or_else(|| ParseErr::RedirFile(path.clone()))?,
self.flags & INPUTS_ARE_COMMANDS != 0)?;
} else if let Mode::Command = mode {
while let Some(argument) = arguments.get(index) {
index += 1;
match argument.as_str() {
":::" | ":::+" => mode = Mode::Inputs,
"::::" | "::::+" => mode = Mode::Files,
_ => {
comm.push(' ');
if quote_enabled {
comm.push_str("e_inputs(argument));
} else {
comm.push_str(argument);
}
continue
}
}
break
}
if comm.is_empty() {
self.flags |= INPUTS_ARE_COMMANDS;
} else {
if !quote_enabled { check_command(comm.as_str())?; }
}
if shebang {
file_parse(&mut current_inputs, &arguments.last().unwrap(),
self.flags & INPUTS_ARE_COMMANDS != 0)?;
} else {
parse_inputs(arguments, index, &mut current_inputs, &mut lists, &mut mode,
self.flags & INPUTS_ARE_COMMANDS != 0)?;
}
} else {
parse_inputs(arguments, index, &mut current_inputs, &mut lists, &mut mode,
self.flags & INPUTS_ARE_COMMANDS != 0)?;
}
number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, base_path.clone())?;
} else if let Some(path) = redirection::input_was_redirected() {
let path = path.to_str().ok_or_else(|| ParseErr::RedirFile(path.clone()))?;
file_parse(&mut current_inputs, path, true)?;
number_of_arguments = write_inputs_to_disk(lists, current_inputs, max_args, base_path.clone())?;
}
if number_of_arguments == 0 {
if comm.is_empty() {
self.flags |= INPUTS_ARE_COMMANDS;
} else {
if !quote_enabled { check_command(comm.as_str())?; }
}
number_of_arguments = write_stdin_to_disk(max_args, base_path.clone(),
self.flags & INPUTS_ARE_COMMANDS != 0, quote_enabled)?;
}
if number_of_arguments == 0 { return Err(ParseErr::NoArguments); }
Ok(number_of_arguments)
}
}
/// Verifies that every quote opened in `input` is also closed.
///
/// A backslash escapes the byte that follows it. A quote character only opens
/// or closes a quoted region when it is not already inside a region opened by
/// the *other* quote character, so `"it's"` and `'say "hi"'` are both accepted.
///
/// # Errors
///
/// Returns `ParseErr::NonTerminated` carrying a copy of `input` when a quoted
/// region is still open at the end of the string.
fn check_command(input: &str) -> Result<(), ParseErr> {
    // The quote byte that opened the current region, if any.
    let mut open_quote: Option<u8> = None;
    let mut escaped = false;

    for byte in input.bytes() {
        if escaped {
            escaped = false;
            continue;
        }
        match byte {
            b'\\' => escaped = true,
            b'"' | b'\'' => {
                open_quote = match open_quote {
                    None => Some(byte),
                    Some(quote) if quote == byte => None,
                    // The other kind of quote is literal text inside this region.
                    other => other,
                };
            }
            _ => (),
        }
    }

    if open_quote.is_some() { Err(ParseErr::NonTerminated(String::from(input))) } else { Ok(()) }
}
/// Escapes the arguments of a command line while leaving the program name alone.
///
/// Everything up to and including the first space is copied verbatim. In the
/// remainder, each `"`, `\`, `'` and space is prefixed with a backslash.
fn quote_command(input: &str) -> String {
    // Split just after the first space; without a space the whole input is the program.
    let boundary = input.find(' ').map_or(input.len(), |position| position + 1);
    let (program, arguments) = input.split_at(boundary);

    let mut quoted = String::with_capacity(input.len());
    quoted.push_str(program);
    for symbol in arguments.chars() {
        if symbol == '"' || symbol == '\\' || symbol == '\'' || symbol == ' ' {
            quoted.push('\\');
        }
        quoted.push(symbol);
    }
    quoted
}
/// Returns a copy of `input` in which every `"`, `\`, `'` and space is
/// prefixed with a backslash.
fn quote_inputs(input: &str) -> String {
    input.chars().fold(String::with_capacity(input.len()), |mut quoted, symbol| {
        match symbol {
            '"' | '\\' | '\'' | ' ' => quoted.push('\\'),
            _ => {}
        }
        quoted.push(symbol);
        quoted
    })
}
/// Copies the non-empty lines of standard input into the `unprocessed` file
/// inside `unprocessed_path`, and returns how many lines were written there.
///
/// * `max_args` — when 2 or more, that many consecutive inputs are joined with
///   single spaces onto one output line; otherwise each input gets its own line.
/// * `inputs_are_commands` and `quote_enabled` — when both are true, each
///   input is passed through `quote_command` before being written.
///
/// Lines that cannot be read (for example invalid UTF-8) are skipped, as are
/// empty lines. A notice is printed to standard error before reading starts.
///
/// # Errors
///
/// Returns a `ParseErr` wrapping `FileErr::Open` when the file cannot be
/// created, or `FileErr::Write` when writing or flushing it fails.
fn write_stdin_to_disk(max_args: usize, mut unprocessed_path: PathBuf, inputs_are_commands: bool,
    quote_enabled: bool) -> Result<usize, ParseErr>
{
    /// Streams `input` to `output`, grouping `per_line` inputs per output line,
    /// and returns the number of output lines started.
    fn copy_lines<R: BufRead, W: Write>(input: R, output: &mut W, per_line: usize, quote: bool)
        -> io::Result<usize>
    {
        let mut lines_written = 0;
        let mut on_line = 0;
        for line in input.lines() {
            let line = match line {
                Ok(line) => if quote { quote_command(&line) } else { line },
                // Best effort: an unreadable line is skipped, not fatal.
                Err(_) => continue,
            };
            if line.is_empty() { continue }

            if on_line == 0 {
                lines_written += 1;
            } else {
                output.write_all(b" ")?;
            }
            output.write_all(line.as_bytes())?;
            on_line += 1;
            if on_line == per_line {
                output.write_all(b"\n")?;
                on_line = 0;
            }
        }
        // Terminate a final, partially filled line.
        if on_line != 0 { output.write_all(b"\n")?; }
        // Flush explicitly: errors raised while dropping a `BufWriter` are discarded.
        output.flush()?;
        Ok(lines_written)
    }

    let stderr = io::stderr();
    let mut stderr = stderr.lock();
    let _ = stderr.write_all(b"parallel: reading inputs from standard input\n");

    unprocessed_path.push("unprocessed");
    let disk_buffer = fs::OpenOptions::new().truncate(true).write(true).create(true).open(&unprocessed_path)
        .map_err(|why| ParseErr::File(FileErr::Open(unprocessed_path.clone(), why)))?;
    let mut disk_buffer = BufWriter::new(disk_buffer);

    let per_line = if max_args < 2 { 1 } else { max_args };
    let quote = inputs_are_commands && quote_enabled;

    let stdin = io::stdin();
    let number_of_arguments = copy_lines(BufReader::new(stdin.lock()), &mut disk_buffer, per_line, quote)
        .map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
    Ok(number_of_arguments)
}
fn write_inputs_to_disk(lists: Vec<Vec<String>>, current_inputs: Vec<String>, max_args: usize,
mut unprocessed_path: PathBuf) -> Result<usize, ParseErr>
{
unprocessed_path.push("unprocessed");
let disk_buffer = fs::OpenOptions::new().truncate(true).write(true).create(true).open(&unprocessed_path)
.map_err(|why| ParseErr::File(FileErr::Open(unprocessed_path.to_owned(), why)))?;
let mut disk_buffer = BufWriter::new(disk_buffer);
let mut number_of_arguments = 0;
if lists.len() > 1 {
let tmp: Vec<Vec<&str>> = lists.iter()
.map(|list| list.iter().map(AsRef::as_ref).collect::<Vec<&str>>())
.collect();
let list_array: Vec<&[&str]> = tmp.iter().map(AsRef::as_ref).collect();
let mut permutator = Permutator::new(&list_array[..]);
let mut permutation_buffer = permutator.next().unwrap();
{
let mut iter = permutation_buffer.iter();
disk_buffer.write(iter.next().unwrap().as_bytes())
.map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
for element in iter {
disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes()))
.map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
}
number_of_arguments += 1;
}
if max_args < 2 {
disk_buffer.write(b"\n").map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
while permutator.next_with_buffer(&mut permutation_buffer) {
let mut iter = permutation_buffer.iter();
disk_buffer.write(iter.next().unwrap().as_bytes())
.map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
for element in iter {
disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes()))
.map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
}
disk_buffer.write(b"\n")
.map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
number_of_arguments += 1;
}
} else {
let mut max_args_index = max_args - 1;
while permutator.next_with_buffer(&mut permutation_buffer) {
let mut iter = permutation_buffer.iter();
if max_args_index == max_args {
max_args_index -= 1;
number_of_arguments += 1;
disk_buffer.write(iter.next().unwrap().as_bytes())
.map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
for element in iter {
disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes()))
.map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
}
} else if max_args_index == 1 {
max_args_index = max_args;
disk_buffer.write(b" ")
.and_then(|_| disk_buffer.write(iter.next().unwrap().as_bytes()))
.map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
for element in iter {
disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes()))
.map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
}
disk_buffer.write(b"\n")
.map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
} else {
max_args_index -= 1;
disk_buffer.write(b" ")
.and_then(|_| disk_buffer.write(iter.next().unwrap().as_bytes()))
.map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
for element in iter {
disk_buffer.write(b" ").and_then(|_| disk_buffer.write(element.as_bytes()))
.map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
}
}
}
}
} else if max_args < 2 {
for input in current_inputs {
disk_buffer.write(input.as_bytes())
.and_then(|_| disk_buffer.write(b"\n"))
.map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
number_of_arguments += 1;
}
} else {
for chunk in current_inputs.chunks(max_args) {
let max_index = chunk.len()-1;
let mut index = 0;
number_of_arguments += 1;
while index != max_index {
disk_buffer.write(chunk[index].as_bytes())
.and_then(|_| disk_buffer.write(b" "))
.map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
index += 1;
}
disk_buffer.write(chunk[max_index].as_bytes())
.and_then(|_| disk_buffer.write(b"\n"))
.map_err(|why| FileErr::Write(unprocessed_path.clone(), why))?;
}
}
Ok(number_of_arguments)
}
fn parse_inputs(arguments: &[String], mut index: usize, current_inputs: &mut Vec<String>,
lists: &mut Vec<Vec<String>>, mode: &mut Mode, inputs_are_commands: bool) -> Result<(), ParseErr>
{
let mut append_list = &mut Vec::new();
macro_rules! switch_mode {
($mode:expr) => {{
match *mode {
Mode::InputsAppend | Mode::FilesAppend => merge_lists(current_inputs, append_list),
_ => (),
}
*mode = $mode;
if !current_inputs.is_empty() {
lists.push(current_inputs.clone());
current_inputs.clear();
}
}};
(append $mode:expr) => {{
match *mode {
Mode::InputsAppend | Mode::FilesAppend => merge_lists(current_inputs, append_list),
_ => (),
}
*mode = $mode;
}};
}
while let Some(argument) = arguments.get(index) {
index += 1;
match argument.as_str() {
":::" => switch_mode!(Mode::Inputs),
":::+" => switch_mode!(append Mode::InputsAppend),
"::::" => switch_mode!(Mode::Files),
"::::+" => switch_mode!(append Mode::FilesAppend),
_ => match *mode {
Mode::Inputs if inputs_are_commands => current_inputs.push(quote_command(&argument)),
Mode::InputsAppend if inputs_are_commands => append_list.push(quote_command(&argument)),
Mode::Inputs => current_inputs.push(argument.clone()),
Mode::InputsAppend => append_list.push(argument.clone()),
Mode::Files => file_parse(current_inputs, argument, inputs_are_commands)?,
Mode::FilesAppend => file_parse(append_list, argument, inputs_are_commands)?,
_ => unreachable!()
}
}
}
if !append_list.is_empty() {
match *mode {
Mode::InputsAppend | Mode::FilesAppend => merge_lists(current_inputs, append_list),
_ => (),
}
}
if !current_inputs.is_empty() {
lists.push(current_inputs.clone());
}
Ok(())
}
/// Extracts the value of the `-n` flag.
///
/// The value is taken from `argument` itself when it is attached (`-n4`);
/// otherwise it is taken from `next_argument` and `index` is advanced by one
/// to consume it.
///
/// # Errors
///
/// Returns `ParseErr::MaxArgsNoValue` when no value follows a bare `-n`, and
/// `ParseErr::MaxArgsNaN` (carrying the current `index`) when the value is not
/// an unsigned integer.
fn parse_max_args(argument: &str, next_argument: Option<&String>, index: &mut usize) -> Result<usize, ParseErr> {
    let value = if argument.len() > 2 {
        &argument[2..]
    } else {
        *index += 1;
        next_argument.ok_or(ParseErr::MaxArgsNoValue)?.as_str()
    };
    value.parse::<usize>().map_err(|_| ParseErr::MaxArgsNaN(*index))
}
/// Appends each element of `append` to the element of `original` at the same
/// position, separated by a single space.
///
/// `original` is shortened to the length of `append` when it is longer, and
/// `append` is always left empty, including any surplus elements.
fn merge_lists(original: &mut Vec<String>, append: &mut Vec<String>) {
    // Truncating to a length that is not shorter than the vector is a no-op.
    original.truncate(append.len());

    let mut suffixes = append.drain(..);
    for target in original.iter_mut() {
        match suffixes.next() {
            Some(suffix) => {
                target.push(' ');
                target.push_str(&suffix);
            }
            None => break,
        }
    }
    // Dropping `suffixes` here removes whatever is left in `append`.
}
/// Parses a memory size such as `512`, `4k`, `2M` or `1G`.
///
/// A trailing lower-case suffix (`k`, `m`, `g`, `t`, `p`) multiplies the
/// number by a power of 1000; an upper-case suffix (`K`, `M`, `G`, `T`, `P`)
/// multiplies it by a power of 1024. Without a suffix the number is returned
/// as it is.
///
/// # Errors
///
/// Returns a `ParseIntError` when the numeric part is empty or not an
/// unsigned integer, or when the scaled value does not fit in a `u64`.
fn parse_memory(input: &str) -> Result<u64, ParseIntError> {
    let multiplier: u64 = match input.as_bytes().last().cloned() {
        Some(b'k') => 1_000,
        Some(b'K') => 1_024,
        Some(b'm') => 1_000_000,
        Some(b'M') => 1_048_576,
        Some(b'g') => 1_000_000_000,
        Some(b'G') => 1_073_741_824,
        Some(b't') => 1_000_000_000_000,
        Some(b'T') => 1_099_511_627_776,
        Some(b'p') => 1_000_000_000_000_000,
        Some(b'P') => 1_125_899_906_842_624,
        // No suffix (or empty input): let the integer parser accept or reject it.
        _ => return input.parse::<u64>(),
    };

    // The suffix is a single ASCII byte, so this slice ends on a character boundary.
    let value = input[..input.len() - 1].parse::<u64>()?;
    match value.checked_mul(multiplier) {
        Some(bytes) => Ok(bytes),
        // `ParseIntError` has no public constructor; parsing a literal one past
        // `u64::MAX` yields the standard overflow error.
        None => "18446744073709551616".parse::<u64>(),
    }
}
/// Extracts the value of the `-j` flag and hands it to `jobs::parse`.
///
/// The value is taken from `argument` itself when it is attached (`-j4`);
/// otherwise it is taken from `next_argument` and `index` is advanced by one
/// to consume it.
///
/// # Errors
///
/// Returns `ParseErr::JobsNoValue` when no value follows a bare `-j`, and
/// propagates any error from `jobs::parse`.
fn parse_jobs(argument: &str, next_argument: Option<&String>, index: &mut usize) -> Result<usize, ParseErr> {
    if argument.len() <= 2 {
        // Bare `-j`: the value is the next command-line word.
        *index += 1;
        let value = next_argument.ok_or(ParseErr::JobsNoValue)?;
        return Ok(jobs::parse(value)?);
    }
    Ok(jobs::parse(&argument[2..])?)
}
/// Appends the usable lines of the file at `path` to `inputs`.
///
/// Empty lines, lines starting with `#`, and lines that cannot be read are
/// skipped. When `inputs_are_commands` is true each line is passed through
/// `quote_command` before being stored.
///
/// # Errors
///
/// Returns a `ParseErr` wrapping `FileErr::Open` when the file cannot be opened.
fn file_parse<P: AsRef<Path>>(inputs: &mut Vec<String>, path: P, inputs_are_commands: bool)
    -> Result<(), ParseErr>
{
    let path = path.as_ref();
    let file = fs::File::open(path).map_err(|err| ParseErr::File(FileErr::Open(path.to_owned(), err)))?;

    let usable_lines = BufReader::new(file).lines()
        .filter_map(|line| line.ok())
        .filter(|line| !line.is_empty() && !line.starts_with("#"));

    for line in usable_lines {
        let entry = if inputs_are_commands { quote_command(&line) } else { line };
        inputs.push(entry);
    }
    Ok(())
}