#![deny(dead_code)]
#![deny(unused_imports)]
#![feature(alloc_system)]
extern crate alloc_system;
extern crate arrayvec;
extern crate num_cpus;
extern crate permutate;
mod arguments;
mod command;
mod disk_buffer;
mod filepaths;
mod init;
mod input_iterator;
mod threads;
mod tokenizer;
mod verbose;
use std::fs::File;
use std::io::{self, BufRead, BufReader, Write};
use std::mem;
use std::process::exit;
use std::thread::{self, JoinHandle};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::channel;
use arguments::Args;
use threads::pipe::State;
use tokenizer::{TokenErr, tokenize};
/// Turns an owned command string into a `&'static str` by deliberately leaking it.
///
/// The contents of `comm` are borrowed, the borrow's lifetime is rewritten to `'static`,
/// and the owning `String` is then forgotten so that its buffer is never released.
///
/// # Safety
///
/// The `'static` lifetime is fabricated with `transmute`; it only holds because the
/// `String` is passed to `mem::forget` and therefore never dropped. Every call leaks
/// the buffer owned by `comm` for the remainder of the process.
unsafe fn leak_command(comm: String) -> &'static str {
    // Borrow the full contents first; the owner must still be alive at this point.
    let leaked: &'static str = mem::transmute::<&str, &'static str>(&comm[..]);
    // Skip the destructor so the bytes behind `leaked` stay allocated.
    mem::forget(comm);
    leaked
}
/// Program entry point.
///
/// In order, this:
///
/// 1. Runs `init::cleanup` to obtain the unprocessed, processed and errors file paths.
/// 2. Parses the command-line arguments and the command template via `init::parse`.
/// 3. Tokenizes the command template, exiting with status 1 on a tokenizer error.
/// 4. Spawns `args.ncores` worker threads that share the input source behind a mutex and
///    report back over a channel.
/// 5. Hands the receiving end of the channel to `threads::receive_messages`, then joins
///    every worker.
/// 6. Echoes the contents of the errors file to standard error and exits with status 1 if
///    that file is non-empty.
fn main() {
    let stdout = io::stdout();
    let stderr = io::stderr();
    let (unprocessed_path, processed_path, errors_path) = init::cleanup(&mut stderr.lock());
    let mut args = Args::new();
    let mut comm = String::with_capacity(128);
    let inputs = init::parse(&mut args, &mut comm, &unprocessed_path);

    // SAFETY: `comm` is moved into `leak_command`, which forgets it, so the buffer behind the
    // returned slice is never freed.
    // NOTE(review): presumably the `'static` lifetime is needed so that the tokenized
    // arguments can be moved into the worker threads below -- confirm against `tokenizer`.
    let comm = unsafe { leak_command(comm) };

    if let Err(error) = tokenize(&mut args.arguments, comm, &unprocessed_path, args.ninputs) {
        let mut stderr = stderr.lock();
        match error {
            TokenErr::File(why) => {
                let _ = writeln!(stderr, "unable to obtain Nth input: {}", why);
            }
            TokenErr::OutOfBounds => {
                let _ = writeln!(stderr, "input token out of bounds");
            }
        }
        exit(1);
    }

    // Every worker pulls from the same input source and reports over the same channel.
    let shared_input = Arc::new(Mutex::new(inputs));
    let (output_tx, input_rx) = channel::<State>();
    let mut threads = Vec::with_capacity(args.ncores);

    if args.flags & arguments::VERBOSE_MODE != 0 {
        verbose::total_inputs(&stdout, args.ncores, args.ninputs);
    }

    if args.flags & arguments::INPUTS_ARE_COMMANDS != 0 {
        // The inputs are themselves the commands, so no command template is handed over.
        for _ in 0..args.ncores {
            let num_inputs = args.ninputs;
            let inputs = shared_input.clone();
            let flags = args.flags;
            let output_tx = output_tx.clone();
            let handle: JoinHandle<()> = thread::spawn(move || {
                threads::execute::inputs(num_inputs, flags, inputs, output_tx)
            });
            threads.push(handle);
        }
    } else {
        // Slots are numbered from 1 through `ncores`; each worker gets its own copy of the
        // tokenized command.
        for slot in 1..args.ncores + 1 {
            let num_inputs = args.ninputs;
            let inputs = shared_input.clone();
            let flags = args.flags;
            let output_tx = output_tx.clone();
            let arguments = args.arguments.clone();
            let handle: JoinHandle<()> = thread::spawn(move || {
                threads::execute::command(slot, num_inputs, flags, &arguments, inputs, output_tx)
            });
            threads.push(handle);
        }
    }

    threads::receive_messages(input_rx, args, &processed_path, &errors_path);
    for thread in threads {
        thread.join().unwrap();
    }

    // A non-empty errors file means at least one error was logged: replay it on standard
    // error and signal failure through the exit status.
    if let Ok(file) = File::open(errors_path) {
        if file.metadata().ok().map_or(0, |metadata| metadata.len()) > 0 {
            let stderr = &mut stderr.lock();
            // `write_all` is used instead of `write` because a single `write` call may emit
            // only part of the buffer, which would silently truncate the report.
            let _ = stderr.write_all(b"parallel: encountered errors during processing:\n");
            for line in BufReader::new(file).lines() {
                if let Ok(line) = line {
                    let _ = stderr.write_all(line.as_bytes());
                    let _ = stderr.write_all(b"\n");
                }
            }
            exit(1);
        }
    }
}