pub fn clean_csv<R: Read, W: Write + Send + Sync + 'static>(
csv_rdr: &mut Reader<R>,
csv_wtr: Writer<W>,
schema_map: HashMap<String, Column>,
buffer_size: usize,
) -> Result<CleansingLog, CSVCleansingError>
Clean CSV files to conform to a type schema by streaming them through small memory buffers using multiple threads and logging data loss.
§ Examples
use std::error::Error;
use csv::{Reader,Writer};
use csv_log_cleaner::{clean_csv, ColumnLog, get_schema_from_json_str};
use tempfile::tempdir;
use std::fs;

// Arrange
// Work inside a temporary directory holding the input and output CSV files.
let dir = tempdir().expect("To be able to create temporary directory");
// The second data row carries a DATE_OF_BIRTH value ("2004-31-01") that does
// not match the "%Y-%m-%d" format declared in the schema below.
let input_csv_data = r#"NAME,AGE,DATE_OF_BIRTH
Raul,27,2004-01-31
Duke,27.8,2004-31-01
"#;
let input_path = dir.path().join("input.csv");
let output_path = dir.path().join("output.csv");
// Paths are borrowed rather than cloned; each call only needs to read them.
fs::write(&input_path, input_csv_data).expect("To be able to write file");
let mut csv_rdr = Reader::from_path(&input_path).expect("To be able to create reader");
// The writer is moved into `clean_csv`, so it does not need to be `mut` here.
let csv_wtr = Writer::from_path(&output_path).expect("To be able to create writer");
// The schema is supplied inline as a JSON string; no schema file is written.
let schema_string = r#"{
"columns": [
{
"name": "NAME",
"column_type": "String"
},
{
"name": "AGE",
"column_type": "Int"
},
{
"name": "DATE_OF_BIRTH",
"column_type": "Date",
"format": "%Y-%m-%d"
}
]
}"#;
let schema_map = get_schema_from_json_str(&schema_string).unwrap();
let buffer_size = 1;
// Exactly one DATE_OF_BIRTH value is expected to be reported as invalid, so it
// is both the minimum and the maximum invalid value in the column log.
let expected_date_of_birth_column_log = ColumnLog {
    name: "DATE_OF_BIRTH".to_string(),
    invalid_count: 1,
    max_invalid: Some("2004-31-01".to_string()),
    min_invalid: Some("2004-31-01".to_string()),
};

// Act
let result = clean_csv(&mut csv_rdr, csv_wtr, schema_map, buffer_size);
// Reading the output back confirms the cleaned file exists and is readable.
let output_csv = fs::read_to_string(&output_path).expect("To be able to read from file");
let log_map = result.expect("Result to have content").log_map;

// Assert
assert_eq!(log_map.get("DATE_OF_BIRTH").unwrap(), &expected_date_of_birth_column_log);