use std::fs;
use std::io;
use std::path::Path;
use regex::Regex;
use crate::collections::{hash_map::Entry, HashMap, HashSet};
use crate::config::{Config, Delimiter};
use crate::record::Record;
use crate::select::SelectedColumns;
use crate::util::{self, FilenameTemplate};
use crate::CliResult;
static USAGE: &str = "
Partition the given CSV data into chunks based on the values of a column.
The files are written to the output directory with filenames based on the
values in the partition column and the `--filename` flag.
By default, this command will consider it works in a case-insensitive filesystem
(e.g. on macOS). This can have an impact on the names of the create files. If you
know beforehand that your filesystem is case-sensitive and want filenames to be
better aligned with the original values use the -C/--case-sensitive flag.
Note that most operating systems avoid opening more than 1024 files at once,
so if you know the cardinality of the partitioned column is very high, please
sort the file on this column beforehand and use the -S/--sorted flag.
Usage:
xan partition [options] <column> [<input>]
xan partition --help
partition options:
-O, --out-dir <dir> Where to write the chunks. Defaults to current working
directory.
-f, --filename <filename> A filename template to use when constructing
the names of the output files. The string '{}'
will be replaced by a value based on the value
of the field, but sanitized for shell safety.
[default: {}.csv]
-p, --prefix-length <n> Truncate the partition column after the
specified number of bytes when creating the
output file.
-S, --sorted Use this flag if you know the file is sorted
on the partition column in advance, so the command
can run faster and with less memory and resources
opened.
--drop Drop the partition column from results.
-C, --case-sensitive Don't perform case normalization to assess whether a
new file has to be created when seeing a new value.
Only use on case-sensitive filesystems or this can have
adverse effects!
Common options:
-h, --help Display this message
-n, --no-headers When set, the first row will NOT be interpreted
as column names. Otherwise, the first row will
appear in all chunks as the header row.
-d, --delimiter <arg> The field delimiter for reading CSV data.
Must be a single character.
";
#[derive(Clone, Deserialize)]
struct Args {
arg_column: SelectedColumns,
arg_input: Option<String>,
flag_out_dir: Option<String>,
flag_filename: FilenameTemplate,
flag_prefix_length: Option<usize>,
flag_drop: bool,
flag_no_headers: bool,
flag_delimiter: Option<Delimiter>,
flag_sorted: bool,
flag_case_sensitive: bool,
}
pub fn run(argv: &[&str]) -> CliResult<()> {
let args: Args = util::get_args(USAGE, argv)?;
if let Some(dir) = &args.flag_out_dir {
fs::create_dir_all(dir)?;
}
args.sequential_partition()
}
impl Args {
fn rconfig(&self) -> Config {
Config::new(&self.arg_input)
.delimiter(self.flag_delimiter)
.no_headers(self.flag_no_headers)
.select(self.arg_column.clone())
}
fn key_column(&self, rconfig: &Config, headers: &simd_csv::ByteRecord) -> CliResult<usize> {
let select_cols = rconfig.selection(headers)?;
if select_cols.len() == 1 {
Ok(select_cols[0])
} else {
Err("can only partition on one column")?
}
}
fn sequential_partition(&self) -> CliResult<()> {
let rconfig = self.rconfig();
let mut rdr = rconfig.simd_reader()?;
let mut headers = rdr.byte_headers()?.clone();
let key_col = self.key_column(&rconfig, &headers)?;
let mut generator =
WriterGenerator::new(self.flag_filename.clone(), self.flag_case_sensitive);
let out_dir = match &self.flag_out_dir {
Some(dir) => Path::new(dir),
None => Path::new(""),
};
if self.flag_drop {
headers = headers.remove(key_col);
}
let mut row = simd_csv::ByteRecord::new();
if self.flag_sorted {
let mut current: Option<(Vec<u8>, BoxedWriter)> = None;
while rdr.read_byte_record(&mut row)? {
let column = &row[key_col];
let key = match self.flag_prefix_length {
Some(len) if len < column.len() => &column[0..len],
_ => column,
};
match current {
Some((ref k, _)) if k == key => {}
_ => {
let mut wtr = generator.writer(out_dir, key)?;
if !rconfig.no_headers {
wtr.write_record(&headers)?;
}
current = Some((key.to_vec(), wtr));
}
};
let wtr = &mut current.as_mut().unwrap().1;
if self.flag_drop {
wtr.write_record(&row.remove(key_col))?;
} else {
wtr.write_byte_record(&row)?;
}
}
} else {
let mut writers: HashMap<Vec<u8>, BoxedWriter> = HashMap::new();
while rdr.read_byte_record(&mut row)? {
let column = &row[key_col];
let key = match self.flag_prefix_length {
Some(len) if len < column.len() => &column[0..len],
_ => column,
};
let mut entry = writers.entry(key.to_vec());
let wtr = match entry {
Entry::Occupied(ref mut occupied) => occupied.get_mut(),
Entry::Vacant(vacant) => {
let mut wtr = generator.writer(out_dir, key)?;
if !rconfig.no_headers {
wtr.write_record(&headers)?;
}
vacant.insert(wtr)
}
};
if self.flag_drop {
wtr.write_record(&row.remove(key_col))?;
} else {
wtr.write_byte_record(&row)?;
}
}
}
Ok(())
}
}
type BoxedWriter = simd_csv::Writer<Box<dyn io::Write + 'static>>;
struct WriterGenerator {
template: FilenameTemplate,
counter: usize,
used: HashSet<String>,
non_word_char: Regex,
case_sensitive: bool,
}
impl WriterGenerator {
fn new(template: FilenameTemplate, case_sensitive: bool) -> WriterGenerator {
WriterGenerator {
template,
counter: 1,
used: HashSet::new(),
non_word_char: Regex::new(r"[^\w_.\-]").unwrap(),
case_sensitive,
}
}
fn writer<P>(&mut self, path: P, key: &[u8]) -> io::Result<BoxedWriter>
where
P: AsRef<Path>,
{
let unique_value = self.unique_value(key);
self.template.writer(path.as_ref(), &unique_value)
}
fn unique_value(&mut self, key: &[u8]) -> String {
let utf8 = String::from_utf8_lossy(key);
let mut safe = self.non_word_char.replace_all(&utf8, "").into_owned();
safe = if safe.is_empty() {
"empty".to_owned()
} else {
safe
};
let mut base = safe.clone();
if !self.case_sensitive {
base = base.to_lowercase();
}
if !self.used.contains(&base) {
self.used.insert(base.clone());
safe
} else {
loop {
let candidate = format!("{}_{}", &safe, self.counter);
self.counter = self.counter.checked_add(1).unwrap_or_else(|| {
panic!("Cannot generate unique value")
});
if !self.used.contains(&candidate) {
self.used.insert(candidate.clone());
return candidate;
}
}
}
}
}