use bdt::compare::ComparisonResult;
use bdt::convert::convert_files;
use bdt::parquet::view_parquet_meta;
use bdt::utils::{parse_filename, register_table, sanitize_table_name};
use bdt::{compare, Error};
use datafusion::common::DataFusionError;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;
use std::fs;
use std::path::PathBuf;
use structopt::StructOpt;
// Command-line interface of the `bdt` binary: one variant per subcommand.
// Parsed in `main` via `Command::from_args()` and dispatched in `execute_command`.
//
// NOTE: plain `//` comments are used on purpose. structopt turns `///` doc
// comments into `--help` text, so switching comment style here would change
// the program's output.
#[derive(Debug, StructOpt)]
#[structopt(name = "bdt", about = "Boring Data Tool")]
enum Command {
// Registers `filename` as table "t" and prints its rows.
View {
// File to display (positional argument).
#[structopt(parse(from_os_str))]
filename: PathBuf,
// Maximum number of rows to print. Defaults to 10 when omitted; 0 prints
// every row.
#[structopt(short, long)]
limit: Option<usize>,
},
// Registers `filename` as table "t" and prints column_name, data_type and
// is_nullable for it from `information_schema.columns`.
Schema {
// File whose schema is printed (positional argument).
#[structopt(parse(from_os_str))]
filename: PathBuf,
},
// Hands `input`, `output` and both flags to `bdt::convert::convert_files`.
Convert {
// Forwarded unchanged to `convert_files`; presumably requests a single
// output file rather than one per partition — confirm in bdt::convert.
#[structopt(short, long)]
single_file: bool,
// Forwarded unchanged to `convert_files`; presumably enables zstd
// compression of the output — confirm in bdt::convert.
#[structopt(short, long)]
zstd: bool,
// Source file (first positional argument).
#[structopt(parse(from_os_str))]
input: PathBuf,
// Destination file (second positional argument).
#[structopt(parse(from_os_str))]
output: PathBuf,
},
// Registers the file under a fixed internal table name and prints the
// result of `SELECT COUNT(*)` over it.
Count {
// File to count rows in, passed as `--table <path>`.
#[structopt(parse(from_os_str), long)]
table: PathBuf,
},
// Registers one or more files as tables and runs a SQL query against them.
Query {
// Files to register, each under a table name derived from its sanitized
// file stem. Being a `Vec`, the option may be given more than once.
#[structopt(parse(from_os_str), long)]
table: Vec<PathBuf>,
// Directory whose entries are all registered the same way as `--table`.
#[structopt(parse(from_os_str), long)]
tables: Option<PathBuf>,
// SQL text to run. Exactly one of `sql` / `sql_file` must be supplied.
#[structopt(long)]
sql: Option<String>,
// File to read the SQL text from. Exactly one of `sql` / `sql_file` must
// be supplied.
#[structopt(parse(from_os_str), long)]
sql_file: Option<PathBuf>,
// Where to write the results. The extension selects the format: `csv` or
// `parquet`; anything else is rejected. When omitted the results are
// printed instead.
#[structopt(parse(from_os_str), long)]
output: Option<PathBuf>,
// When set, the query plan (`explain`) is printed before the results.
#[structopt(short, long)]
verbose: bool,
},
// Passes `input` to `bdt::parquet::view_parquet_meta`.
ViewParquetMeta {
// Parquet file to inspect (positional argument).
#[structopt(parse(from_os_str))]
input: PathBuf,
},
// Compares two files via `bdt::compare::compare_files`; prints
// "Files match" on success and fails with the reported difference otherwise.
Compare {
// First file (first positional argument).
#[structopt(parse(from_os_str))]
input1: PathBuf,
// Second file (second positional argument).
#[structopt(parse(from_os_str))]
input2: PathBuf,
// Forwarded unchanged to `compare_files`; presumably the tolerance used
// when comparing floating-point values — confirm in bdt::compare.
#[structopt(short, long)]
epsilon: Option<f64>,
// Negated and forwarded to `compare_files`, i.e. by default the callee is
// told the files have a header row.
#[structopt(short, long)]
no_header_row: bool,
},
}
/// Program entry point.
///
/// Parses the command line into a [`Command`], runs it, and terminates the
/// process with a non-zero status if the command fails.
#[tokio::main]
async fn main() {
    let cmd = Command::from_args();
    if let Err(e) = execute_command(cmd).await {
        // Report failures on stderr so they never get mixed into the table or
        // query output that callers may be piping from stdout.
        eprintln!("{:?}", e);
        std::process::exit(-1);
    }
}
async fn execute_command(cmd: Command) -> Result<(), Error> {
let config = SessionConfig::new().with_information_schema(true);
let ctx = SessionContext::new_with_config(config);
match cmd {
Command::View { filename, limit } => {
let filename = parse_filename(&filename)?;
let df = register_table(&ctx, "t", filename).await?;
let limit = limit.unwrap_or(10);
if limit > 0 {
df.show_limit(limit).await?;
println!(
"Limiting to {} rows. Run with --limit 0 to remove limit.",
limit
);
} else {
df.show().await?;
}
}
Command::Schema { filename } => {
let filename = parse_filename(&filename)?;
let _ = register_table(&ctx, "t", filename).await?;
let sql = "SELECT column_name, data_type, is_nullable \
FROM information_schema.columns WHERE table_name = 't'";
let df = ctx.sql(sql).await?;
df.show().await?;
}
Command::Convert {
single_file,
input,
output,
zstd,
} => {
let input_filename = parse_filename(&input)?;
let output_filename = parse_filename(&output)?;
convert_files(&ctx, input_filename, output_filename, single_file, zstd).await?;
}
Command::Query {
table,
tables,
sql,
sql_file,
output,
verbose,
} => {
if let Some(dir) = tables {
let paths = fs::read_dir(&dir)?;
for path in paths {
let path = path?.path();
let file_name =
path.file_stem().unwrap().to_str().ok_or_else(|| {
DataFusionError::Internal("Invalid filename".to_string())
})?;
let table_name = sanitize_table_name(file_name);
println!("Registering table '{}' for {}", table_name, path.display());
register_table(&ctx, &table_name, parse_filename(&path)?).await?;
}
}
for table in &table {
let file_name = table
.file_stem()
.unwrap()
.to_str()
.ok_or_else(|| DataFusionError::Internal("Invalid filename".to_string()))?;
let table_name = sanitize_table_name(file_name);
println!("Registering table '{}' for {}", table_name, table.display());
register_table(&ctx, &table_name, parse_filename(table)?).await?;
}
let sql = match (sql, sql_file) {
(Some(text), None) => text,
(None, Some(file)) => fs::read_to_string(file)?,
_ => panic!("Must specify either sql or sql_file, but not both"),
};
let df = ctx.sql(&sql).await?;
if verbose {
let explain = df.clone().explain(false, false)?;
explain.show().await?;
}
if let Some(path) = output {
match path.extension() {
Some(x) => match x.to_str().unwrap() {
"csv" => {
println!("Writing results in CSV format to {}", path.display());
let _ = df
.write_csv(
path.to_str().unwrap(),
DataFrameWriteOptions::default(),
None,
)
.await?;
}
"parquet" => {
println!("Writing results in Parquet format to {}", path.display());
let _ = df
.write_parquet(
path.to_str().unwrap(),
DataFrameWriteOptions::default(),
None,
)
.await?;
}
_ => {
return Err(Error::General(
"Unsupported file format for saving query results".to_string(),
))
}
},
_ => {
return Err(Error::General(
"Unsupported file format for saving query results".to_string(),
))
}
}
} else {
df.show().await?;
}
}
Command::Count { table } => {
let table_name = "__t1__";
register_table(&ctx, table_name, parse_filename(&table)?).await?;
let sql = format!("SELECT COUNT(*) FROM {}", table_name);
let df = ctx.sql(&sql).await?;
df.show().await?;
}
Command::ViewParquetMeta { input } => {
view_parquet_meta(input)?;
}
Command::Compare {
input1,
input2,
epsilon,
no_header_row,
} => match compare::compare_files(input1, input2, !no_header_row, epsilon).await? {
ComparisonResult::Ok => {
println!("Files match");
}
diff => return Err(Error::General(format!("{}", diff))),
},
}
Ok(())
}