use crate::tpch_cli::plan::GenerationPlan;
use crate::tpch_cli::{OutputFormat, Table};
use log::debug;
use parquet::basic::Compression;
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::io;
use std::path::PathBuf;
/// Destination that generated data is written to.
#[derive(Debug, Clone, PartialEq)]
pub enum OutputLocation {
    /// Write to the file at the given path.
    File(PathBuf),
    /// Write to standard output.
    Stdout,
}

impl Display for OutputLocation {
    /// Renders a short, human-readable label for the location.
    ///
    /// A file is shown by its final path component only; when the path has
    /// no final component (`Path::file_name` returns `None`), the whole path
    /// is shown instead. Standard output is rendered as `Stdout`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Stdout => f.write_str("Stdout"),
            Self::File(path) => match path.file_name() {
                Some(name) => write!(f, "{}", name.to_string_lossy()),
                None => write!(f, "{}", path.display()),
            },
        }
    }
}
/// Everything needed to produce the output for one table (or one part of a
/// table) at a single [`OutputLocation`].
#[derive(Debug, Clone, PartialEq)]
pub struct OutputPlan {
/// Table this plan produces output for.
table: Table,
/// Scale factor; printed as `SF=` by the `Display` implementation.
scale_factor: f64,
/// Format of the generated output.
output_format: OutputFormat,
/// Parquet compression setting carried with the plan.
/// NOTE(review): presumably only consulted for Parquet output; confirm with the writer.
parquet_compression: Compression,
/// Where the output is written (a file or stdout).
output_location: OutputLocation,
/// Underlying generation plan; also the source of `chunk_count()`.
generation_plan: GenerationPlan,
/// Delimiter character carried with the plan.
/// NOTE(review): presumably only consulted for CSV output; confirm with the writer.
csv_delimiter: char,
}
impl OutputPlan {
pub fn new(
table: Table,
scale_factor: f64,
output_format: OutputFormat,
parquet_compression: Compression,
output_location: OutputLocation,
generation_plan: GenerationPlan,
csv_delimiter: char,
) -> Self {
Self {
table,
scale_factor,
output_format,
parquet_compression,
output_location,
generation_plan,
csv_delimiter,
}
}
pub fn table(&self) -> Table {
self.table
}
pub fn scale_factor(&self) -> f64 {
self.scale_factor
}
pub fn output_format(&self) -> OutputFormat {
self.output_format
}
pub fn output_location(&self) -> &OutputLocation {
&self.output_location
}
pub fn parquet_compression(&self) -> Compression {
self.parquet_compression
}
pub fn chunk_count(&self) -> usize {
self.generation_plan.chunk_count()
}
pub fn generation_plan(&self) -> &GenerationPlan {
&self.generation_plan
}
pub fn csv_delimiter(&self) -> char {
self.csv_delimiter
}
}
impl Display for OutputPlan {
    /// Writes a one-line summary: table, scale factor, chunk count and the
    /// output location (using the location's own `Display` form).
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self {
            table,
            scale_factor,
            output_location,
            ..
        } = self;
        let chunks = self.chunk_count();
        write!(
            f,
            "table {table} (SF={scale_factor}, {chunks} chunks) to {output_location}"
        )
    }
}
/// Accumulates [`OutputPlan`]s for one or more tables; the collected plans
/// are handed out by `build`.
pub struct OutputPlanGenerator {
/// Output format; selects the file extension and is copied into every plan.
format: OutputFormat,
/// Scale factor passed to `GenerationPlan::try_new` and copied into every plan.
scale_factor: f64,
/// Parquet compression setting copied into every plan.
parquet_compression: Compression,
/// Row group size setting forwarded to `GenerationPlan::try_new`.
parquet_row_group_bytes: i64,
/// When true, every plan targets `OutputLocation::Stdout` instead of a file.
stdout: bool,
/// Base directory under which output files (and per-table directories) are placed.
output_dir: PathBuf,
/// Delimiter character copied into every plan.
csv_delimiter: char,
/// Plans collected so far, in the order they were generated.
output_plans: Vec<OutputPlan>,
/// Directories already created by this generator, so each one is created only once.
created_directories: HashSet<PathBuf>,
}
impl OutputPlanGenerator {
    /// Creates a generator that holds the given settings and has no plans
    /// and no recorded directories yet.
    pub fn new(
        format: OutputFormat,
        scale_factor: f64,
        parquet_compression: Compression,
        parquet_row_group_bytes: i64,
        stdout: bool,
        output_dir: PathBuf,
        csv_delimiter: char,
    ) -> Self {
        Self {
            format,
            scale_factor,
            parquet_compression,
            parquet_row_group_bytes,
            stdout,
            output_dir,
            csv_delimiter,
            output_plans: Vec::new(),
            created_directories: HashSet::new(),
        }
    }

