extern crate clap;
extern crate pipelines;

use std::ffi::{OsStr, OsString};
use std::io::stderr;
use std::io::{self, BufRead, BufReader, BufWriter, Write};
use std::process::{exit, Command, Stdio};
use std::thread;

use clap::{App, Arg};
use pipelines::{Pipeline, PipelineConfig};
fn main() {
let app = App::new("mr_tools")
.arg(
Arg::with_name("nmappers")
.short("M")
.help("how many copies of the mapper to run")
.takes_value(true)
.default_value("1")
)
.arg(
Arg::with_name("nreducers")
.short("R")
.help("how many copies of the reducer to run")
.takes_value(true)
.default_value("1")
)
.arg(
Arg::with_name("field_sep")
.short("F")
.required(false)
.default_value("\t")
.help("field separator (used to group reducer keys; default is tab)"),
)
.arg(
Arg::with_name("mapper")
.short("m")
.long("map")
.takes_value(true)
.required(true)
.help("command to run as mapper (executed with /bin/sh -c)"),
)
.arg(
Arg::with_name("reducer")
.short("r")
.long("reduce")
.takes_value(true)
.required(true)
.help("command to run as reducer (executed with /bin/sh -c). each reducer also runs its own sorter"),
);
let arg_matches = app.get_matches();
let mut u_field_sep = arg_matches.value_of("field_sep").unwrap_or("\t");
if u_field_sep == "\\t" {
u_field_sep = "\t";
}
let field_sep_bytes = u_field_sep.as_bytes();
if field_sep_bytes.len() > 1 {
stderr()
.write(b"field sep must be one byte")
.expect("failed stderr");
exit(1);
}
let field_sep = field_sep_bytes[0];
let nmappers = arg_matches
.value_of("nmappers")
.unwrap_or("1")
.parse::<usize>()
.expect("bad -M integer");
let nreducers = arg_matches
.value_of("nreducers")
.unwrap_or("1")
.parse::<usize>()
.expect("bad -R integer");
let mapper = arg_matches
.value_of_os("mapper")
.expect("no mapper")
.to_owned();
let reducer = arg_matches
.value_of_os("reducer")
.expect("no reducer")
.to_owned();
let pl = Pipeline::new(|tx| {
let stdin = io::stdin();
let locked = stdin.lock();
for line in locked.split(b'\n') {
let line = line.expect("bad line in generator");
tx.send(line);
}
}).configure(PipelineConfig::default().batch_size(100));
let pl = pl.ppipe(nmappers, move |tx, rx| {
let child = Command::new("/bin/sh")
.arg("-c")
.arg(&mapper)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.expect("failed spawn");
let stdout = child.stdout.expect("no stdout?");
let stdin = child.stdin.expect("no stdin?");
thread::spawn(move || {
let buff = BufReader::new(stdout);
for line in buff.split(b'\n') {
let line = line.expect("no line?");
let key_position = {
line.iter()
.position(|&x| x == field_sep)
.unwrap_or(line.len())
};
let key = line[..key_position].to_owned();
tx.send((key, line));
}
});
let mut buf = BufWriter::new(stdin);
for line in rx {
buf.write(&line).expect("failed mapper write");
buf.write(b"\n").expect("failed mapper write");
}
buf.flush().expect("failed mapper flush");
});
let pl = pl.distribute(nreducers, move |tx, rx| {
let mut reducer_cmd = OsString::new();
reducer_cmd.push("sort | ");
reducer_cmd.push(&reducer);
let child = Command::new("/bin/sh")
.arg("-c")
.arg(reducer_cmd)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.expect("failed spawn");
let stdout = child.stdout.expect("no stdout?");
let stdin = child.stdin.expect("no stdin?");
thread::spawn(move || {
let buff = BufReader::new(stdout);
for line in buff.split(b'\n') {
let line = line.expect("no line?");
tx.send(line);
}
});
let mut buf = BufWriter::new(stdin);
for (_key, line) in rx {
buf.write(&line).expect("failed reducer write");
buf.write(b"\n").expect("failed reducer write");
}
buf.flush().expect("failed reducer flush");
});
let stdout = io::stdout();
let locked = stdout.lock();
let mut buf = BufWriter::new(locked);
for output in pl.into_iter() {
buf.write_all(&output).expect("failed write stdout");
buf.write_all(b"\n").expect("failed newline stdout");
}
buf.flush().expect("failed final flush");
}