use std::fs;
use pariter::IteratorExt;
use crate::config::{Config, Delimiter};
use crate::moonblade::{DynamicValue, Program};
use crate::select::SelectedColumns;
use crate::util;
use crate::CliResult;
static USAGE: &str = r#"
The transform command can be used to edit a selection of columns for each row
of a CSV file using a custom expression.
For instance, given the following CSV file:
name,surname
john,davis
mary,sue
The following command (notice how `_` is used as a reference to the currently
edited column):
$ xan transform surname 'upper(_)'
Will produce the following result:
name,surname
john,DAVIS
mary,SUE
When using unary functions, the above command can be written even shorter:
$ xan transform surname upper
The above example work on a single column but the command is perfectly able to
transform multiple columns at once using a selection:
$ xan transform name,surname,fullname upper
The expression can optionally be read from a file using the -f/--evaluate-file flag:
$ xan transform name -f expr.moonblade file.csv > result.csv
For a quick review of the capabilities of the expression language,
check out the `xan help cheatsheet` command.
For a list of available functions, use `xan help functions`.
Usage:
xan transform [options] <column> <expression> [<input>]
xan transform --help
transform options:
-f, --evaluate-file Read evaluation expression from a file instead.
-r, --rename <name> New name for the transformed column.
-p, --parallel Whether to use parallelization to speed up computations.
Will automatically select a suitable number of threads to use
based on your number of cores. Use -t, --threads if you want to
indicate the number of threads yourself.
-t, --threads <threads> Parellize computations using this many threads. Use -p, --parallel
if you want the number of threads to be automatically chosen instead.
Common options:
-h, --help Display this message
-o, --output <file> Write output to <file> instead of stdout.
-n, --no-headers When set, the first row will not be evaled
as headers.
-d, --delimiter <arg> The field delimiter for reading CSV data.
Must be a single character.
"#;
#[derive(Deserialize)]
struct Args {
arg_column: SelectedColumns,
arg_expression: String,
arg_input: Option<String>,
flag_rename: Option<String>,
flag_output: Option<String>,
flag_no_headers: bool,
flag_delimiter: Option<Delimiter>,
flag_parallel: bool,
flag_threads: Option<usize>,
flag_evaluate_file: bool,
}
impl Args {
fn resolve(&mut self) -> CliResult<()> {
if self.flag_evaluate_file {
self.arg_expression = fs::read_to_string(&self.arg_expression)?;
}
Ok(())
}
}
pub fn run(argv: &[&str]) -> CliResult<()> {
let mut args: Args = util::get_args(USAGE, argv)?;
args.resolve()?;
let rconf = Config::new(&args.arg_input)
.no_headers(args.flag_no_headers)
.delimiter(args.flag_delimiter)
.select(args.arg_column);
let parallelization = match (args.flag_parallel, args.flag_threads) {
(true, None) => Some(None),
(_, Some(count)) => Some(Some(count)),
_ => None,
};
let mut wtr = Config::new(&args.flag_output).simd_writer()?;
let mut rdr = rconf.simd_reader()?;
let headers = rdr.byte_headers()?.clone();
let mut sel = rconf.selection(&headers)?;
sel.dedup();
let mask = sel.indexed_mask(headers.len());
let programs = sel
.iter()
.map(|i| Program::parse(&format!("col({}) | {}", i, &args.arg_expression), &headers))
.collect::<Result<Vec<_>, _>>()?;
if !rconf.no_headers {
let output_headers = if let Some(new_names) = &args.flag_rename {
let renamed = util::str_to_csv_byte_record(new_names);
if renamed.len() != sel.len() {
Err(format!(
"Renamed columns alignment error. Expected {} names and got {}.",
sel.len(),
renamed.len(),
))?;
}
mask.iter()
.zip(headers.iter())
.map(|(o, h)| if let Some(i) = o { &renamed[*i] } else { h })
.collect()
} else {
headers.clone()
};
wtr.write_record(&output_headers)?;
}
if let Some(threads) = parallelization {
for result in rdr.into_byte_records().enumerate().parallel_map_custom(
|o| o.threads(threads.unwrap_or_else(crate::util::default_num_cpus)),
move |(index, record)| -> CliResult<simd_csv::ByteRecord> {
let record = record?;
let mut output_record = simd_csv::ByteRecord::new();
let mut last_value = DynamicValue::empty_bytes();
for (m, cell) in mask.iter().copied().zip(record.iter()) {
if let Some(i) = m {
last_value.set_bytes(cell);
let value = programs[i].run_with_record_and_last_value(
index,
&record,
last_value.clone(),
)?;
output_record.push_field(&value.serialize_as_bytes());
} else {
output_record.push_field(cell);
}
}
Ok(output_record)
},
) {
wtr.write_byte_record(&result?)?;
}
} else {
let mut record = simd_csv::ByteRecord::new();
let mut output_record = simd_csv::ByteRecord::new();
let mut last_value = DynamicValue::empty_bytes();
let mut index: usize = 0;
while rdr.read_byte_record(&mut record)? {
output_record.clear();
for (m, cell) in mask.iter().copied().zip(record.iter()) {
if let Some(i) = m {
last_value.set_bytes(cell);
let value = programs[i].run_with_record_and_last_value(
index,
&record,
last_value.clone(),
)?;
output_record.push_field(&value.serialize_as_bytes());
} else {
output_record.push_field(cell);
}
}
wtr.write_byte_record(&output_record)?;
index += 1;
}
}
Ok(wtr.flush()?)
}