#![deny(dead_code)]
#![allow(unknown_lints)]
extern crate arrayvec;
extern crate itoa;
extern crate numtoa;
extern crate num_cpus;
extern crate permutate;
extern crate smallvec;
extern crate sys_info;
extern crate time;
extern crate wait_timeout;
mod arguments;
mod disk_buffer;
mod execute;
mod filepaths;
mod input_iterator;
mod misc;
mod tokenizer;
mod shell;
mod verbose;
use std::env;
use std::fs::{self, create_dir_all, File};
use std::io::{self, BufRead, BufReader, Read, Write};
use std::mem;
use std::process::exit;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::channel;
use arguments::Args;
use execute::pipe::disk::State;
use input_iterator::{InputIterator, InputsLock};
use tokenizer::{Token, tokenize};
/// Consumes `comm` and hands back its contents as a `&'static str`.
///
/// The string's heap buffer is intentionally never freed, so the returned
/// slice stays valid for the remainder of the process.
///
/// # Safety
///
/// The buffer is leaked for good; callers should only use this for data that
/// is meant to live until the program exits.
unsafe fn leak_string(comm: String) -> &'static str {
    // Going through a raw pointer detaches the borrow from `comm`, which lets
    // the reference outlive the local binding.
    let raw: *const str = &*comm;
    // Skip the destructor so the bytes behind `raw` are never deallocated.
    mem::forget(comm);
    &*raw
}
unsafe fn static_arg(args: &[Token]) -> &'static [Token] { mem::transmute(args) }
/// Program entry point.
///
/// Parses the command line, prepares the job files under the base directory,
/// tokenizes the command template and then either performs a dry run or
/// spawns `args.ncores` worker threads to execute the inputs.
///
/// Exit behaviour visible in this function:
/// * status `1` when the base directory cannot be determined or created, its
///   path is not valid UTF-8, the `unprocessed` file cannot be opened, or the
///   command fails to tokenize;
/// * a panic when the `InputIterator` cannot be initialised or a worker
///   thread panicked;
/// * the value returned by `execute::receive_messages` when the `errors` file
///   is non-empty after all workers have been joined.
fn main() {
    let stdout = io::stdout();
    let stderr = io::stderr();

    // On Linux, warn when transparent huge pages are set to `always`. Only the
    // first two bytes are inspected: `[a` is taken to mean that `always` is
    // the bracketed (active) choice in that file.
    if cfg!(target_os = "linux") {
        if let Ok(mut file) = File::open("/sys/kernel/mm/transparent_hugepage/enabled") {
            let mut buffer: [u8; 2] = [0, 0];
            if file.read_exact(&mut buffer).is_ok() && &buffer == b"[a" {
                let _ = writeln!(stderr.lock(), "parallel: /sys/kernel/mm/transparent_hugepage/enabled is set to always instead of madvise. This will gravely affect the performance of Parallel.");
            }
        }
    }

    let mut args = Args::new();
    let mut comm = String::with_capacity(128);
    let raw_arguments = env::args().collect::<Vec<String>>();

    // Directory that holds the `unprocessed`, `processed` and `errors` files.
    let mut base = match filepaths::base() {
        Some(base) => base,
        None => {
            let mut stderr = stderr.lock();
            // `write_all` (not `write`) so the message cannot be truncated by
            // a short write; the newline keeps it consistent with the other
            // diagnostics.
            let _ = stderr.write_all(b"parallel: unable to open home directory\n");
            exit(1);
        }
    };

    if let Err(why) = create_dir_all(&base) {
        let stderr = &mut stderr.lock();
        let _ = writeln!(stderr, "parallel: unable to create tempdir {:?}: {}", base, why);
        exit(1);
    }

    // Parsing fills `comm` with the command template and may adjust `base`;
    // on success it yields the number of inputs.
    // NOTE(review): `why.handle` presumably reports the error and exits — confirm.
    args.ninputs = match args.parse(&mut comm, &raw_arguments, &mut base) {
        Ok(inputs) => inputs,
        Err(why) => why.handle(&raw_arguments)
    };

    // The workers receive the base directory as a `String`, so it has to be
    // valid UTF-8.
    let base_path = match base.to_str() {
        Some(base) => String::from(base),
        None => {
            let stderr = &mut stderr.lock();
            let _ = writeln!(stderr, "parallel: tempdir path, {:?}, is invalid", base);
            exit(1);
        }
    };

    let mut unprocessed_path = base.clone();
    let mut processed_path = base.clone();
    let mut errors_path = base;
    unprocessed_path.push("unprocessed");
    processed_path.push("processed");
    errors_path.push("errors");

    let file = match fs::OpenOptions::new().read(true).open(&unprocessed_path) {
        Ok(file) => file,
        Err(why) => {
            let stderr = &mut stderr.lock();
            let _ = writeln!(stderr, "parallel: unable to open unprocessed file: {:?}: {}", &unprocessed_path, why);
            exit(1);
        }
    };

    let inputs = InputIterator::new(&unprocessed_path, file, args.ninputs)
        .expect("unable to initialize the InputIterator structure");

    // Leak the command template so it can be handed out as `&'static str`;
    // presumably the tokens written into `args.arguments` borrow from it.
    let static_comm = unsafe { leak_string(comm) };

    if let Err(error) = tokenize(&mut args.arguments, static_comm, &unprocessed_path, args.ninputs) {
        let stderr = &mut stderr.lock();
        let _ = writeln!(stderr, "{}", error);
        exit(1)
    }

    // Widen the token slice to `'static` so it can be moved into the worker
    // closures below.
    let arguments = unsafe { static_arg(&args.arguments) };

    if args.flags & arguments::DRY_RUN != 0 {
        execute::dry_run(args.flags, inputs, arguments);
    } else {
        // All workers pull from the same input iterator and report through
        // one channel.
        let shared_input = Arc::new(Mutex::new(inputs));
        let (output_tx, input_rx) = channel::<State>();
        let mut threads = Vec::with_capacity(args.ncores);

        if args.flags & arguments::VERBOSE_MODE != 0 {
            verbose::total_inputs(&stdout, args.ncores, args.ninputs);
        }

        if args.flags & arguments::INPUTS_ARE_COMMANDS != 0 {
            // Record which shell is available; `ion` is checked before `dash`.
            if shell::ion_exists() {
                args.flags |= arguments::ION_EXISTS;
            } else if shell::dash_exists() {
                args.flags |= arguments::DASH_EXISTS;
            }

            for _ in 0..args.ncores {
                let flags = args.flags;
                let mut exec = execute::ExecInputs {
                    num_inputs: args.ninputs,
                    timeout: args.timeout,
                    output_tx: output_tx.clone(),
                    tempdir: base_path.clone(),
                    inputs: InputsLock {
                        inputs: shared_input.clone(),
                        memory: args.memory,
                        delay: args.delay,
                        has_delay: args.delay != Duration::from_millis(0),
                        completed: false,
                        flags: flags,
                    }
                };
                let handle: JoinHandle<()> = thread::spawn(move || exec.run(flags));
                threads.push(handle);
            }
        } else {
            shell::set_flags(&mut args.flags, arguments);

            // Slots are numbered from 1 to `ncores` inclusive.
            for slot in 1..args.ncores+1 {
                let timeout = args.timeout;
                let num_inputs = args.ninputs;
                let output_tx = output_tx.clone();
                let flags = args.flags;
                let base_path = base_path.clone();
                let inputs = InputsLock {
                    inputs: shared_input.clone(),
                    memory: args.memory,
                    delay: args.delay,
                    has_delay: args.delay != Duration::from_millis(0),
                    completed: false,
                    flags: flags,
                };
                let handle: JoinHandle<()> = thread::spawn(move || {
                    let mut exec = execute::ExecCommands {
                        slot: slot,
                        num_inputs: num_inputs,
                        flags: flags,
                        timeout: timeout,
                        inputs: inputs,
                        output_tx: output_tx,
                        arguments: arguments,
                        tempdir: base_path,
                    };
                    exec.run();
                });
                threads.push(handle);
            }
        }

        // NOTE(review): `args` is moved here while `arguments` (handed to the
        // workers above) still points into `args.arguments`. Confirm the token
        // storage lives on the heap so that this move cannot invalidate it.
        let errors = execute::receive_messages(input_rx, args, &base_path, &processed_path, &errors_path);

        // A worker that panicked makes `join` return `Err`, which is turned
        // into a panic here.
        for thread in threads { thread.join().unwrap(); }

        // If the errors file has any content, replay it on stderr and exit
        // with the value returned by `receive_messages`.
        if let Ok(file) = File::open(errors_path) {
            if file.metadata().ok().map_or(0, |metadata| metadata.len()) > 0 {
                let stderr = &mut stderr.lock();
                let _ = stderr.write_all(b"parallel: encountered errors during processing:\n");
                for line in BufReader::new(file).lines() {
                    // Lines that fail to read are skipped.
                    if let Ok(line) = line {
                        let _ = stderr.write_all(line.as_bytes());
                        let _ = stderr.write_all(b"\n");
                    }
                }
                exit(errors);
            }
        }
    }
}