use std::path::Path;
use indicatif::ProgressBar;
use super::run::Pipeline;
use crate::Error;
use crate::FileType;
use crate::Result;
use crate::cli::DisplayOutputFormat;
use crate::errors::PipelinePlanningError;
use crate::pipeline::dataframe::DataFramePipeline;
use crate::pipeline::dataframe::DataFrameSink;
use crate::pipeline::record_batch::RecordBatchPipeline;
use crate::pipeline::record_batch::RecordBatchSink;
use crate::pipeline::spec::ColumnSpec;
use crate::pipeline::spec::DisplaySlice;
use crate::pipeline::spec::SelectItem;
use crate::pipeline::spec::SelectSpec;
use crate::resolve_file_type;
/// Fluent builder that assembles a [`Pipeline`] from an input source, optional
/// selection/slicing stages, and exactly one sink (write, display, schema, or
/// row count). Configure via the setter methods, then call `build`.
pub struct PipelineBuilder {
    /// Input path; required — `build` fails without it.
    read: Option<String>,
    /// Optional column/aggregate selection.
    select: Option<SelectSpec>,
    /// Keep only the first n rows (mutually exclusive with `tail`/`sample`).
    head: Option<usize>,
    /// Keep only the last n rows (mutually exclusive with `head`/`sample`).
    tail: Option<usize>,
    /// Keep a sample of n rows (mutually exclusive with `head`/`tail`).
    sample: Option<usize>,
    /// When true, the sink reports a row count instead of rows.
    display_row_count: bool,
    /// Output path; when set, `build` plans a write pipeline.
    write: Option<String>,
    /// When true, the sink reports the input schema instead of rows.
    schema: bool,
    /// Explicit input format; `None` means resolve from the path.
    input_type_override: Option<FileType>,
    /// Explicit output format; `None` means resolve from the path.
    output_type_override: Option<FileType>,
    /// CSV header-row override for the input; `None` keeps default handling.
    csv_has_header: Option<bool>,
    /// Sparse handling flag; defaults to `true` (see `Default`).
    sparse: bool,
    /// Pretty-print JSON when writing.
    json_pretty: bool,
    /// Optional progress bar, forwarded to write sinks.
    progress: Option<ProgressBar>,
    /// Display output format; CSV is used when unset.
    display_output_format: Option<DisplayOutputFormat>,
    /// Whether CSV display output includes headers; defaults to true.
    display_csv_headers: Option<bool>,
}
impl Default for PipelineBuilder {
    /// Returns a builder with every stage unset. Hand-written (not derived)
    /// because `sparse` defaults to `true`, not `bool::default()`.
    fn default() -> Self {
        Self {
            read: None,
            select: None,
            head: None,
            tail: None,
            sample: None,
            display_row_count: false,
            write: None,
            schema: false,
            input_type_override: None,
            output_type_override: None,
            csv_has_header: None,
            sparse: true, // the only non-trivial default
            json_pretty: false,
            progress: None,
            display_output_format: None,
            display_csv_headers: None,
        }
    }
}
impl PipelineBuilder {
    /// Creates a builder with default settings (see [`Default`]; notably
    /// `sparse` starts enabled).
    // `#[must_use]`: constructing a builder and discarding it is always a bug.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}
impl PipelineBuilder {
    /// Sets the input path to read from. Required; `build` errors without it.
    pub fn read(&mut self, path: &str) -> &mut Self {
        self.read = Some(path.to_string());
        self
    }

    /// Selects columns by exact name (no aggregates, no group-by).
    pub fn select(&mut self, columns: &[&str]) -> &mut Self {
        self.select = Some(SelectSpec {
            columns: columns
                .iter()
                .map(|c| SelectItem::Column(ColumnSpec::Exact(c.to_string())))
                .collect(),
            group_by: None,
        });
        self
    }

    /// Installs a full selection spec (may include aggregates and group-by).
    pub fn select_spec(&mut self, spec: SelectSpec) -> &mut Self {
        self.select = Some(spec);
        self
    }

    /// Sets the output path; `build` will plan a conversion/write pipeline.
    pub fn write<P: AsRef<Path>>(&mut self, path: P) -> &mut Self {
        self.write = Some(path.as_ref().to_string_lossy().to_string());
        self
    }

