#![warn(clippy::all)]
#![forbid(unsafe_code)]
use csv::ByteRecord;
use humansize::{file_size_opts, FileSize};
use lazy_static::lazy_static;
use log::debug;
use regex::{bytes::Regex as BytesRegex, Regex};
use std::{
borrow::Cow,
fs,
io::{self, prelude::*},
path::PathBuf,
process,
time::Instant,
};
use structopt::StructOpt;
mod clean_column_names;
#[macro_use]
mod errors;
mod util;
use crate::clean_column_names::ColumnNameCleanerType;
use crate::errors::*;
use crate::util::CharSpecifier;
// Buffer capacity, in bytes (256 KiB), passed to both the CSV reader and the
// CSV writer built in `run`.
const BUFFER_SIZE: usize = 256 * 1024;
// Command-line options for `scrubcsv`, parsed via the `StructOpt` derive.
//
// NOTE(review): plain `//` comments are used on purpose throughout this
// struct. structopt turns `///` doc comments into `--help` text, so adding doc
// comments here would change the program's user-visible output.
#[derive(Debug, StructOpt)]
#[structopt(
name = "scrubcsv",
about = "Clean and normalize a CSV file.",
after_help = "Read a CSV file, normalize the \"good\" lines, and print them to standard
output. Discard any lines with the wrong number of columns.
Regular expressions use Rust syntax, as described here:
https://doc.rust-lang.org/regex/regex/index.html#syntax
scrubcsv should work with any ASCII-compatible encoding, but it will not
attempt to transcode.
Exit code:
0 on success
1 on error
2 if more than 10% of rows were bad"
)]
struct Opt {
// Path of the file to read. When absent, `run` reads from standard input.
input: Option<PathBuf>,
// Field delimiter (defaults to a comma). `run` fails with "field delimiter
// is required" if the specifier does not resolve to a character.
#[structopt(
value_name = "CHAR",
short = "d",
long = "delimiter",
default_value = ","
)]
delimiter: CharSpecifier,
// Regular expression identifying NULL values. `run` anchors it to the whole
// value and replaces every matching value with an empty one.
#[structopt(value_name = "NULL_REGEX", short = "n", long = "null")]
null: Option<String>,
// When set, line breaks inside values are replaced with a single space.
#[structopt(long = "replace-newlines")]
replace_newlines: bool,
// When set, leading and trailing ASCII whitespace is removed from values.
#[structopt(long = "trim-whitespace")]
trim_whitespace: bool,
// Column-name cleaning mode. Three states: `None` (flag absent),
// `Some(None)` (flag given without a value) and `Some(Some(kind))` (flag
// given with an explicit kind). See `Opt::column_name_cleaner_type`.
#[structopt(value_name = "CLEANER_TYPE", long = "clean-column-names")]
clean_column_names: Option<Option<ColumnNameCleanerType>>,
// Regex of column names the input may not use. `run` only consults this
// while cleaning column names, i.e. when `--clean-column-names` is given.
#[structopt(long = "reserve-column-names")]
reserve_column_names: Option<Regex>,
// Names of columns that must be non-empty; rows with an empty value in any
// of them are dropped and counted as bad. May be given multiple times.
#[structopt(value_name = "COL", long = "drop-row-if-null")]
drop_row_if_null: Vec<String>,
// Suppress the statistics summary that `run` prints to standard error.
#[structopt(short = "q", long = "quiet")]
quiet: bool,
// Quote character (defaults to `"`). If the specifier does not resolve to a
// character, `run` disables quote handling in the reader.
#[structopt(value_name = "CHAR", long = "quote", default_value = "\"")]
quote: CharSpecifier,
}
impl Opt {
    /// Work out which column-name cleaner, if any, the user asked for.
    ///
    /// Returns `None` when `--clean-column-names` was not passed at all. When
    /// the flag was passed without a value, the cleaner defaults to
    /// `ColumnNameCleanerType::Unique`; otherwise the explicitly requested
    /// cleaner type is returned.
    fn column_name_cleaner_type(&self) -> Option<ColumnNameCleanerType> {
        self.clean_column_names
            .map(|requested| requested.unwrap_or(ColumnNameCleanerType::Unique))
    }
}
lazy_static! {
// Matches a single line break in any of three forms: `\n`, `\r\n`, or a lone
// `\r`. Used by `run` to replace embedded newlines in values.
// The pattern is a fixed literal, so failing to compile it is a programming
// error and is reported with `expect` rather than propagated.
static ref NEWLINE_RE: BytesRegex = BytesRegex::new(r#"\n|\r\n?"#)
.expect("regex in source code is unparseable");
}
/// Read CSV from the input file (or standard input), scrub it, and write the
/// cleaned CSV to standard output.
///
/// Processing steps:
///
/// 1. Parse command-line options and build the CSV reader and writer.
/// 2. Read the header row, optionally cleaning the column names, and write it.
/// 3. For every data row: discard it (counting it as bad) if it has the wrong
///    number of columns; otherwise apply the requested value clean-ups (NULL
///    replacement, whitespace trimming, newline replacement) and write it,
///    unless a column listed in `--drop-row-if-null` is empty.
/// 4. Unless `--quiet` was given, print throughput statistics to standard error.
///
/// # Errors
///
/// Returns an error if the NULL regex does not compile, the input cannot be
/// opened, no field delimiter is available, a column name cannot be cleaned or
/// is reserved, or any CSV read or write fails.
///
/// # Exit
///
/// Calls `process::exit(2)` directly (after flushing the output) when more
/// than 10% of the rows were bad.
fn run() -> Result<()> {
    env_logger::init();

    let opt: Opt = Opt::from_args();
    debug!("Options: {:#?}", opt);

    // Start timing once arguments are parsed, so the reported throughput
    // covers the actual CSV processing.
    let start_time = Instant::now();

    // Build the regex used to recognise NULL values. It must match the
    // *entire* value, so the user's pattern is anchored. The pattern is
    // wrapped in a non-capturing group first: anchoring it bare would turn
    // `a|b` into `^a|b$`, which matches anything starting with `a` or ending
    // with `b` instead of exactly `a` or exactly `b`.
    let null_re = if let Some(null_re_str) = opt.null.as_ref() {
        let s = format!("^(?:{})$", null_re_str);
        let re = BytesRegex::new(&s).context("can't compile regular expression")?;
        Some(re)
    } else {
        None
    };

    // Choose the input: the named file if one was given, else locked stdin.
    // `stdin` is bound outside the `if` so the lock can borrow from it.
    let stdin = io::stdin();
    let input: Box<dyn Read> = if let Some(ref path) = opt.input {
        Box::new(
            fs::File::open(path)
                .with_context(|_| format!("cannot open {}", path.display()))?,
        )
    } else {
        Box::new(stdin.lock())
    };

    // Configure the CSV reader. `flexible(true)` lets rows with the wrong
    // number of columns through, so that we can count and discard them
    // ourselves instead of failing on the first one.
    let mut rdr_builder = csv::ReaderBuilder::new();
    rdr_builder.buffer_capacity(BUFFER_SIZE);
    rdr_builder.has_headers(true);
    rdr_builder.flexible(true);
    if let Some(delimiter) = opt.delimiter.char() {
        rdr_builder.delimiter(delimiter);
    } else {
        return Err(format_err!("field delimiter is required"));
    }
    if let Some(quote) = opt.quote.char() {
        rdr_builder.quote(quote);
    } else {
        // No quote character specified: turn off quote handling altogether.
        rdr_builder.quoting(false);
    }
    let mut rdr = rdr_builder.from_reader(input);

    // Configure the CSV writer on a locked stdout.
    let stdout = io::stdout();
    let output = stdout.lock();
    let mut wtr = csv::WriterBuilder::new()
        .buffer_capacity(BUFFER_SIZE)
        .from_writer(output);

    // Read the header row, taking an owned copy so `rdr` is free for reuse.
    let mut hdr = rdr
        .byte_headers()
        .context("cannot read headers")?
        .to_owned();

    // Optionally rewrite the column names. Reserved names are only checked
    // here, against the cleaned form of each name.
    if let Some(cleaner_type) = opt.column_name_cleaner_type() {
        let mut cleaner = cleaner_type.build_cleaner();
        let mut new_hdr = ByteRecord::default();
        for col in hdr.into_iter() {
            let col = String::from_utf8_lossy(col);
            let col = cleaner.unique_id_for(&col)?.to_owned();
            if let Some(reserved_re) = &opt.reserve_column_names {
                if reserved_re.is_match(&col[..]) {
                    return Err(format_err!(
                        "file used reserved column name {:?}",
                        col
                    ));
                }
            }
            new_hdr.push_field(col.as_bytes());
        }
        hdr = new_hdr;
    }

    wtr.write_byte_record(&hdr)
        .context("cannot write headers")?;

    // Every data row must have exactly as many columns as the header.
    let expected_cols = hdr.len();

    // For each header column, whether it was named in `--drop-row-if-null`.
    let required_cols = hdr
        .iter()
        .map(|name| -> bool {
            opt.drop_row_if_null
                .iter()
                .any(|required_name| required_name.as_bytes() == name)
        })
        .collect::<Vec<bool>>();

    // The header counts as the first row.
    let mut rows: u64 = 1;
    let mut bad_rows: u64 = 0;

    // When no per-value clean-up was requested, rows can be copied straight
    // through without inspecting individual values.
    let use_fast_path = null_re.is_none()
        && !opt.replace_newlines
        && !opt.trim_whitespace
        && opt.drop_row_if_null.is_empty();

    'next_row: for record in rdr.byte_records() {
        let record = record.context("cannot read record")?;

        // Count the row before validating it, so log messages and the final
        // statistics include bad rows.
        rows += 1;

        if record.len() != expected_cols {
            bad_rows += 1;
            debug!(
                "row {}: expected {} columns, found {}",
                rows,
                expected_cols,
                record.len(),
            );
            continue 'next_row;
        }

        if use_fast_path {
            wtr.write_record(record.into_iter())
                .context("cannot write record")?;
        } else {
            // Lazily clean each value. A `Cow` is produced so that values
            // which need no newline replacement are passed through borrowed.
            let cleaned = record.into_iter().map(|mut val: &[u8]| -> Cow<'_, [u8]> {
                // Values matching the NULL regex become empty.
                if let Some(ref null_re) = null_re {
                    if null_re.is_match(val) {
                        val = &[]
                    }
                }

                // Strip leading and trailing ASCII whitespace.
                if opt.trim_whitespace {
                    let first = val.iter().position(|c| !c.is_ascii_whitespace());
                    let last = val.iter().rposition(|c| !c.is_ascii_whitespace());
                    val = match (first, last) {
                        (Some(first), Some(last)) if first <= last => {
                            &val[first..=last]
                        }
                        // Empty or all-whitespace value.
                        (None, None) => &[],
                        // Both searches scan the same bytes for the same
                        // predicate, so they either both succeed (with
                        // `first <= last`) or both fail.
                        _ => panic!(
                            "tried to trim {:?}, got impossible indices {:?} {:?}",
                            val, first, last,
                        ),
                    };
                }

                // Replace embedded line breaks with a space. The cheap byte
                // scan avoids running the regex on values with no newlines.
                if opt.replace_newlines
                    && (val.contains(&b'\n') || val.contains(&b'\r'))
                {
                    NEWLINE_RE.replace_all(val, &b" "[..])
                } else {
                    Cow::Borrowed(val)
                }
            });

            if opt.drop_row_if_null.is_empty() {
                // No required columns: stream the cleaned values directly.
                wtr.write_record(cleaned).context("cannot write record")?;
            } else {
                // Required columns must be checked on the *cleaned* values
                // before anything is written, so materialise the row first.
                let row = cleaned.collect::<Vec<Cow<'_, [u8]>>>();
                for (value, &is_required_col) in row.iter().zip(required_cols.iter()) {
                    if is_required_col && value.is_empty() {
                        bad_rows += 1;
                        debug!("row {}: required column is empty", rows);
                        continue 'next_row;
                    }
                }
                wtr.write_record(row).context("cannot write record")?;
            }
        }
    }

    // Flush explicitly so write errors are reported instead of being lost
    // when the writer is dropped (or skipped by `process::exit` below).
    wtr.flush().context("error writing records")?;

    // Report statistics on stderr, keeping stdout clean for the CSV data.
    if !opt.quiet {
        let elapsed = start_time.elapsed().as_secs_f64();
        let bytes_per_second = (rdr.position().byte() as f64 / elapsed) as i64;
        eprintln!(
            "{} rows ({} bad) in {:.2} seconds, {}/sec",
            rows,
            bad_rows,
            elapsed,
            bytes_per_second.file_size(file_size_opts::BINARY)?,
        );
    }

    // Fail with exit code 2 if more than 10% of rows were bad.
    if bad_rows.checked_mul(10).expect("multiplication overflow") > rows {
        eprintln!("Too many rows ({} of {}) were bad", bad_rows, rows);
        process::exit(2);
    }

    Ok(())
}
/// Program entry point: execute `run` and, if it fails, print the error
/// followed by its chain of underlying causes to standard error, then exit
/// with status 1.
fn main() {
    // Nothing more to do on success.
    let err = match run() {
        Ok(()) => return,
        Err(err) => err,
    };

    eprintln!("ERROR: {}", err);

    // Walk the chain of causes, from the outermost to the root.
    let mut next_cause = err.source();
    while let Some(cause) = next_cause {
        eprintln!(" caused by: {}", cause);
        next_cause = cause.source();
    }

    process::exit(1);
}