use std::io::{stdout, Write};
use std::str;
use jiff::Unit;
use simd_csv::ByteRecord;
use crate::collections::ClusteredInsertHashmap;
use crate::config::{Config, Delimiter};
use crate::dates;
use crate::scales::{Extent, ExtentBuilder};
use crate::select::SelectedColumns;
use crate::util;
use crate::CliResult;
// Docopt-style usage string: parsed by `util::get_args` into the `Args`
// struct below (each `--flag` / `<arg>` maps to a `flag_*` / `arg_*` field).
// NOTE: this is runtime help text — keep content in sync with `Args`.
static USAGE: &str = r#"
Complete CSV data by adding rows for missing values of a given column.
This command is able to handle either integer or partial dates (year-month-date,
year-month or just year).
A --min and/or --max flag can be used to specify a range to complete. Note that
if input contains values outside of the specified range, they will be filtered
out from the output.
If you know your input is already sorted on the column to complete, you can
leverage the -S/--sorted flag to make the command work faster and use less
memory.
This command is also able to check whether the given column is complete using
the --check flag.
Examples:
Complete integer column named "score" from 1 to 10:
$ xan complete -m 1 -M 10 score input.csv
Complete already sorted date values in column named "date":
$ xan complete -D --sorted date input.csv
Check completeness of values (already sorted in descending order) in "score" column:
$ xan complete --check --sorted --reverse score input.csv
Complete integer column named "score" within groups defined by columns "name" and "category":
$ xan complete --groupby name,category score input.csv
Usage:
xan complete [options] <column> [<input>]
xan complete --help
complete options:
--check Check that the input is complete. When used with
either --min or --max, only checks completeness
within the specified range.
-m, --min <value> Minimum value of range to complete. Note that values
less than this minimum value in the input will be
filtered out.
-M, --max <value> Maximum value of range to complete. Note that values
greater than this maximum value in the input will be
filtered out.
-D, --dates Set to indicate your values are dates (supporting
year, year-month or year-month-day).
-S, --sorted Indicate that the input is already sorted.
-R, --reverse Whether to consider the data in reverse order.
-g, --groupby <cols> Select columns to group by. The completion will be
done independently within each group.
Common options:
-h, --help Display this message
-o, --output <file> Write output to <file> instead of stdout.
-n, --no-headers When set, the first row will not be evaled
as headers.
-d, --delimiter <arg> The field delimiter for reading CSV data.
Must be a single character.
"#;
// Command-line arguments, deserialized from the docopt `USAGE` string.
// Field names are fixed by the docopt convention (`arg_*` / `flag_*`).
#[derive(Deserialize, Debug)]
struct Args {
    // <column>: the single column whose values are completed.
    arg_column: SelectedColumns,
    // [<input>]: CSV file path; None means stdin.
    arg_input: Option<String>,
    // -m/--min: lower bound of the range to complete (raw, parsed later).
    flag_min: Option<String>,
    // -M/--max: upper bound of the range to complete (raw, parsed later).
    flag_max: Option<String>,
    // -o/--output: output file path; None means stdout.
    flag_output: Option<String>,
    // -n/--no-headers: treat the first row as data, not headers.
    flag_no_headers: bool,
    // -d/--delimiter: CSV field delimiter.
    flag_delimiter: Option<Delimiter>,
    // --check: only verify completeness, write nothing.
    flag_check: bool,
    // -D/--dates: values are partial dates rather than integers.
    flag_dates: bool,
    // -S/--sorted: input is already sorted on the target column.
    flag_sorted: bool,
    // -R/--reverse: walk/complete the range in descending order.
    flag_reverse: bool,
    // -g/--groupby: complete independently within each group.
    flag_groupby: Option<SelectedColumns>,
}
impl Args {
    /// Parse a raw cell into a [`Value`], honoring the -D/--dates flag:
    /// partial dates when set, integers otherwise.
    fn get_value_from_str(&self, cell: &str) -> CliResult<Value> {
        if self.flag_dates {
            Value::new_date(cell)
        } else {
            Value::new_integer(cell)
        }
    }