    /// Keeps only the first `n` rows; mutually exclusive with `tail`/`sample`.
    pub fn head(&mut self, n: usize) -> &mut Self {
        self.head = Some(n);
        self
    }

    /// Keeps only the last `n` rows; mutually exclusive with `head`/`sample`.
    pub fn tail(&mut self, n: usize) -> &mut Self {
        self.tail = Some(n);
        self
    }

    /// Keeps a sample of `n` rows; mutually exclusive with `head`/`tail`.
    pub fn sample(&mut self, n: usize) -> &mut Self {
        self.sample = Some(n);
        self
    }

    /// Requests a row-count display instead of printing rows.
    pub fn row_count(&mut self) -> &mut Self {
        self.display_row_count = true;
        self
    }

    /// Requests a schema display instead of printing rows.
    pub fn schema(&mut self) -> &mut Self {
        self.schema = true;
        self
    }

    /// Overrides input file-type detection; `None` keeps path-based resolution.
    pub fn input_type(&mut self, file_type: Option<FileType>) -> &mut Self {
        self.input_type_override = file_type;
        self
    }

    /// Overrides output file-type detection; `None` keeps path-based resolution.
    pub fn output_type(&mut self, file_type: Option<FileType>) -> &mut Self {
        self.output_type_override = file_type;
        self
    }

    /// Overrides whether CSV input has a header row; `None` keeps the default.
    pub fn csv_has_header(&mut self, has_header: Option<bool>) -> &mut Self {
        self.csv_has_header = has_header;
        self
    }

    /// Toggles sparse handling (enabled by default).
    pub fn sparse(&mut self, sparse: bool) -> &mut Self {
        self.sparse = sparse;
        self
    }

    /// Toggles pretty-printed JSON output for write sinks.
    pub fn json_pretty(&mut self, json_pretty: bool) -> &mut Self {
        self.json_pretty = json_pretty;
        self
    }

    /// Attaches an optional progress bar, forwarded to write sinks.
    pub fn progress(&mut self, progress: Option<ProgressBar>) -> &mut Self {
        self.progress = progress;
        self
    }

    /// Sets the display output format (CSV is used when unset).
    pub fn display_format(&mut self, format: DisplayOutputFormat) -> &mut Self {
        self.display_output_format = Some(format);
        self
    }

    /// Controls whether CSV display output includes headers (defaults to true).
    pub fn display_csv_headers(&mut self, headers: bool) -> &mut Self {
        self.display_csv_headers = Some(headers);
        self
    }

    /// Plans a read → (select/slice) → write pipeline between two files.
    ///
    /// # Errors
    /// Fails when more than one of head/tail/sample is set, when either file
    /// type does not support pipeline conversion, or for ORC input combined
    /// with aggregates.
    fn build_write_pipeline(
        &self,
        input_path: &str,
        output_path: &str,
        select: Option<SelectSpec>,
    ) -> Result<Pipeline> {
        ensure_at_most_one_of_head_tail_sample(self.head, self.tail, self.sample)?;
        let input_file_type = resolve_file_type(self.input_type_override, input_path)?;
        let output_file_type = resolve_file_type(self.output_type_override, output_path)?;
        if !input_file_type.supports_pipeline_conversion_input() {
            return Err(Error::PipelinePlanningError(
                PipelinePlanningError::UnsupportedInputFileType(input_file_type.to_string()),
            ));
        }
        if !output_file_type.supports_pipeline_conversion_output() {
            return Err(Error::PipelinePlanningError(
                PipelinePlanningError::UnsupportedOutputFileType(output_file_type.to_string()),
            ));
        }
        reject_orc_with_aggregates(input_file_type, &select)?;
        let slice = slice_from_head_tail_sample(self.head, self.tail, self.sample);
        Ok(dispatch_pipeline(
            input_path.to_string(),
            input_file_type,
            select,
            slice,
            self.sparse,
            self.csv_has_header,
            UnifiedSink::Write {
                output_path: output_path.to_string(),
                output_file_type,
                json_pretty: self.json_pretty,
                progress: self.progress.clone(),
            },
        ))
    }