    /// Adds the output plan(s) for `table` to this generator.
    ///
    /// When only `cli_part_count` is given (no `cli_part`):
    /// * if `GenerationPlan::partitioned_table(table)` is true, one plan is
    ///   added for every part in `1..=part_count`;
    /// * otherwise a single plan for part 1 of 1 is added.
    ///
    /// In every other case `cli_part` and `cli_part_count` are passed
    /// through unchanged and exactly one plan is added.
    ///
    /// # Errors
    ///
    /// Returns the first error produced while building a plan (an invalid
    /// generation plan or a failure to create an output directory).
    pub fn generate_plans(
        &mut self,
        table: Table,
        cli_part: Option<i32>,
        cli_part_count: Option<i32>,
    ) -> io::Result<()> {
        if let (None, Some(part_count)) = (cli_part, cli_part_count) {
            if GenerationPlan::partitioned_table(table) {
                debug!("Generating all partitions for table {table} with part count {part_count}");
                // NOTE: a `part_count` below 1 makes this range empty, so no
                // plan is added and no error is reported for it here.
                for part in 1..=part_count {
                    self.generate_plan_inner(table, Some(part), Some(part_count))?;
                }
            } else {
                debug!("Generating single partition for table {table}");
                self.generate_plan_inner(table, Some(1), Some(1))?;
            }
        } else {
            self.generate_plan_inner(table, cli_part, cli_part_count)?;
        }
        Ok(())
    }

    /// Builds one [`OutputPlan`] for `table` and appends it to the
    /// collected plans.
    ///
    /// # Errors
    ///
    /// A failure of `GenerationPlan::try_new` is reported as
    /// `io::ErrorKind::InvalidInput`; errors from resolving the output
    /// location are returned as they are.
    fn generate_plan_inner(
        &mut self,
        table: Table,
        cli_part: Option<i32>,
        cli_part_count: Option<i32>,
    ) -> io::Result<()> {
        let generation_plan = GenerationPlan::try_new(
            table,
            self.format,
            self.scale_factor,
            cli_part,
            cli_part_count,
            self.parquet_row_group_bytes,
        )
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;

        let output_location = self.output_location(table, cli_part)?;

        let plan = OutputPlan::new(
            table,
            self.scale_factor,
            self.format,
            self.parquet_compression,
            output_location,
            generation_plan,
            self.csv_delimiter,
        );
        self.output_plans.push(plan);
        Ok(())
    }

    /// Decides where the output for `table` (and optional `part`) goes.
    ///
    /// * With `stdout` set, the location is always `OutputLocation::Stdout`.
    /// * With a part, the file is `<output_dir>/<table>/<table>.<part>.<ext>`
    ///   and the `<output_dir>/<table>` directory is created if needed.
    /// * Without a part, the file is `<output_dir>/<table>.<ext>`; no
    ///   directory is created in this case.
    ///
    /// `<ext>` is `tbl`, `csv` or `parquet` depending on the output format.
    fn output_location(&mut self, table: Table, part: Option<i32>) -> io::Result<OutputLocation> {
        if self.stdout {
            Ok(OutputLocation::Stdout)
        } else {
            let extension = match self.format {
                OutputFormat::Tbl => "tbl",
                OutputFormat::Csv => "csv",
                OutputFormat::Parquet => "parquet",
            };

            let mut output_path = self.output_dir.clone();
            if let Some(part) = part {
                // Parts of one table are grouped in a directory named after it.
                output_path.push(table.to_string());
                self.ensure_directory_exists(&output_path)?;
                output_path.push(format!("{table}.{part}.{extension}"));
            } else {
                output_path.push(format!("{table}.{extension}"));
            }
            Ok(OutputLocation::File(output_path))
        }
    }

    /// Creates `dir` (and any missing parents) unless this generator has
    /// already created it.
    ///
    /// # Errors
    ///
    /// Returns an error carrying the directory path in its message. The
    /// error keeps the kind reported by the operating system (for example
    /// `PermissionDenied`) instead of relabelling every failure as
    /// `InvalidInput`.
    fn ensure_directory_exists(&mut self, dir: &std::path::Path) -> io::Result<()> {
        if self.created_directories.contains(dir) {
            return Ok(());
        }
        std::fs::create_dir_all(dir).map_err(|e| {
            io::Error::new(
                e.kind(),
                format!("Error creating directory {}: {}", dir.display(), e),
            )
        })?;
        // Record only after success so a failed attempt is retried next time.
        self.created_directories.insert(dir.to_path_buf());
        Ok(())
    }

    /// Consumes the generator and returns the collected plans in the order
    /// they were generated.
    pub fn build(self) -> Vec<OutputPlan> {
        self.output_plans
    }
}