use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fs;
use std::io;
use std::path::Path;
use csv;
use regex::Regex;
use config::{Config, Delimiter};
use select::SelectColumns;
use util::{self, FilenameTemplate, ImmutableRecordHelpers};
use CliResult;
static USAGE: &str = "
Partitions the given CSV data into chunks based on the value of a column
The files are written to the output directory with filenames based on the
values in the partition column and the `--filename` flag.
Usage:
xan partition [options] <column> <outdir> [<input>]
xan partition --help
partition options:
--filename <filename> A filename template to use when constructing
the names of the output files. The string '{}'
will be replaced by a value based on the value
of the field, but sanitized for shell safety.
[default: {}.csv]
-p, --prefix-length <n> Truncate the partition column after the
specified number of bytes when creating the
output file.
-S, --sorted Use this flag if you know the file is sorted
on the partition column in advance, so the command
can run faster and with less memory and resources
opened.
--drop Drop the partition column from results.
Common options:
-h, --help Display this message
-n, --no-headers When set, the first row will NOT be interpreted
as column names. Otherwise, the first row will
appear in all chunks as the header row.
-d, --delimiter <arg> The field delimiter for reading CSV data.
Must be a single character. [default: ,]
";
#[derive(Clone, Deserialize)]
struct Args {
arg_column: SelectColumns,
arg_input: Option<String>,
arg_outdir: String,
flag_filename: FilenameTemplate,
flag_prefix_length: Option<usize>,
flag_drop: bool,
flag_no_headers: bool,
flag_delimiter: Option<Delimiter>,
flag_sorted: bool,
}
pub fn run(argv: &[&str]) -> CliResult<()> {
let args: Args = util::get_args(USAGE, argv)?;
fs::create_dir_all(&args.arg_outdir)?;
args.sequential_partition()
}
impl Args {
fn rconfig(&self) -> Config {
Config::new(&self.arg_input)
.delimiter(self.flag_delimiter)
.no_headers(self.flag_no_headers)
.select(self.arg_column.clone())
}
fn key_column(&self, rconfig: &Config, headers: &csv::ByteRecord) -> CliResult<usize> {
let select_cols = rconfig.selection(headers)?;
if select_cols.len() == 1 {
Ok(select_cols[0])
} else {
fail!("can only partition on one column")
}
}
fn sequential_partition(&self) -> CliResult<()> {
let rconfig = self.rconfig();
let mut rdr = rconfig.reader()?;
let mut headers = rdr.byte_headers()?.clone();
let key_col = self.key_column(&rconfig, &headers)?;
let mut gen = WriterGenerator::new(self.flag_filename.clone());
if self.flag_drop {
headers = headers.remove(key_col);
}
let mut row = csv::ByteRecord::new();
if self.flag_sorted {
let mut current: Option<(Vec<u8>, BoxedWriter)> = None;
while rdr.read_byte_record(&mut row)? {
let column = &row[key_col];
let key = match self.flag_prefix_length {
Some(len) if len < column.len() => &column[0..len],
_ => column,
};
match current {
Some((ref k, _)) if k == key => {}
_ => {
let mut wtr = gen.writer(&self.arg_outdir, key)?;
if !rconfig.no_headers {
wtr.write_record(&headers)?;
}
current = Some((key.to_vec(), wtr));
}
};
let wtr = &mut current.as_mut().unwrap().1;
if self.flag_drop {
wtr.write_record(&row.remove(key_col))?;
} else {
wtr.write_byte_record(&row)?;
}
}
} else {
let mut writers: HashMap<Vec<u8>, BoxedWriter> = HashMap::new();
while rdr.read_byte_record(&mut row)? {
let column = &row[key_col];
let key = match self.flag_prefix_length {
Some(len) if len < column.len() => &column[0..len],
_ => column,
};
let mut entry = writers.entry(key.to_vec());
let wtr = match entry {
Entry::Occupied(ref mut occupied) => occupied.get_mut(),
Entry::Vacant(vacant) => {
let mut wtr = gen.writer(&self.arg_outdir, key)?;
if !rconfig.no_headers {
wtr.write_record(&headers)?;
}
vacant.insert(wtr)
}
};
if self.flag_drop {
wtr.write_record(&row.remove(key_col))?;
} else {
wtr.write_byte_record(&row)?;
}
}
}
Ok(())
}
}
type BoxedWriter = csv::Writer<Box<dyn io::Write + 'static>>;
struct WriterGenerator {
template: FilenameTemplate,
counter: usize,
used: HashSet<String>,
non_word_char: Regex,
}
impl WriterGenerator {
fn new(template: FilenameTemplate) -> WriterGenerator {
WriterGenerator {
template,
counter: 1,
used: HashSet::new(),
non_word_char: Regex::new(r"\W").unwrap(),
}
}
fn writer<P>(&mut self, path: P, key: &[u8]) -> io::Result<BoxedWriter>
where
P: AsRef<Path>,
{
let unique_value = self.unique_value(key);
self.template.writer(path.as_ref(), &unique_value)
}
fn unique_value(&mut self, key: &[u8]) -> String {
let utf8 = String::from_utf8_lossy(key);
let safe = self.non_word_char.replace_all(&utf8, "").into_owned();
let base = if safe.is_empty() {
"empty".to_owned()
} else {
safe
};
if !self.used.contains(&base) {
self.used.insert(base.clone());
base
} else {
loop {
let candidate = format!("{}_{}", &base, self.counter);
self.counter = self.counter.checked_add(1).unwrap_or_else(|| {
panic!("Cannot generate unique value")
});
if !self.used.contains(&candidate) {
self.used.insert(candidate.clone());
return candidate;
}
}
}
}
}