    /// Same as [`Args::get_value_from_str`], but decoding a raw CSV cell.
    ///
    /// Returns an error (instead of panicking, as the previous `.unwrap()`
    /// did) when the cell is not valid UTF-8, consistent with how the other
    /// parsing failures in this module are reported.
    fn get_value_from_bytes(&self, cell: &[u8]) -> CliResult<Value> {
        let cell = str::from_utf8(cell).map_err(|_| {
            format!("Invalid utf-8 cell: {}", String::from_utf8_lossy(cell))
        })?;

        self.get_value_from_str(cell)
    }
}
// Kind of value being completed. Used to reject inputs mixing integers
// with dates, or dates of different granularities (see `check_type`).
#[derive(Debug, PartialEq, Clone, Copy)]
enum ValueType {
    Integer,
    // A partial date, tagged with its granularity (year, month or day).
    Date(Unit),
}
// A single comparable value of the column to complete. `Ord` is derived so
// values can be sorted and walked in sequence; variant order matters for the
// derived ordering (all integers sort before all dates), but mixed types are
// rejected upfront by `check_type`, so only same-variant comparisons occur.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
enum Value {
    Integer(i64),
    Date(dates::PartialDate),
}
impl Value {
    /// Parse a partial date (year, year-month or year-month-day).
    ///
    /// Replaces the previous `map_or_else(|| Err(..), Ok)` contortion with a
    /// direct `match`, mirroring the error style used elsewhere in the file.
    fn new_date(s: &str) -> CliResult<Self> {
        match dates::parse_partial_date(s) {
            Some(d) => Ok(Self::Date(d)),
            None => Err(format!("Invalid date format: {}", s))?,
        }
    }

    /// Parse a base-10 integer.
    fn new_integer(s: &str) -> CliResult<Self> {
        Ok(Self::Integer(
            s.parse::<i64>()
                .map_err(|_| format!("Invalid integer format: {}", s))?,
        ))
    }

    /// Successor of the value (next integer, or next date at the same unit).
    fn next(&self) -> Self {
        match self {
            Self::Integer(i) => Self::Integer(i + 1),
            Self::Date(d) => Self::Date(d.next()),
        }
    }

    /// Predecessor of the value.
    fn previous(&self) -> Self {
        match self {
            Self::Integer(i) => Self::Integer(i - 1),
            Self::Date(d) => Self::Date(d.previous()),
        }
    }

    /// Step the value in the walking direction: backwards when `reverse`.
    fn advance(&self, reverse: bool) -> Self {
        if reverse {
            self.previous()
        } else {
            self.next()
        }
    }

    /// Serialize the value as it should appear in an emitted CSV cell.
    // `Value` is `Copy`, so binding by value (no `ref`) is fine here.
    fn to_bytes(self) -> Vec<u8> {
        match self {
            Self::Integer(i) => i.to_string().into_bytes(),
            Self::Date(d) => dates::format_partial_date(d.as_unit(), d.as_date()).into_bytes(),
        }
    }

