use std::fs;
use std::io::Result;
use std::path::{Path, PathBuf};
use std::time::Instant;
use async_trait::async_trait;
use datafusion::arrow::datatypes::Schema;
use datafusion::error::DataFusionError;
use datafusion::parquet::basic::Compression;
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::prelude::*;
pub mod tpcds;
pub mod tpch;
/// Common interface for a TPC-style benchmark (TPC-H, TPC-DS).
///
/// Implementations know how to generate raw data for the benchmark and
/// describe the tables it produces so the generic conversion routines in
/// this module can turn them into Parquet.
#[async_trait]
pub trait Tpc {
    /// Generate the raw benchmark data at the given `scale` factor, split
    /// into `partitions` part files, reading any generator inputs from
    /// `input_path` and writing the produced files under `output_path`.
    fn generate(
        &self,
        scale: usize,
        partitions: usize,
        input_path: &str,
        output_path: &str,
    ) -> Result<()>;

    /// Names of all tables this benchmark defines.
    fn get_table_names(&self) -> Vec<&str>;

    /// File extension (without the leading dot) used by the generator's
    /// output files, e.g. "tbl" or "dat".
    fn get_table_ext(&self) -> &str;

    /// Arrow schema for the named table.
    fn get_schema(&self, table: &str) -> Schema;
}
pub async fn convert_to_parquet(
benchmark: &dyn Tpc,
input_path: &str,
output_path: &str,
) -> datafusion::error::Result<()> {
for table in benchmark.get_table_names() {
println!("Converting table {}", table);
let schema = benchmark.get_schema(table);
let file_ext = format!(".{}", benchmark.get_table_ext());
let options = CsvReadOptions::new()
.schema(&schema)
.delimiter(b'|')
.file_extension(&file_ext);
let path = format!("{}/{}.{}", input_path, table, benchmark.get_table_ext());
let path = Path::new(&path);
if !path.exists() {
panic!("path does not exist: {:?}", path);
}
let output_dir_name = format!("{}/{}.parquet", output_path, table);
let output_dir = Path::new(&output_dir_name);
if output_dir.exists() {
panic!("output dir already exists: {}", output_dir.display());
}
println!("Creating directory: {}", output_dir.display());
fs::create_dir(&output_dir)?;
let x = PathBuf::from(path);
let mut file_vec = vec![];
if x.is_dir() {
let files = fs::read_dir(path)?;
for file in files {
let file = file?;
file_vec.push(file);
}
}
let mut part = 0;
for file in &file_vec {
let stub = file.file_name().to_str().unwrap().to_owned();
let stub = &stub[0..stub.len() - 4]; let output_parts_dir = format!("{}/{}-temp.parquet", output_dir.display(), stub);
println!("Writing {}", output_parts_dir);
let options = options.clone();
convert_tbl(
&file.path(),
&output_parts_dir,
&options,
"parquet",
"snappy",
8192,
)
.await?;
let paths = fs::read_dir(&output_parts_dir)?;
for path in paths {
let path = path?;
let dest_file = format!("{}/part-{}.parquet", output_dir.display(), part);
part += 1;
let dest_path = Path::new(&dest_file);
move_or_copy(&path.path(), &dest_path)?;
}
println!("Removing {}", output_parts_dir);
fs::remove_dir_all(Path::new(&output_parts_dir))?;
}
}
Ok(())
}
/// Move the file at `source_path` to `dest_path`.
///
/// When both paths live on the same filesystem device this is a cheap
/// `rename`; across devices it falls back to copy-then-delete.
pub(crate) fn move_or_copy(
    source_path: &Path,
    dest_path: &Path,
) -> std::result::Result<(), std::io::Error> {
    if is_same_device(source_path, dest_path)? {
        println!("Moving {} to {}", source_path.display(), dest_path.display());
        return fs::rename(source_path, dest_path);
    }
    println!("Copying {} to {}", source_path.display(), dest_path.display());
    fs::copy(source_path, dest_path)?;
    fs::remove_file(source_path)
}
/// Returns true when `path1` and the *parent directory* of `path2` reside on
/// the same filesystem device, in which case a rename can replace a copy.
///
/// NOTE(review): assumes `path2` has a parent component; panics otherwise.
#[cfg(unix)]
fn is_same_device(path1: &Path, path2: &Path) -> std::result::Result<bool, std::io::Error> {
    use std::os::unix::fs::MetadataExt;
    let dev1 = fs::metadata(path1)?.dev();
    let dev2 = fs::metadata(path2.parent().unwrap())?.dev();
    Ok(dev1 == dev2)
}
/// Windows counterpart of the unix `is_same_device`: compares the volume
/// serial numbers of `path1` and the *parent directory* of `path2`.
///
/// NOTE(review): assumes `path2` has a parent component; panics otherwise.
#[cfg(windows)]
fn is_same_device(path1: &Path, path2: &Path) -> std::result::Result<bool, std::io::Error> {
    use std::os::windows::fs::MetadataExt;
    let vol1 = fs::metadata(path1)?.volume_serial_number();
    let vol2 = fs::metadata(path2.parent().unwrap())?.volume_serial_number();
    Ok(vol1 == vol2)
}
/// Convert a single delimited text file (or directory of part files) at
/// `input_path` into `file_format` ("csv" or "parquet") at `output_filename`.
///
/// `batch_size` controls the DataFusion record-batch size used while reading.
/// `compression` is consulted only for Parquet output and accepts "none",
/// "snappy", "lz4", or "lzo" — the misspelled "lz0" is kept as an alias for
/// backward compatibility with existing callers.
///
/// # Errors
/// Returns `DataFusionError::NotImplemented` for an unknown output format or
/// compression codec, and propagates any error raised while reading the CSV
/// input or writing the output.
pub async fn convert_tbl(
    input_path: &Path,
    output_filename: &str,
    options: &CsvReadOptions<'_>,
    file_format: &str,
    compression: &str,
    batch_size: usize,
) -> datafusion::error::Result<()> {
    println!(
        "Converting '{}' to {}",
        input_path.display(),
        output_filename
    );
    let start = Instant::now();

    let config = SessionConfig::new().with_batch_size(batch_size);
    let ctx = SessionContext::with_config(config);

    let csv_filename = input_path.display().to_string();
    let df = ctx.read_csv(&csv_filename, options.clone()).await?;

    match file_format {
        "csv" => df.write_csv(output_filename).await?,
        "parquet" => {
            let compression = match compression {
                "none" => Compression::UNCOMPRESSED,
                "snappy" => Compression::SNAPPY,
                "lz4" => Compression::LZ4,
                // Bug fix: LZO was only reachable via the misspelling "lz0".
                // Accept the correct spelling and keep the old one working.
                "lzo" | "lz0" => Compression::LZO,
                other => {
                    return Err(DataFusionError::NotImplemented(format!(
                        "Invalid compression format: {}",
                        other
                    )))
                }
            };
            let props = WriterProperties::builder()
                .set_compression(compression)
                .build();
            df.write_parquet(output_filename, Some(props)).await?
        }
        other => {
            return Err(DataFusionError::NotImplemented(format!(
                "Invalid output format: {}",
                other
            )))
        }
    }
    println!("Conversion completed in {} ms", start.elapsed().as_millis());
    Ok(())
}