use std::num::NonZeroUsize;
use bstr::ByteSlice;
use simd_csv::ByteRecord;
use crate::cmd::parallel::Args as ParallelArgs;
use crate::collections::{ClusteredInsertHashmap, Counter};
use crate::config::{Config, Delimiter};
use crate::select::SelectedColumns;
use crate::util;
use crate::CliResult;
type GroupKey = ByteRecord;
type ValueKey = Vec<u8>;
static USAGE: &str = "
Compute a frequency table on CSV data.
The resulting frequency table will look like this:
field - Name of the column
value - Some distinct value of the column
count - Number of rows containing this value
Pipe into `xan hist` to easily visualize the result:
$ xan freq -s category data.csv | xan hist
By default, there is a row for the N most frequent values for each field in the
data. The number of returned values can be tweaked with -l/--limit or you can
disable the limit altogether using the -A/--all flag.
Since this computes an exact frequency table, memory proportional to the
cardinality of each selected column is required. If you expect this will overflow
your memory, you can compute an approximate top-k using the -a, --approx flag.
To compute custom aggregations per group, beyond just counting, please be sure to
check the `xan groupby` command instead.
Frequency tables can be computed in parallel using the -p/--parallel or -t/--threads
flags. This cannot work on streams or gzipped files, unless a `.gzi` index (as
created by `bgzip -i`) can be found beside it. Parallelization is not compatible
with the -g/--groupby option.
Usage:
xan frequency [options] [<input>]
xan freq [options] [<input>]
frequency options:
-s, --select <arg> Select a subset of columns to compute frequencies
for. See 'xan select --help' for the selection language
details.
--sep <char> Split the cell into multiple values to count using the
provided separator.
-g, --groupby <cols> If given, will compute frequency tables per group
as defined by the given columns.
-A, --all Remove the limit.
-l, --limit <arg> Limit the frequency table to the N most common
items. Use -A, -all or set to 0 to disable the limit.
[default: 10]
-a, --approx If set, return the items most likely having the top counts,
as per given --limit. Won't work if --limit is 0 or
with -A, --all. Accuracy of results increases with the
given limit.
-N, --no-extra Don't include empty cells & remaining counts.
-p, --parallel Whether to use parallelization to speed up computation.
Will automatically select a suitable number of threads to use
based on your number of cores. Use -t, --threads if you want to
indicate the number of threads yourself.
-t, --threads <threads> Parellize computations using this many threads. Use -p, --parallel
if you want the number of threads to be automatically chosen instead.
Hidden options:
--no-limit-we-reach-for-the-sky Nothing to see here...
Common options:
-h, --help Display this message
-o, --output <file> Write output to <file> instead of stdout.
-n, --no-headers When set, the first row will NOT be included
in the frequency table. Additionally, the 'field'
column will be 0-based indices instead of header
names.
-d, --delimiter <arg> The field delimiter for reading CSV data.
Must be a single character.
";
#[derive(Clone, Deserialize)]
struct Args {
arg_input: Option<String>,
flag_select: SelectedColumns,
flag_sep: Option<String>,
flag_all: bool,
flag_limit: usize,
flag_approx: bool,
flag_no_extra: bool,
flag_output: Option<String>,
flag_no_headers: bool,
flag_delimiter: Option<Delimiter>,
flag_parallel: bool,
flag_threads: Option<NonZeroUsize>,
flag_groupby: Option<SelectedColumns>,
flag_no_limit_we_reach_for_the_sky: bool,
}
impl Args {
fn resolve(&mut self) {
if self.flag_all {
self.flag_limit = 0;
}
}
}
pub fn run(argv: &[&str]) -> CliResult<()> {
let mut args: Args = util::get_args(USAGE, argv)?;
if args.flag_parallel || args.flag_threads.is_some() {
if args.flag_groupby.is_some() {
Err("-p/--parallel or -t/--threads cannot be used with -g/--groupby!")?;
}
let mut parallel_args = ParallelArgs::single_file(&args.arg_input, args.flag_threads)?;
parallel_args.cmd_freq = true;
parallel_args.flag_select = args.flag_select;
parallel_args.flag_sep = args.flag_sep;
parallel_args.flag_all = args.flag_all;
parallel_args.flag_limit = args.flag_limit;
parallel_args.flag_approx = args.flag_approx;
parallel_args.flag_no_extra = args.flag_no_extra;
parallel_args.flag_no_headers = args.flag_no_headers;
parallel_args.flag_output = args.flag_output;
parallel_args.flag_delimiter = args.flag_delimiter;
return parallel_args.run();
}
args.resolve();
if args.flag_approx && args.flag_limit == 0 {
Err("-a, --approx cannot work with --limit=0 or -A, --all!")?;
}
if args.flag_no_limit_we_reach_for_the_sky {
opener::open_browser("https://www.youtube.com/watch?v=7kmEEkECFQw")
.expect("could not easter egg");
return Ok(());
}
let approx_k = if args.flag_approx {
Some(args.flag_limit)
} else {
None
};
let rconf = Config::new(&args.arg_input)
.delimiter(args.flag_delimiter)
.no_headers(args.flag_no_headers)
.select(args.flag_select);
if let Some(sep) = args.flag_sep.as_ref() {
if sep.as_bytes() == [rconf.quote] {
Err("--sep cannot be the CSV quote character!")?;
}
}
let mut rdr = rconf.simd_zero_copy_reader()?;
let mut wtr = Config::new(&args.flag_output).simd_writer()?;
let headers = rdr.byte_headers()?.clone();
let mut sel = rconf.selection(&headers)?;
let groupby_sel_opt = args
.flag_groupby
.map(|cols| cols.selection(&headers, !rconf.no_headers))
.transpose()?;
if let Some(gsel) = &groupby_sel_opt {
sel.subtract(gsel);
}
if sel.is_empty() {
return Ok(());
}
let field_names: ByteRecord = if args.flag_no_headers {
sel.iter()
.map(|i| i.to_string().as_bytes().to_vec())
.collect()
} else {
sel.select(&headers).map(|h| h.to_vec()).collect()
};
fn coerce_cell(cell: &[u8], no_extra: bool) -> Option<&[u8]> {
if !no_extra {
if cell.is_empty() {
Some(b"<empty>")
} else {
Some(cell)
}
} else if cell.is_empty() {
None
} else {
Some(cell)
}
}
if let Some(groupby_sel) = groupby_sel_opt {
let mut groups_to_fields_to_counter: ClusteredInsertHashmap<
GroupKey,
Vec<Counter<ValueKey>>,
> = ClusteredInsertHashmap::new();
let output_headers = {
let mut r = ByteRecord::new();
r.push_field(b"field");
for col_name in groupby_sel.select(&headers) {
r.push_field(col_name);
}
r.push_field(b"value");
r.push_field(b"count");
r
};
wtr.write_byte_record(&output_headers)?;
let mut output_record = ByteRecord::new();
while let Some(record) = rdr.read_byte_record()? {
let group: ByteRecord = groupby_sel
.iter()
.map(|i| record.unescape(*i).unwrap().into_owned())
.collect();
let fields_to_counter = groups_to_fields_to_counter.insert_with(group, || {
let mut list = Vec::with_capacity(sel.len());
for _ in 0..sel.len() {
list.push(Counter::new(approx_k));
}
list
});
for (i, cell) in sel.iter().map(|i| record.unquote(*i).unwrap()).enumerate() {
if let Some(sep) = &args.flag_sep {
for sub_cell in cell.split_str(sep) {
let sub_cell = match coerce_cell(sub_cell, args.flag_no_extra) {
Some(c) => c,
None => continue,
};
fields_to_counter[i].add(sub_cell.to_vec());
}
} else {
let cell = match coerce_cell(cell, args.flag_no_extra) {
Some(c) => c,
None => continue,
};
fields_to_counter[i].add(cell.to_vec());
}
}
}
for name in field_names.into_iter().rev() {
for (group, counters) in groups_to_fields_to_counter.iter_mut() {
let counter = counters.pop().unwrap();
let (total, items) = counter.into_total_and_items(
if args.flag_limit == 0 {
None
} else {
Some(args.flag_limit)
},
false,
);
let mut emitted: u64 = 0;
for (value, count) in items {
emitted += count;
output_record.clear();
output_record.push_field(name);
for cell in group {
output_record.push_field(cell);
}
output_record.push_field(&simd_csv::unescape(&value, rconf.quote));
output_record.push_field(count.to_string().as_bytes());
wtr.write_byte_record(&output_record)?;
}
let remaining = total - emitted;
if !args.flag_no_extra && remaining > 0 {
output_record.clear();
output_record.push_field(name);
for cell in group {
output_record.push_field(cell);
}
output_record.push_field(b"<rest>");
output_record.push_field(remaining.to_string().as_bytes());
wtr.write_byte_record(&output_record)?;
}
}
}
} else {
let mut fields: Vec<Counter<ValueKey>> =
(0..sel.len()).map(|_| Counter::new(approx_k)).collect();
let output_headers = {
let mut r = ByteRecord::new();
r.push_field(b"field");
r.push_field(b"value");
r.push_field(b"count");
r
};
wtr.write_byte_record(&output_headers)?;
let mut output_record = ByteRecord::new();
while let Some(record) = rdr.read_byte_record()? {
for (cell, counter) in sel
.iter()
.map(|i| record.unquote(*i).unwrap())
.zip(fields.iter_mut())
{
if let Some(sep) = &args.flag_sep {
for sub_cell in cell.split_str(sep) {
let sub_cell = match coerce_cell(sub_cell, args.flag_no_extra) {
Some(c) => c,
None => continue,
};
counter.add(sub_cell.to_vec());
}
} else {
let cell = match coerce_cell(cell, args.flag_no_extra) {
Some(c) => c,
None => continue,
};
counter.add(cell.to_vec());
}
}
}
for (name, counter) in field_names.into_iter().zip(fields.into_iter()) {
let (total, items) = counter.into_total_and_items(
if args.flag_limit == 0 {
None
} else {
Some(args.flag_limit)
},
false,
);
let mut emitted: u64 = 0;
for (value, count) in items {
emitted += count;
output_record.clear();
output_record.push_field(name);
output_record.push_field(&simd_csv::unescape(&value, rconf.quote));
output_record.push_field(count.to_string().as_bytes());
wtr.write_byte_record(&output_record)?;
}
let remaining = total - emitted;
if !args.flag_no_extra && remaining > 0 {
output_record.clear();
output_record.push_field(name);
output_record.push_field(b"<rest>");
output_record.push_field(remaining.to_string().as_bytes());
wtr.write_byte_record(&output_record)?;
}
}
}
Ok(wtr.flush()?)
}