    /// The [`ValueType`] of this value, for consistency checking.
    fn as_type(&self) -> ValueType {
        match self {
            Self::Integer(_) => ValueType::Integer,
            Self::Date(d) => ValueType::Date(d.as_unit()),
        }
    }
}
/// Validate that `value_type` matches the first type ever seen.
///
/// The first call records `value_type` as the expected type; subsequent
/// calls error out if the type differs (e.g. integers mixed with dates,
/// or dates of different granularities).
fn check_type(expected_value_type: &mut Option<ValueType>, value_type: ValueType) -> CliResult<()> {
    match expected_value_type {
        // First value seen: it defines the expected type from now on.
        None => {
            *expected_value_type = Some(value_type);
            Ok(())
        }
        Some(expected) if *expected == value_type => Ok(()),
        Some(expected) => Err(format!(
            "Inconsistent value units: first seen was {:?} and then found {:?}",
            expected, value_type,
        ))?,
    }
}
/// Rebuild `record` in place as a synthetic row of `len` fields:
/// group-key columns (per `group_mask`) are copied from `group_opt`,
/// the completed column receives `completed.1`, and every other column
/// is left empty.
fn mutate_record_to_emit(
    record: &mut ByteRecord,
    len: usize,
    completed: (usize, &[u8]),
    group_mask: Option<&Vec<bool>>,
    group_opt: Option<&ByteRecord>,
) {
    let (completed_index, completed_cell) = completed;

    record.clear();

    // Position of the next field to take from the group key record.
    let mut next_group_field: usize = 0;

    for column_index in 0..len {
        if group_mask.is_some_and(|mask| mask[column_index]) {
            // This column belongs to the group key: copy it through.
            record.push_field(&group_opt.unwrap()[next_group_field]);
            next_group_field += 1;
        } else if column_index == completed_index {
            record.push_field(completed_cell);
        } else {
            record.push_field(b"");
        }
    }
}
/// Entry point for `xan complete`.
///
/// Reads the CSV input, determines the range of values to complete (from
/// --min/--max when given, else from the observed extent), then walks each
/// group's records in order, either emitting synthetic rows for missing
/// values or — in --check mode — erroring on the first gap.
pub fn run(argv: &[&str]) -> CliResult<()> {
    let args: Args = util::get_args(USAGE, argv)?;

    // Sorted mode streams records straight from the reader, which is
    // incompatible with grouping (group rows interleave in the stream).
    if args.flag_groupby.is_some() && args.flag_sorted {
        Err("--groupby cannot be used with --sorted")?;
    }

    let min: Option<Value> = args
        .flag_min
        .as_ref()
        .map(|m| args.get_value_from_str(m))
        .transpose()?;
    let max: Option<Value> = args
        .flag_max
        .as_ref()
        .map(|m| args.get_value_from_str(m))
        .transpose()?;

    // First value type ever seen (integer vs. date unit); later values are
    // validated against it via `check_type`.
    let mut expected_value_type: Option<ValueType> = None;

    if let (Some(min_v), Some(max_v)) = (&min, &max) {
        if min_v.as_type() != max_v.as_type() {
            Err(format!(
                "min and max have different units: {:?} vs {:?}",
                min_v.as_type(),
                max_v.as_type(),
            ))?;
        }
        if min_v > max_v {
            Err("min cannot be greater than max")?;
        }
        check_type(&mut expected_value_type, min_v.as_type())?;
    }

    // Computes the observed [min, max] of the column, clamped by the
    // user-provided bounds when present.
    let mut extent_builder = ExtentBuilder::<Value>::new();

    if let Some(m) = min {
        extent_builder.clamp_min(m);
    }
    if let Some(m) = max {
        extent_builder.clamp_max(m);
    }

    // When both bounds were given, the extent is already known here;
    // otherwise it is rebuilt after reading the input (see below).
    let mut extent: Option<Extent<Value>> = extent_builder.clone().build();

    let rconf = Config::new(&args.arg_input)
        .no_headers(args.flag_no_headers)
        .select(args.arg_column.clone())
        .delimiter(args.flag_delimiter);

    // No writer in --check mode: nothing is emitted, we only validate.
    let mut wtr_opt = (!(args.flag_check))
        .then(|| Config::new(&args.flag_output).simd_writer())
        .transpose()?;

    // Scratch record reused for every synthetic row emitted.
    let mut output_record = ByteRecord::new();

    let mut rdr = rconf.simd_reader()?;
    let headers = rdr.byte_headers()?.clone();

    let column_to_complete_index = rconf.single_selection(&headers)?;

    let groupby_sel_opt = args
        .flag_groupby
        .as_ref()
        .map(|sel| sel.selection(&headers, !rconf.no_headers))
        .transpose()?;

    // Boolean mask over header positions marking group-key columns.
    let groupby_mask_opt = groupby_sel_opt.as_ref().map(|sel| sel.mask(headers.len()));

    if matches!(&groupby_sel_opt, Some(sel) if sel.contains(column_to_complete_index)) {
        Err("Cannot complete a column that is also used in --groupby!")?;
    }

    if !rconf.no_headers {
        if let Some(wtr) = wtr_opt.as_mut() {
            wtr.write_byte_record(&headers)?;
        }
    }

    let mut record = ByteRecord::new();

    // Records buffered per group key; a single empty-key group is used when
    // --groupby is not given. Insertion order of groups is preserved.
    let mut records_per_group: ClusteredInsertHashmap<ByteRecord, Vec<ByteRecord>> =
        ClusteredInsertHashmap::new();

    if !args.flag_sorted {
        if let Some(flag_groupby) = &args.flag_groupby {
            let group_sel = flag_groupby.selection(&headers, !args.flag_no_headers)?;

            while rdr.read_byte_record(&mut record)? {
                let value: Value = args.get_value_from_bytes(&record[column_to_complete_index])?;

                check_type(&mut expected_value_type, value.as_type())?;

                if extent.is_none() {
                    // No full range given: keep widening the observed extent.
                    extent_builder.process(value);
                } else if matches!(extent, Some(e) if value < e.min() || value > e.max()) {
                    // Outside the user-provided range: filter the row out.
                    continue;
                }

                let key = group_sel.select(&record).collect::<ByteRecord>();

                records_per_group.insert_with_or_else(
                    key,
                    || vec![record.clone()],
                    |v| v.push(record.clone()),
                );
            }
        } else {
            // NOTE(review): unlike the grouped path above, this eager read
            // neither updates the extent builder nor filters out-of-range
            // rows here (out-of-range rows are later skipped in the
            // processing closure), and `.unwrap()` will panic on a malformed
            // CSV record rather than surfacing an error — confirm intended.
            records_per_group.insert_with(ByteRecord::new(), || {
                rdr.byte_records().collect::<Result<Vec<_>, _>>().unwrap()
            });
        };
    } else {
        // Sorted input: register the single empty group now; its records
        // are streamed directly from the reader in the loop below.
        records_per_group.insert_with(ByteRecord::new(), Vec::new);
    }

    if extent.is_none() {
        extent = extent_builder.build();
    }

    // Effective bounds: user-provided ones win, else the observed extent.
    let min = min.or_else(|| extent.as_ref().map(|e| e.min()));
    let max = max.or_else(|| extent.as_ref().map(|e| e.max()));

    // Starting point of the walk: from max when reversed, from min otherwise.
    let current_value: Option<Value> = if args.flag_reverse { max } else { min };

    // Processes one group's records (assumed ordered in the walking
    // direction): emits synthetic rows for gaps, or errors in --check mode.
    let mut process_records_in_group = |records: &mut dyn Iterator<
        Item = Result<ByteRecord, simd_csv::Error>,
    >,
                                        group_key: &ByteRecord|
     -> CliResult<()> {
        // Next value we expect to see within this group.
        let mut local_current_value: Option<Value> = current_value;

        for record in records {
            let record = record?;
            let value: Value = args.get_value_from_bytes(&record[column_to_complete_index])?;

            check_type(&mut expected_value_type, value.as_type())?;

            // Value below min: skip it, or stop early when walking downward
            // (sorted descending means nothing relevant can follow).
            if matches!(min, Some(m) if value < m) {
                if args.flag_reverse {
                    break;
                } else {
                    continue;
                }
            }

            // Value above max: symmetric to the previous case.
            if matches!(max, Some(m) if value > m) {
                if args.flag_reverse {
                    continue;
                } else {
                    break;
                }
            }

            if local_current_value.is_some() {
                if let Some(wtr) = wtr_opt.as_mut() {
                    // Emit synthetic rows until the expected value catches
                    // up with the value actually read.
                    while match (args.flag_reverse, local_current_value) {
                        (true, Some(cv)) => cv > value,
                        (false, Some(cv)) => cv < value,
                        _ => false,
                    } {
                        mutate_record_to_emit(
                            &mut output_record,
                            headers.len(),
                            (
                                column_to_complete_index,
                                &local_current_value.unwrap().to_bytes(),
                            ),
                            groupby_mask_opt.as_ref(),
                            Some(group_key),
                        );

                        wtr.write_byte_record(&output_record)?;

                        local_current_value =
                            local_current_value.map(|v| v.advance(args.flag_reverse));
                    }
                } else if value != local_current_value.unwrap() {
                    // --check mode: a value "behind" the expected one is a
                    // duplicate and is tolerated; a value "ahead" is a gap.
                    if match (args.flag_reverse, local_current_value) {
                        (true, Some(cv)) => cv < value,
                        (false, Some(cv)) => cv > value,
                        _ => false,
                    } {
                        continue;
                    }

                    Err(format!(
                        "file is not complete: missing value {:?}",
                        local_current_value.unwrap()
                    ))?;
                }
            } else {
                // No bound was computable upfront: the first in-range value
                // seen seeds the walk.
                local_current_value = Some(value);
            }

            local_current_value = local_current_value.map(|v| v.advance(args.flag_reverse));

            if let Some(wtr) = wtr_opt.as_mut() {
                wtr.write_byte_record(&record)?;
            }
        }

        // After exhausting the group's records, pad (or check) the tail of
        // the range up to the opposite bound, if one exists.
        if (args.flag_reverse && min.is_some()) || (!args.flag_reverse && max.is_some()) {
            if let Some(wtr) = wtr_opt.as_mut() {
                while local_current_value.is_some()
                    && match (args.flag_reverse, local_current_value) {
                        (true, Some(cv)) if cv >= min.unwrap() => true,
                        (false, Some(cv)) if cv <= max.unwrap() => true,
                        _ => false,
                    }
                {
                    mutate_record_to_emit(
                        &mut output_record,
                        headers.len(),
                        (
                            column_to_complete_index,
                            &local_current_value.unwrap().to_bytes(),
                        ),
                        groupby_mask_opt.as_ref(),
                        Some(group_key),
                    );

                    wtr.write_byte_record(&output_record)?;

                    local_current_value = local_current_value.map(|v| v.advance(args.flag_reverse));
                }
            } else if match (args.flag_reverse, local_current_value) {
                // --check mode: any remaining expected value within bounds
                // means the tail of the range is missing.
                (true, Some(cv)) if cv >= min.unwrap() => true,
                (false, Some(cv)) if cv <= max.unwrap() => true,
                _ => false,
            } {
                Err(format!(
                    "file is not complete: missing value {:?}",
                    local_current_value.unwrap()
                ))?;
            }
        }

        Ok(())
    };

    for (group_key, records) in records_per_group.iter() {
        if args.flag_sorted && args.flag_groupby.is_none() {
            // Sorted, ungrouped: stream straight from the reader (the
            // buffered `records` vec is empty in this mode).
            process_records_in_group(&mut rdr.byte_records(), group_key)?;
        } else {
            // Unsorted: decorate with the parsed value, sort in the walking
            // direction, then process in order.
            let mut values_and_records = records
                .iter()
                .map(|record| -> CliResult<(Value, ByteRecord)> {
                    let value_and_record = (
                        args.get_value_from_bytes(&record[column_to_complete_index])?,
                        record.clone(),
                    );

                    Ok(value_and_record)
                })
                .collect::<Result<Vec<_>, _>>()?;

            values_and_records.sort_by(|a, b| {
                if args.flag_reverse {
                    b.0.cmp(&a.0)
                } else {
                    a.0.cmp(&b.0)
                }
            });

            // Wrap the sorted records as the fallible iterator the closure
            // expects (mirroring the streaming reader's item type).
            let records = values_and_records.iter().map(|(_, r)| r);
            let mut records: &mut dyn Iterator<Item = Result<ByteRecord, simd_csv::Error>> =
                &mut records.map(|r| Ok(r.clone()));

            process_records_in_group(&mut records, group_key)?;
        }
    }

    if let Some(wtr) = wtr_opt.as_mut() {
        Ok(wtr.flush()?)
    } else {
        // --check mode reached the end without finding a gap.
        writeln!(&mut stdout(), "file is complete!")?;
        Ok(())
    }
}