use std::io;
use std::iter;
use std::str;
use dsrs::counters::{Counter, HeavyHitter, KeyedCounter, KeyedMerger, Merger};
use dsrs::stream_reducer::reduce_stream;
use structopt::StructOpt;
// Command-line options for the `dsrs` binary, parsed by `Opt::from_args()` in `main`.
#[derive(Debug, StructOpt)]
#[structopt(name = "dsrs", about = "Approximate count distinct lines.")]
struct Opt {
// `--key`: reduce stdin with the keyed reducers (`KeyedCounter`, or `KeyedMerger`
// together with `--merge`) and print one "<key> <value>" line per key.
#[structopt(long)]
key: bool,
// `--raw`: print each counter's `serialize()` output instead of its rounded estimate.
#[structopt(long)]
raw: bool,
// `--merge`: reduce stdin with `Merger`/`KeyedMerger` instead of `Counter`/`KeyedCounter`.
// The tests in this file feed it the concatenated output of `--raw` runs.
#[structopt(long)]
merge: bool,
// `--hh <k>`: reduce stdin with `HeavyHitter::new(k)` and print "<count> <line>" pairs.
// `main` rejects combining it with any of the other flags and prints nothing for k == 0.
#[structopt(long)]
hh: Option<u64>,
}
/// Entry point: parses the flags, reduces stdin with the reducer the flags select,
/// and prints the result to stdout.
///
/// # Panics
///
/// Panics if `--hh` is combined with `--key`, `--raw` or `--merge`, if reading
/// stdin fails, or if a line/key that has to be printed is not valid UTF-8.
fn main() {
    let opt = Opt::from_args();

    // Heavy-hitter mode stands alone; it cannot be mixed with the other flags.
    if let Some(k) = opt.hh {
        assert!(!opt.key, "--key and --hh cannot be set simultaneously");
        assert!(!opt.raw, "--raw and --hh cannot be set simultaneously");
        assert!(!opt.merge, "--merge and --hh cannot be set simultaneously");
        // Zero requested hitters: nothing to print, stdin is left unread.
        if k == 0 {
            return;
        }
        let hitters =
            reduce_stream(io::stdin().lock(), HeavyHitter::new(k)).expect("no io error");
        for (line, count) in hitters.estimate() {
            println!("{} {}", count, str::from_utf8(line).expect("valid UTF-8"));
        }
        return;
    }

    if opt.merge {
        if opt.key {
            // Keyed merge: one output line per key.
            let merged =
                reduce_stream(io::stdin().lock(), KeyedMerger::default()).expect("no io error");
            for (key, ctr) in merged.state() {
                print_dict(iter::once((key, &ctr)), opt.raw);
            }
        } else {
            // Plain merge: a single combined counter.
            let merged =
                reduce_stream(io::stdin().lock(), Merger::default()).expect("no io error");
            print_single(&merged.counter(), opt.raw);
        }
    } else if opt.key {
        // Keyed counting: one counter per key.
        let counted =
            reduce_stream(io::stdin().lock(), KeyedCounter::default()).expect("no io error");
        print_dict(counted.state(), opt.raw);
    } else {
        // Default: a single counter over every input line.
        let counted =
            reduce_stream(io::stdin().lock(), Counter::default()).expect("no io error");
        print_single(&counted, opt.raw);
    }
}
/// Prints every `(key, counter)` pair from `it` as a `"<key> <value>"` line, where the
/// value part is whatever [`print_single`] emits for that counter and `raw` setting.
///
/// # Panics
///
/// Panics if a key is not valid UTF-8.
fn print_dict<'a>(it: impl Iterator<Item = (&'a [u8], &'a Counter)>, raw: bool) {
    it.for_each(|(key, ctr)| {
        let label = str::from_utf8(key).expect("valid UTF-8");
        // No newline here: `print_single` finishes the line.
        print!("{} ", label);
        print_single(ctr, raw);
    });
}
/// Prints one counter on its own line: its `serialize()` output when `raw` is set,
/// otherwise its estimate rounded with `round()`.
fn print_single(c: &Counter, raw: bool) {
    match raw {
        true => println!("{}", c.serialize()),
        false => println!("{}", c.estimate().round()),
    }
}
#[cfg(test)]
mod tests {
use std::process;
use std::str;
use assert_cmd;
use itertools::Itertools;
/// Sorts the newline-separated lines of `stdout` bytewise and joins them again.
///
/// A single trailing `\n` is treated as the terminator of the last line, not as an
/// extra empty line: it is set aside before sorting and re-appended afterwards.
/// Input without a trailing newline is sorted as-is and gains no terminator, and
/// genuinely empty lines in the middle of the input are kept.
fn sort_lines(stdout: Vec<u8>) -> Vec<u8> {
    // Split off the final terminator explicitly instead of relying on where the
    // resulting empty element happens to land after sorting.
    let (body, terminated) = match stdout.split_last() {
        Some((&b'\n', rest)) => (rest, true),
        _ => (stdout.as_slice(), false),
    };
    let mut lines: Vec<&[u8]> = body.split(|c| *c == b'\n').collect();
    lines.sort_unstable();
    let mut sorted = lines.join(&b'\n');
    if terminated {
        sorted.push(b'\n');
    }
    sorted
}
fn communicate(stdin: Vec<u8>, dsrs_flags: &[&str]) -> Vec<u8> {
let out = assert_cmd::Command::cargo_bin(env!("CARGO_PKG_NAME"))
.expect("command created")
.args(dsrs_flags)
.write_stdin(stdin)
.assert()
.success()
.get_output()
.clone();
assert!(out.stderr.is_empty(), "stderr {}",
str::from_utf8(&out.stderr).expect("valid UTF-8"));
out
.stdout
}
/// Runs `cmd` through `/bin/bash -c` and returns the bytes it wrote to stdout.
///
/// # Panics
///
/// Panics if bash cannot be spawned, if the command writes anything to stderr,
/// or if it exits with a non-zero status, so a broken fixture command fails the
/// test loudly instead of silently producing partial data.
fn eval_bash(cmd: &str) -> Vec<u8> {
    let out = process::Command::new("/bin/bash")
        .arg("-c")
        .arg(cmd)
        .output()
        .expect("datagen process successful");
    // Lossy decoding keeps the diagnostic readable even if stderr is not UTF-8.
    assert!(
        out.stderr.is_empty(),
        "{}",
        String::from_utf8_lossy(&out.stderr)
    );
    // `output()` only reports spawn failures; the exit status must be checked separately.
    assert!(out.status.success(), "`{}` exited with {}", cmd, out.status);
    out.stdout
}
fn validate_equal(datagen: &str, keyed: bool, unix: &str) {
let ref args = if keyed { vec!["--key"] } else { vec![] };
validate_equal_cmd(datagen, args, unix);
let stdin = eval_bash(datagen);
let dsrs_stdout = communicate(stdin.clone(), args);
let dsrs_stdout = sort_lines(dsrs_stdout);
let groups = stdin
.split(|c| *c == b'\n')
.enumerate()
.into_group_map_by(|(i, _)| i % 3)
.into_iter()
.map(|(_, v)| {
v.into_iter()
.map(|(_, vv)| vv)
.collect::<Vec<_>>()
.join(&b'\n')
})
.collect();
let modulo_stdout = reduce_with_merge(groups, keyed);
assert_eq!(
&modulo_stdout,
&dsrs_stdout,
"\nmodulo:\n{}\ndsrs:\n{}",
str::from_utf8(&modulo_stdout).expect("valid UTF-8"),
str::from_utf8(&dsrs_stdout).expect("valid UTF-8")
);
let nlines = stdin.split(|c| *c == b'\n').count() - 1;
let groups: Vec<_> = stdin
.split(|c| *c == b'\n')
.enumerate()
.into_group_map_by(|(i, _)| (i * 2) / nlines)
.into_iter()
.map(|(_, v)| {
v.into_iter()
.map(|(_, vv)| vv)
.collect::<Vec<_>>()
.join(&b'\n')
})
.collect();
let chunked_stdout = reduce_with_merge(groups, keyed);
assert_eq!(
&chunked_stdout,
&dsrs_stdout,
"\nchunked:\n{}\ndsrs:\n{}",
str::from_utf8(&chunked_stdout).expect("valid UTF-8"),
str::from_utf8(&dsrs_stdout).expect("valid UTF-8")
);
}
/// Reduces every group separately with `--raw`, concatenates those outputs, feeds
/// the concatenation to a `--merge` run, and returns the merged output with its
/// lines sorted. `--key` is added to both stages when `keyed` is set.
fn reduce_with_merge(groups: Vec<Vec<u8>>, keyed: bool) -> Vec<u8> {
    let raw_flags: &[&str] = match keyed {
        true => &["--key", "--raw"],
        false => &["--raw"],
    };
    let merge_flags: &[&str] = match keyed {
        true => &["--key", "--merge"],
        false => &["--merge"],
    };

    // Each group's output is appended byte-for-byte to form the merge input.
    let serialized: Vec<u8> = groups
        .into_iter()
        .flat_map(|stdin| communicate(stdin, raw_flags))
        .collect();

    sort_lines(communicate(serialized, merge_flags))
}
/// Reference shell pipeline for the un-keyed tests: counts the distinct lines on stdin.
const UNIX_COUNT_DISTINCT: &str = "sort --unique | wc -l";
/// Input made of `seq 100`, with no repeated lines.
#[test]
fn unique_lines() {
    let datagen = "seq 100";
    validate_equal(datagen, false, UNIX_COUNT_DISTINCT);
}
/// Input made of `seq 100` three times over, so every line repeats equally often.
#[test]
fn equally_dup_lines() {
    let datagen = "seq 100 && seq 100 && seq 100";
    validate_equal(datagen, false, UNIX_COUNT_DISTINCT);
}
/// Input made of `seq n` for every n in 1..=100, so small numbers repeat more often
/// than large ones.
#[test]
fn unequally_dup_lines() {
    let datagen = "seq 100 | xargs -L1 seq";
    validate_equal(datagen, false, UNIX_COUNT_DISTINCT);
}
/// Input containing empty lines (two of them) next to one non-empty line.
#[test]
fn count_empty() {
    let datagen = "echo ; echo ; echo 1";
    validate_equal(datagen, false, UNIX_COUNT_DISTINCT);
}
/// Reference shell pipeline for the keyed tests: de-duplicates the lines, counts them
/// per group with `uniq -w1 -c`, and prints "<key> <count>" via awk.
///
/// `-w1` compares only the first character, so this presumes single-character keys.
const UNIX_GROUPBY_COUNT_DISTINCT: &str =
    "sort --unique | uniq -w1 -c | awk '{print$2\" \"$1}'";
/// Three keys (1, 2, 3) with 100, 50 and 25 distinct values and no repeated lines.
#[test]
fn unique_keyed_lines() {
    let datagen = "(seq 100 | xargs -L1 echo 1) && \
                   (seq 50 | xargs -L1 echo 2) && \
                   (seq 25 | xargs -L1 echo 3)";
    validate_equal(datagen, true, UNIX_GROUPBY_COUNT_DISTINCT);
}
/// Two keys (1 and 2) whose value ranges are each emitted three times, interleaved.
#[test]
fn keyed_dup_lines() {
    let datagen = "(seq 100 | xargs -L1 echo 1) && \
                   (seq 50 | xargs -L1 echo 2) && \
                   (seq 100 | xargs -L1 echo 1) && \
                   (seq 50 | xargs -L1 echo 2) && \
                   (seq 100 | xargs -L1 echo 1) && \
                   (seq 50 | xargs -L1 echo 2)";
    validate_equal(datagen, true, UNIX_GROUPBY_COUNT_DISTINCT);
}
/// Keyed input where one line is just the key followed by a space (`"1 "`), alongside
/// two lines that carry something after the key.
#[test]
fn keyed_count_empty() {
    let datagen = "echo \"1 \"; echo 1 1; echo 1 3";
    validate_equal(datagen, true, UNIX_GROUPBY_COUNT_DISTINCT);
}
/// Runs `dsrs` with `args` on the output of `datagen` and asserts that its
/// line-sorted stdout equals the stdout of `(datagen) | (unix)` run through bash.
///
/// The reference pipeline's output is compared as-is (it is not re-sorted here),
/// so `unix` has to emit its lines already in sorted order.
///
/// # Panics
///
/// Panics (failing the test) on a mismatch, or when a helper command fails.
fn validate_equal_cmd(datagen: &str, args: &[&str], unix: &str) {
    // The generated input is used exactly once, so it is moved rather than cloned.
    let stdin = eval_bash(datagen);
    let dsrs_stdout = sort_lines(communicate(stdin, args));
    let unix_stdout = eval_bash(&format!("({}) | ({})", datagen, unix));
    assert_eq!(
        &unix_stdout,
        &dsrs_stdout,
        "\nunix:\n{}\ndsrs:\n{}",
        str::from_utf8(&unix_stdout).expect("valid UTF-8"),
        str::from_utf8(&dsrs_stdout).expect("valid UTF-8")
    );
}
/// Builds the reference shell pipeline for the heavy-hitter tests: count each
/// distinct line, keep the `k` most frequent, strip the leading padding that
/// `uniq -c` adds, and sort the resulting "<count> <line>" rows.
fn unix_hh(k: usize) -> String {
    let pipeline = format!("sort | uniq -c | sort -rn | head -{} | sed 's/^ *//' | sort", k);
    pipeline
}
/// Checks `dsrs --hh <k>` on the output of `datagen` against the reference
/// pipeline produced by `unix_hh(k)`.
fn validate_unix_hh(datagen: &str, k: usize) {
    let limit = k.to_string();
    let reference = unix_hh(k);
    validate_equal_cmd(datagen, &["--hh", limit.as_str()], &reference);
}
/// 100 distinct lines with k = 100, so every line is requested as a heavy hitter.
#[test]
fn hh_unique_lines() {
    let datagen = "seq 100";
    validate_unix_hh(datagen, 100);
}
/// Each of the 1000 generated lines is followed by the lines 1, 2 and 3, which makes
/// those three by far the most frequent; k = 3 asks for exactly them.
#[test]
fn hh_equally_dup_lines() {
    let datagen = "seq 1000 | sed 's/$/\\n1\\n2\\n3/'";
    validate_unix_hh(datagen, 3);
}
/// The most frequent line is the empty line (twice, versus "1" once); k = 1.
#[test]
fn hh_count_empty() {
    let datagen = "echo ; echo ; echo 1";
    validate_unix_hh(datagen, 1);
}
}