use std::collections::HashSet;
use std::fs::{self, File};
use std::path::{Path, PathBuf};
use std::sync::{Arc, OnceLock};
use glob::{glob_with, MatchOptions};
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
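
/// Converts the CSV file at `file_path` into a Parquet file with the same
/// stem and a `.parquet` extension, written next to the source file.
///
/// The schema is inferred from up to `sampling_size` rows; column names are
/// cleaned and de-duplicated first (see [`remove_deduplicate_columns`]), and
/// the output is Snappy-compressed.
///
/// Illustrative call (a minimal sketch with a hypothetical path; `ignore`d
/// because it depends on the surrounding crate layout):
///
/// ```ignore
/// let source = std::path::PathBuf::from("data/input.csv");
/// convert_to_parquet(&source, ',', true, 100)?;
/// ```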
pub fn convert_to_parquet(
    file_path: &Path,
    delimiter: char,
    has_header: bool,
    sampling_size: u16,
) -> Result<(), Box<dyn std::error::Error>> {
    // Infer the schema by sampling up to `sampling_size` rows.
    let file = File::open(file_path)?;
    let (csv_schema, _) = arrow_csv::reader::Format::default()
        .with_header(has_header)
        .with_delimiter(delimiter as u8)
        .infer_schema(file, Some(sampling_size as usize))?;
    let schema_ref = remove_deduplicate_columns(csv_schema);

    // Re-open the source: schema inference consumed the first handle.
    let file = File::open(file_path)?;
    let csv = arrow_csv::ReaderBuilder::new(schema_ref.clone())
        .with_delimiter(delimiter as u8)
        .with_header(has_header)
        .build(file)?;

    // Replace any stale output, then create the Parquet target.
    let target_file = file_path.with_extension("parquet");
    delete_if_exist(&target_file)?;
    let file = File::create(target_file)?;

    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .set_created_by("cc2p".to_string())
        .build();
    let mut parquet_writer = parquet::arrow::ArrowWriter::try_new(file, schema_ref, Some(props))?;

    // Stream record batches from the CSV reader into the Parquet writer.
    for batch in csv {
        parquet_writer.write(&batch?)?;
    }
    parquet_writer.close()?;
    Ok(())
}

/// Removes `filename` if it exists; does nothing (successfully) otherwise.
pub fn delete_if_exist(filename: impl AsRef<Path>) -> Result<(), Box<dyn std::error::Error>> {
    if fs::metadata(&filename).is_ok() {
        fs::remove_file(filename)?;
    }
    Ok(())
}
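
/// Returns a schema in which every column has a unique, non-empty name:
/// names are cleaned via [`clean_column_name`], empty names become
/// `column_<n>`, and repeated names get a `_<n>` suffix.
///
/// Illustrative use (a minimal sketch; `ignore`d because it depends on the
/// surrounding crate layout):
///
/// ```ignore
/// use arrow_schema::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![
///     Field::new("id", DataType::Int32, false),
///     Field::new("id", DataType::Int64, false),
/// ]);
/// let deduped = remove_deduplicate_columns(schema);
/// assert_eq!(deduped.field(1).name(), "id_1");
/// ```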
pub fn remove_deduplicate_columns(sc: arrow_schema::Schema) -> Arc<arrow_schema::Schema> {
    // Running counter shared by both renaming paths (empty and duplicate names).
    let mut index = 1;
    let mut deduplicated_fields = Vec::new();
    let mut names = HashSet::new();
    for field in sc.fields() {
        let field_name = clean_column_name(field.name());
        if names.insert(field_name.clone()) {
            // First occurrence of this cleaned name.
            if field.name().is_empty() {
                // Nameless column: synthesize `column_<n>`.
                let name = format!("column_{}", index);
                index += 1;
                deduplicated_fields.push(Arc::new(field.as_ref().clone().with_name(name)));
            } else {
                deduplicated_fields.push(field.clone());
            }
        } else {
            // Duplicate: disambiguate with a `_<n>` suffix.
            let name = format!("{}_{}", field_name, index);
            index += 1;
            deduplicated_fields.push(Arc::new(field.as_ref().clone().with_name(name)));
        }
    }
    let deduplicated_schema = arrow_schema::Schema::new_with_metadata(deduplicated_fields, sc.metadata);
    Arc::new(deduplicated_schema)
}
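
/// Expands `pattern` as a case-insensitive glob and collects the matching
/// paths that are regular files with a `.csv` extension.
///
/// Illustrative call (hypothetical directory layout; `ignore`d for the same
/// reason as above):
///
/// ```ignore
/// for path in find_files("data/**/*.csv") {
///     println!("{}", path.display());
/// }
/// ```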
pub fn find_files(pattern: &str) -> Vec<PathBuf> {
    let mut files = vec![];
    let options = MatchOptions {
        case_sensitive: false,
        require_literal_separator: false,
        require_literal_leading_dot: false,
    };
    for entry in glob_with(pattern, options).expect("failed to read file search pattern") {
        match entry {
            // Keep regular files whose extension is exactly `csv`
            // (the extension comparison itself is case-sensitive).
            Ok(p) => {
                if p.is_file() && p.extension().is_some_and(|ext| ext == "csv") {
                    files.push(p);
                }
            }
            Err(e) => eprintln!("{:?}", e),
        }
    }
    files
}
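
/// Strips every character that is not ASCII alphanumeric, `_`, `-`, or
/// whitespace from `column_name`.
///
/// A quick illustration (`ignore`d; mirrors the unit tests below):
///
/// ```ignore
/// assert_eq!(clean_column_name("a@b c#"), "ab c");
/// ```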
pub fn clean_column_name(column_name: &str) -> String {
    // Compile the pattern once and reuse it across calls.
    static RE: OnceLock<regex::Regex> = OnceLock::new();
    let re = RE.get_or_init(|| regex::Regex::new(r"[^a-zA-Z0-9_\-\s]").unwrap());
    re.replace_all(column_name, "").to_string()
}

#[cfg(test)]
mod tests {
    use arrow_schema::Field;
    use super::*;

    #[test]
    fn test_convert_to_parquet() {
        let mut source_file = std::env::current_dir().unwrap();
        source_file.push("testdata");
        source_file.push("sample_empty_header.csv");

        let result = convert_to_parquet(&source_file, ',', true, 10);
        assert!(result.is_ok());

        let parquet_file = PathBuf::from("testdata/sample_empty_header.parquet");
        assert!(parquet_file.exists());
        fs::remove_file(parquet_file).unwrap();
    }

    #[test]
    fn test_convert_to_parquet_delimiter() {
        let mut source_file = std::env::current_dir().unwrap();
        source_file.push("testdata");
        source_file.push("sample_delimiter.csv");

        let result = convert_to_parquet(&source_file, ';', true, 10);
        assert!(result.is_ok());

        let parquet_file = PathBuf::from("testdata/sample_delimiter.parquet");
        assert!(parquet_file.exists());
        fs::remove_file(parquet_file).unwrap();
    }

    #[test]
    fn test_convert_to_parquet_no_header() {
        let mut source_file = std::env::current_dir().unwrap();
        source_file.push("testdata");
        source_file.push("sample_no_header.csv");

        let result = convert_to_parquet(&source_file, ',', false, 10);
        assert!(result.is_ok());

        let parquet_file = PathBuf::from("testdata/sample_no_header.parquet");
        assert!(parquet_file.exists());
        fs::remove_file(parquet_file).unwrap();
    }

    #[test]
    fn test_remove_deduplicate_columns() {
        let schema = arrow_schema::Schema::new(vec![
            Field::new("name", arrow_schema::DataType::Utf8, false),
            Field::new("", arrow_schema::DataType::Utf8, false),
            Field::new("age", arrow_schema::DataType::Int32, false),
            Field::new("age", arrow_schema::DataType::Int64, false),
        ]);

        let deduplicated_schema = remove_deduplicate_columns(schema);

        assert_eq!(deduplicated_schema.fields().len(), 4);
        assert_eq!(deduplicated_schema.field(0).name(), "name");
        assert_eq!(deduplicated_schema.field(1).name(), "column_1");
        assert_eq!(deduplicated_schema.field(2).name(), "age");
        assert_eq!(deduplicated_schema.field(3).name(), "age_2");
    }

    #[test]
    fn test_clean_column_names() {
        assert_eq!(clean_column_name("abc"), "abc");
        assert_eq!(clean_column_name("ab c"), "ab c");
        assert_eq!(clean_column_name("ab.c"), "abc");
        assert_eq!(clean_column_name("ab-_c"), "ab-_c");
        assert_eq!(clean_column_name("Abc"), "Abc");
        assert_eq!(clean_column_name("a8A"), "a8A");
        assert_eq!(clean_column_name("a@bc"), "abc");
        assert_eq!(clean_column_name("abc#"), "abc");
        assert_eq!(clean_column_name("ab}}[}c"), "abc");
        assert_eq!(clean_column_name("ab c "), "ab c ");
    }

    #[test]
    fn test_find_files() {
        assert_eq!(find_files("testdata/sample.csv").len(), 1);
        assert_eq!(find_files("testdata/*.csv").len(), 4);
        assert_eq!(find_files("not-exist/*.csv").len(), 0);
        assert_eq!(find_files("testdata/*delimi*.csv").len(), 1);
    }
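
    // Added coverage sketch for `delete_if_exist` (not present in the
    // original suite; assumes the `testdata` directory used above exists).
    #[test]
    fn test_delete_if_exist() {
        // A missing file is not an error.
        assert!(delete_if_exist("testdata/definitely-missing.tmp").is_ok());

        // An existing file is removed.
        let path = "testdata/delete_me.tmp";
        File::create(path).unwrap();
        assert!(delete_if_exist(path).is_ok());
        assert!(!PathBuf::from(path).exists());
    }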
}