    /// Plans a pipeline whose sink prints the input's schema (no slicing).
    fn build_schema_display_pipeline(
        &self,
        input_path: &str,
        select: Option<SelectSpec>,
    ) -> Result<Pipeline> {
        let input_file_type = resolve_file_type(self.input_type_override, input_path)?;
        if !input_file_type.supports_pipeline_display_input() {
            return Err(Error::PipelinePlanningError(
                PipelinePlanningError::UnsupportedInputFileType(input_file_type.to_string()),
            ));
        }
        let output_format = self
            .display_output_format
            .unwrap_or(DisplayOutputFormat::Csv);
        let sparse = self.sparse;
        let input_path = input_path.to_string();
        let csv_has_header = self.csv_has_header;
        reject_orc_with_aggregates(input_file_type, &select)?;
        Ok(dispatch_pipeline(
            input_path,
            input_file_type,
            select,
            None,
            sparse,
            csv_has_header,
            UnifiedSink::Schema {
                output_format,
                sparse,
            },
        ))
    }

    /// Plans a pipeline whose sink prints the input's row count (no slicing).
    fn build_row_count_display_pipeline(
        &self,
        input_path: &str,
        select: Option<SelectSpec>,
    ) -> Result<Pipeline> {
        let input_file_type = resolve_file_type(self.input_type_override, input_path)?;
        if !input_file_type.supports_pipeline_display_input() {
            return Err(Error::PipelinePlanningError(
                PipelinePlanningError::UnsupportedInputFileType(input_file_type.to_string()),
            ));
        }
        let input_path = input_path.to_string();
        let csv_has_header = self.csv_has_header;
        let sparse = self.sparse;
        reject_orc_with_aggregates(input_file_type, &select)?;
        Ok(dispatch_pipeline(
            input_path,
            input_file_type,
            select,
            None,
            sparse,
            csv_has_header,
            UnifiedSink::Count,
        ))
    }

    /// Plans a row-display pipeline; requires exactly one of head/tail/sample.
    ///
    /// # Errors
    /// Fails when no slice stage was configured, when the input type does not
    /// support display, or for ORC input combined with aggregates.
    fn build_display_pipeline(
        &self,
        input_path: &str,
        select: Option<SelectSpec>,
    ) -> Result<Pipeline> {
        let slice =
            slice_from_head_tail_sample(self.head, self.tail, self.sample).ok_or_else(|| {
                Error::PipelinePlanningError(PipelinePlanningError::MissingRequiredStage(
                    "head(n), tail(n), or sample(n)".to_string(),
                ))
            })?;
        let input_file_type = resolve_file_type(self.input_type_override, input_path)?;
        if !input_file_type.supports_pipeline_display_input() {
            return Err(Error::PipelinePlanningError(
                PipelinePlanningError::UnsupportedInputFileType(input_file_type.to_string()),
            ));
        }
        let output_format = self
            .display_output_format
            .unwrap_or(DisplayOutputFormat::Csv);
        let csv_stdout_headers = self.display_csv_headers.unwrap_or(true);
        let input_path = input_path.to_string();
        let csv_has_header = self.csv_has_header;
        let sparse = self.sparse;
        reject_orc_with_aggregates(input_file_type, &select)?;
        Ok(dispatch_pipeline(
            input_path,
            input_file_type,
            select,
            Some(slice),
            sparse,
            csv_has_header,
            UnifiedSink::Display {
                output_format,
                csv_stdout_headers,
            },
        ))
    }

    /// Plans an aggregate/group-by display pipeline (no slicing). Always uses
    /// the DataFrame engine.
    fn build_aggregate_display_pipeline(
        &self,
        input_path: &str,
        select: Option<SelectSpec>,
    ) -> Result<Pipeline> {
        let input_file_type = resolve_file_type(self.input_type_override, input_path)?;
        if !input_file_type.supports_pipeline_display_input() {
            return Err(Error::PipelinePlanningError(
                PipelinePlanningError::UnsupportedInputFileType(input_file_type.to_string()),
            ));
        }
        // Same guard every sibling planner applies; was previously duplicated
        // inline here. Identical error (AggregatesNotSupportedForOrc).
        reject_orc_with_aggregates(input_file_type, &select)?;
        let output_format = self
            .display_output_format
            .unwrap_or(DisplayOutputFormat::Csv);
        let csv_stdout_headers = self.display_csv_headers.unwrap_or(true);
        let input_path = input_path.to_string();
        let csv_has_header = self.csv_has_header;
        let sparse = self.sparse;
        // NOTE(review): ORC with group-by only (no aggregates) passes the
        // guard above and is planned on the DataFrame engine here, even though
        // dispatch_pipeline routes ORC to the record-batch engine elsewhere —
        // confirm this is intended.
        Ok(Pipeline::DataFrame(DataFramePipeline {
            input_path,
            input_file_type,
            select,
            slice: None,
            csv_has_header,
            sparse,
            sink: DataFrameSink::Display {
                output_format,
                csv_stdout_headers,
            },
        }))
    }

    /// Plans the pipeline from the accumulated configuration.
    ///
    /// Sink precedence: write > schema > row count > aggregate display >
    /// sliced display.
    ///
    /// # Errors
    /// Fails when no input was set, when conflicting stages were requested,
    /// when the selection is empty, or when a chosen planner rejects the
    /// configuration.
    pub fn build(&mut self) -> Result<Pipeline> {
        let input_path = self.read.as_ref().ok_or_else(|| {
            Error::PipelinePlanningError(PipelinePlanningError::MissingRequiredStage(
                "read(path)".to_string(),
            ))
        })?;
        if self.write.is_some() && (self.schema || self.display_row_count) {
            return Err(Error::PipelinePlanningError(
                PipelinePlanningError::ConflictingOptions(
                    "schema and row count are not supported with write(path)".to_string(),
                ),
            ));
        }
        ensure_display_stage_exclusivity(
            self.head,
            self.tail,
            self.sample,
            self.schema,
            self.display_row_count,
        )?;
        let select = self.select.clone();
        if let Some(spec) = &select
            && spec.is_empty()
        {
            return Err(Error::PipelinePlanningError(
                PipelinePlanningError::SelectEmpty,
            ));
        }
        if let Some(output_path) = &self.write {
            self.build_write_pipeline(input_path, output_path, select)
        } else if self.schema {
            self.build_schema_display_pipeline(input_path, select)
        } else if self.display_row_count {
            self.build_row_count_display_pipeline(input_path, select)
        } else if self.head.is_none()
            && self.tail.is_none()
            && self.sample.is_none()
            && select.as_ref().is_some_and(|spec| {
                spec.is_aggregate_only() || (spec.has_group_by() && !spec.is_empty())
            })
        {
            // Unsliced aggregate/group-by queries run over the full input.
            self.build_aggregate_display_pipeline(input_path, select)
        } else {
            self.build_display_pipeline(input_path, select)
        }
    }
}
/// Engine-agnostic sink description, converted to the engine-specific sink
/// type by `unified_to_dataframe_sink` / `unified_to_record_batch_sink`.
enum UnifiedSink {
    /// Write rows to `output_path` in `output_file_type` format.
    Write {
        output_path: String,
        output_file_type: FileType,
        /// Pretty-print JSON output.
        json_pretty: bool,
        /// Optional progress reporting during the write.
        progress: Option<ProgressBar>,
    },
    /// Print rows in `output_format`.
    Display {
        output_format: DisplayOutputFormat,
        /// Include a header row when the display format is CSV.
        csv_stdout_headers: bool,
    },
    /// Print the input schema.
    Schema {
        output_format: DisplayOutputFormat,
        sparse: bool,
    },
    /// Print the row count only.
    Count,
}
/// Routes a planned pipeline to an execution engine: ORC input runs on the
/// record-batch engine; every other format runs on the DataFrame engine
/// (which also consumes the CSV header option).
fn dispatch_pipeline(
    input_path: String,
    input_file_type: FileType,
    select: Option<SelectSpec>,
    slice: Option<DisplaySlice>,
    sparse: bool,
    csv_has_header: Option<bool>,
    sink: UnifiedSink,
) -> Pipeline {
    match input_file_type {
        FileType::Orc => Pipeline::RecordBatch(RecordBatchPipeline {
            input_path,
            input_file_type,
            select,
            slice,
            sparse,
            sink: unified_to_record_batch_sink(sink),
        }),
        _ => Pipeline::DataFrame(DataFramePipeline {
            input_path,
            input_file_type,
            select,
            slice,
            csv_has_header,
            sparse,
            sink: unified_to_dataframe_sink(sink),
        }),
    }
}
/// Maps the engine-agnostic sink onto the DataFrame engine's sink type;
/// every field carries over unchanged.
fn unified_to_dataframe_sink(sink: UnifiedSink) -> DataFrameSink {
    match sink {
        UnifiedSink::Count => DataFrameSink::Count,
        UnifiedSink::Schema {
            output_format,
            sparse,
        } => DataFrameSink::Schema {
            output_format,
            sparse,
        },
        UnifiedSink::Display {
            output_format,
            csv_stdout_headers,
        } => DataFrameSink::Display {
            output_format,
            csv_stdout_headers,
        },
        UnifiedSink::Write {
            output_path,
            output_file_type,
            json_pretty,
            progress,
        } => DataFrameSink::Write {
            output_path,
            output_file_type,
            json_pretty,
            progress,
        },
    }
}
/// Maps the engine-agnostic sink onto the record-batch engine's sink type;
/// every field carries over unchanged.
fn unified_to_record_batch_sink(sink: UnifiedSink) -> RecordBatchSink {
    match sink {
        UnifiedSink::Count => RecordBatchSink::Count,
        UnifiedSink::Schema {
            output_format,
            sparse,
        } => RecordBatchSink::Schema {
            output_format,
            sparse,
        },
        UnifiedSink::Display {
            output_format,
            csv_stdout_headers,
        } => RecordBatchSink::Display {
            output_format,
            csv_stdout_headers,
        },
        UnifiedSink::Write {
            output_path,
            output_file_type,
            json_pretty,
            progress,
        } => RecordBatchSink::Write {
            output_path,
            output_file_type,
            json_pretty,
            progress,
        },
    }
}
fn reject_orc_with_aggregates(
input_file_type: FileType,
select: &Option<SelectSpec>,
) -> Result<()> {
if input_file_type == FileType::Orc && select.as_ref().is_some_and(SelectSpec::has_aggregates) {
return Err(Error::PipelinePlanningError(
PipelinePlanningError::AggregatesNotSupportedForOrc,
));
}
Ok(())
}
/// Converts the head/tail/sample options into a single display slice, with
/// precedence sample > tail > head; `None` when no option is set.
fn slice_from_head_tail_sample(
    head: Option<usize>,
    tail: Option<usize>,
    sample: Option<usize>,
) -> Option<DisplaySlice> {
    match (sample, tail, head) {
        (Some(n), _, _) => Some(DisplaySlice::Sample(n)),
        (None, Some(n), _) => Some(DisplaySlice::Tail(n)),
        (None, None, Some(n)) => Some(DisplaySlice::Head(n)),
        (None, None, None) => None,
    }
}
/// Errors when more than one of the head/tail/sample slicing options is set.
fn ensure_at_most_one_of_head_tail_sample(
    head: Option<usize>,
    tail: Option<usize>,
    sample: Option<usize>,
) -> Result<()> {
    let configured = [head.is_some(), tail.is_some(), sample.is_some()]
        .into_iter()
        .filter(|&flag| flag)
        .count();
    if configured > 1 {
        return Err(Error::PipelinePlanningError(
            PipelinePlanningError::ConflictingOptions(
                "only one of head(n), tail(n), or sample(n) may be set".to_string(),
            ),
        ));
    }
    Ok(())
}
fn ensure_display_stage_exclusivity(
head: Option<usize>,
tail: Option<usize>,
sample: Option<usize>,
schema: bool,
display_row_count: bool,
) -> Result<()> {
ensure_at_most_one_of_head_tail_sample(head, tail, sample)?;
let has_slice = head.is_some() || tail.is_some() || sample.is_some();
let meta_stages = usize::from(schema) + usize::from(display_row_count);
if meta_stages > 1 {
return Err(Error::PipelinePlanningError(
PipelinePlanningError::ConflictingOptions(
"only one of schema(), head(n), tail(n), sample(n), or row count may be set"
.to_string(),
),
));
}
if has_slice && meta_stages > 0 {
return Err(Error::PipelinePlanningError(
PipelinePlanningError::ConflictingOptions(
"schema and row count cannot be combined with head(n), tail(n), or sample(n)"
.to_string(),
),
));
}
Ok(())
}