use std::{
any::Any, borrow::Cow, collections::HashMap, fmt::Debug, fmt::Formatter,
fmt::Result as FmtResult, marker::PhantomData, sync::Arc,
};
use crate::file_groups::FileGroup;
#[allow(unused_imports)]
use crate::schema_adapter::SchemaAdapterFactory;
use crate::{
display::FileGroupsDisplay,
file::FileSource,
file_compression_type::FileCompressionType,
file_stream::FileStream,
source::{DataSource, DataSourceExec},
statistics::MinMaxStatistics,
PartitionedFile,
};
use arrow::{
array::{
ArrayData, ArrayRef, BufferBuilder, DictionaryArray, RecordBatch,
RecordBatchOptions,
},
buffer::Buffer,
datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
};
use datafusion_common::{
config::ConfigOptions, exec_err, ColumnStatistics, Constraints, Result, Statistics,
};
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_execution::{
object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::{
expressions::Column, EquivalenceProperties, LexOrdering, Partitioning,
PhysicalSortExpr,
};
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
use datafusion_physical_plan::{
display::{display_orderings, ProjectSchemaDisplay},
metrics::ExecutionPlanMetricsSet,
projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec},
DisplayAs, DisplayFormatType, ExecutionPlan,
};
use log::{debug, warn};
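/// Base configuration for scanning a collection of files as a single
/// [`DataSource`]: which files to read, how to project, order and limit them,
/// and which format-specific [`FileSource`] to read them with.
///
/// Instances are normally created through [`FileScanConfigBuilder`] and then
/// wrapped in a [`DataSourceExec`]. A minimal sketch (the `object_store_url`,
/// `file_schema` and `source` values are placeholders for whatever object
/// store URL, schema and concrete [`FileSource`] implementation are at hand):
///
/// ```rust,ignore
/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, source)
///     // read at most 1000 rows
///     .with_limit(Some(1000))
///     // project only the first and third columns
///     .with_projection(Some(vec![0, 2]))
///     // scan a single 1234-byte file
///     .with_file(PartitionedFile::new("file1.parquet", 1234))
///     .build();
/// // execute the scan via a `DataSourceExec`
/// let exec = DataSourceExec::from_data_source(config);
/// ```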
#[derive(Clone)]
pub struct FileScanConfig {
pub object_store_url: ObjectStoreUrl,
pub file_schema: SchemaRef,
pub file_groups: Vec<FileGroup>,
pub constraints: Constraints,
pub projection: Option<Vec<usize>>,
pub limit: Option<usize>,
pub table_partition_cols: Vec<Field>,
pub output_ordering: Vec<LexOrdering>,
pub file_compression_type: FileCompressionType,
pub new_lines_in_values: bool,
pub file_source: Arc<dyn FileSource>,
pub batch_size: Option<usize>,
}
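/// A builder for [`FileScanConfig`].
///
/// The required values (object store URL, file schema and [`FileSource`]) are
/// passed to [`FileScanConfigBuilder::new`]; every other option is optional
/// and falls back to a default in [`FileScanConfigBuilder::build`].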
#[derive(Clone)]
pub struct FileScanConfigBuilder {
object_store_url: ObjectStoreUrl,
file_schema: SchemaRef,
file_source: Arc<dyn FileSource>,
limit: Option<usize>,
projection: Option<Vec<usize>>,
table_partition_cols: Vec<Field>,
constraints: Option<Constraints>,
file_groups: Vec<FileGroup>,
statistics: Option<Statistics>,
output_ordering: Vec<LexOrdering>,
file_compression_type: Option<FileCompressionType>,
new_lines_in_values: Option<bool>,
batch_size: Option<usize>,
}
impl FileScanConfigBuilder {
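    /// Create a new builder from the required fields: the URL of the object
    /// store holding the files, the schema of the files themselves (excluding
    /// table partition columns) and the format-specific [`FileSource`] used to
    /// read them.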
pub fn new(
object_store_url: ObjectStoreUrl,
file_schema: SchemaRef,
file_source: Arc<dyn FileSource>,
) -> Self {
Self {
object_store_url,
file_schema,
file_source,
file_groups: vec![],
statistics: None,
output_ordering: vec![],
file_compression_type: None,
new_lines_in_values: None,
limit: None,
projection: None,
table_partition_cols: vec![],
constraints: None,
batch_size: None,
}
}
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
self
}
pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
self.file_source = file_source;
self
}
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
self.projection = projection;
self
}
pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
self.table_partition_cols = table_partition_cols;
self
}
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
self.constraints = Some(constraints);
self
}
pub fn with_statistics(mut self, statistics: Statistics) -> Self {
self.statistics = Some(statistics);
self
}
pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
self.file_groups = file_groups;
self
}
pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
self.file_groups.push(file_group);
self
}
pub fn with_file(self, file: PartitionedFile) -> Self {
self.with_file_group(FileGroup::new(vec![file]))
}
pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
self.output_ordering = output_ordering;
self
}
pub fn with_file_compression_type(
mut self,
file_compression_type: FileCompressionType,
) -> Self {
self.file_compression_type = Some(file_compression_type);
self
}
pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
self.new_lines_in_values = Some(new_lines_in_values);
self
}
pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
self.batch_size = batch_size;
self
}
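    /// Build the final [`FileScanConfig`].
    ///
    /// Unset options receive defaults: unknown statistics, empty constraints,
    /// [`FileCompressionType::UNCOMPRESSED`] and `new_lines_in_values = false`.
    /// The statistics and file schema are also pushed into the [`FileSource`].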
pub fn build(self) -> FileScanConfig {
let Self {
object_store_url,
file_schema,
file_source,
limit,
projection,
table_partition_cols,
constraints,
file_groups,
statistics,
output_ordering,
file_compression_type,
new_lines_in_values,
batch_size,
} = self;
let constraints = constraints.unwrap_or_default();
let statistics =
statistics.unwrap_or_else(|| Statistics::new_unknown(&file_schema));
let file_source = file_source
.with_statistics(statistics.clone())
.with_schema(Arc::clone(&file_schema));
let file_compression_type =
file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
let new_lines_in_values = new_lines_in_values.unwrap_or(false);
FileScanConfig {
object_store_url,
file_schema,
file_source,
limit,
projection,
table_partition_cols,
constraints,
file_groups,
output_ordering,
file_compression_type,
new_lines_in_values,
batch_size,
}
}
}
impl From<FileScanConfig> for FileScanConfigBuilder {
fn from(config: FileScanConfig) -> Self {
Self {
object_store_url: config.object_store_url,
file_schema: config.file_schema,
file_source: Arc::<dyn FileSource>::clone(&config.file_source),
file_groups: config.file_groups,
statistics: config.file_source.statistics().ok(),
output_ordering: config.output_ordering,
file_compression_type: Some(config.file_compression_type),
new_lines_in_values: Some(config.new_lines_in_values),
limit: config.limit,
projection: config.projection,
table_partition_cols: config.table_partition_cols,
constraints: Some(config.constraints),
batch_size: config.batch_size,
}
}
}
impl DataSource for FileScanConfig {
fn open(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let object_store = context.runtime_env().object_store(&self.object_store_url)?;
let batch_size = self
.batch_size
.unwrap_or_else(|| context.session_config().batch_size());
let source = self
.file_source
.with_batch_size(batch_size)
.with_projection(self);
let opener = source.create_file_opener(object_store, self, partition);
let stream = FileStream::new(self, partition, opener, source.metrics())?;
Ok(Box::pin(stream))
}
fn as_any(&self) -> &dyn Any {
self
}
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
let schema = self.projected_schema();
let orderings = get_projected_output_ordering(self, &schema);
write!(f, "file_groups=")?;
FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
if !schema.fields().is_empty() {
write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
}
if let Some(limit) = self.limit {
write!(f, ", limit={limit}")?;
}
display_orderings(f, &orderings)?;
if !self.constraints.is_empty() {
write!(f, ", {}", self.constraints)?;
}
self.fmt_file_source(t, f)
}
DisplayFormatType::TreeRender => {
writeln!(f, "format={}", self.file_source.file_type())?;
self.file_source.fmt_extra(t, f)?;
let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
writeln!(f, "files={num_files}")?;
Ok(())
}
}
}
fn repartitioned(
&self,
target_partitions: usize,
repartition_file_min_size: usize,
output_ordering: Option<LexOrdering>,
) -> Result<Option<Arc<dyn DataSource>>> {
let source = self.file_source.repartitioned(
target_partitions,
repartition_file_min_size,
output_ordering,
self,
)?;
Ok(source.map(|s| Arc::new(s) as _))
}
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(self.file_groups.len())
}
fn eq_properties(&self) -> EquivalenceProperties {
let (schema, constraints, _, orderings) = self.project();
EquivalenceProperties::new_with_orderings(schema, orderings.as_slice())
.with_constraints(constraints)
}
fn statistics(&self) -> Result<Statistics> {
Ok(self.projected_stats())
}
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
let source = FileScanConfigBuilder::from(self.clone())
.with_limit(limit)
.build();
Some(Arc::new(source))
}
fn fetch(&self) -> Option<usize> {
self.limit
}
fn metrics(&self) -> ExecutionPlanMetricsSet {
self.file_source.metrics().clone()
}
fn try_swapping_with_projection(
&self,
projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let partitioned_columns_in_proj = projection.expr().iter().any(|(expr, _)| {
expr.as_any()
.downcast_ref::<Column>()
.map(|expr| expr.index() >= self.file_schema.fields().len())
.unwrap_or(false)
});
let no_aliases = all_alias_free_columns(projection.expr());
Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
let file_scan = self.clone();
let source = Arc::clone(&file_scan.file_source);
let new_projections = new_projections_for_columns(
projection,
&file_scan
.projection
.clone()
.unwrap_or_else(|| (0..self.file_schema.fields().len()).collect()),
);
DataSourceExec::from_data_source(
FileScanConfigBuilder::from(file_scan)
.with_projection(Some(new_projections))
.with_source(source)
.build(),
) as _
}))
}
fn try_pushdown_filters(
&self,
filters: Vec<Arc<dyn PhysicalExpr>>,
config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
let result = self.file_source.try_pushdown_filters(filters, config)?;
match result.updated_node {
Some(new_file_source) => {
let file_scan_config = FileScanConfigBuilder::from(self.clone())
.with_source(new_file_source)
.build();
Ok(FilterPushdownPropagation {
filters: result.filters,
updated_node: Some(Arc::new(file_scan_config) as _),
})
}
None => {
Ok(FilterPushdownPropagation {
filters: result.filters,
updated_node: None,
})
}
}
}
}
impl FileScanConfig {
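    /// Create a new [`FileScanConfig`] with default settings for scanning files.
    ///
    /// Prefer [`FileScanConfigBuilder`] for setting additional options such as
    /// projections, limits and orderings.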
    #[allow(deprecated)]
    pub fn new(
object_store_url: ObjectStoreUrl,
file_schema: SchemaRef,
file_source: Arc<dyn FileSource>,
) -> Self {
let statistics = Statistics::new_unknown(&file_schema);
let file_source = file_source
.with_statistics(statistics.clone())
.with_schema(Arc::clone(&file_schema));
Self {
object_store_url,
file_schema,
file_groups: vec![],
constraints: Constraints::empty(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
new_lines_in_values: false,
file_source: Arc::clone(&file_source),
batch_size: None,
}
}
#[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
self.file_source =
file_source.with_statistics(Statistics::new_unknown(&self.file_schema));
self
}
#[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
self.constraints = constraints;
self
}
#[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
pub fn with_statistics(mut self, statistics: Statistics) -> Self {
self.file_source = self.file_source.with_statistics(statistics);
self
}
fn projection_indices(&self) -> Vec<usize> {
match &self.projection {
Some(proj) => proj.clone(),
None => (0..self.file_schema.fields().len()
+ self.table_partition_cols.len())
.collect(),
}
}
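    /// Statistics of the scan output, restricted to the projected columns:
    /// file columns keep the statistics reported by the [`FileSource`], while
    /// table partition columns are reported as unknown.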
pub fn projected_stats(&self) -> Statistics {
let statistics = self.file_source.statistics().unwrap();
let table_cols_stats = self
.projection_indices()
.into_iter()
.map(|idx| {
if idx < self.file_schema.fields().len() {
statistics.column_statistics[idx].clone()
} else {
ColumnStatistics::new_unknown()
}
})
.collect();
Statistics {
num_rows: statistics.num_rows,
total_byte_size: statistics.total_byte_size,
column_statistics: table_cols_stats,
}
}
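    /// Schema of the scan output: the projected file columns followed by any
    /// projected table partition columns, carrying over the file schema's
    /// metadata.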
pub fn projected_schema(&self) -> Arc<Schema> {
let table_fields: Vec<_> = self
.projection_indices()
.into_iter()
.map(|idx| {
if idx < self.file_schema.fields().len() {
self.file_schema.field(idx).clone()
} else {
let partition_idx = idx - self.file_schema.fields().len();
self.table_partition_cols[partition_idx].clone()
}
})
.collect();
Arc::new(Schema::new_with_metadata(
table_fields,
self.file_schema.metadata().clone(),
))
}
pub fn projected_constraints(&self) -> Constraints {
let indexes = self.projection_indices();
self.constraints
.project(&indexes)
.unwrap_or_else(Constraints::empty)
}
#[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
self.projection = projection;
self
}
#[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
self
}
#[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
#[allow(deprecated)]
pub fn with_file(self, file: PartitionedFile) -> Self {
self.with_file_group(FileGroup::new(vec![file]))
}
#[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
pub fn with_file_groups(mut self, mut file_groups: Vec<FileGroup>) -> Self {
self.file_groups.append(&mut file_groups);
self
}
#[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
self.file_groups.push(file_group);
self
}
#[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
pub fn with_table_partition_cols(mut self, table_partition_cols: Vec<Field>) -> Self {
self.table_partition_cols = table_partition_cols;
self
}
#[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
self.output_ordering = output_ordering;
self
}
#[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
pub fn with_file_compression_type(
mut self,
file_compression_type: FileCompressionType,
) -> Self {
self.file_compression_type = file_compression_type;
self
}
#[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
self.new_lines_in_values = new_lines_in_values;
self
}
#[deprecated(since = "47.0.0", note = "use FileScanConfigBuilder instead")]
pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
self.batch_size = batch_size;
self
}
pub fn newlines_in_values(&self) -> bool {
self.new_lines_in_values
}
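    /// Project the schema, constraints, statistics and output orderings of
    /// this configuration. If there is no projection and no table partition
    /// columns, the unprojected values are returned as-is.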
pub fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
if self.projection.is_none() && self.table_partition_cols.is_empty() {
return (
Arc::clone(&self.file_schema),
self.constraints.clone(),
self.file_source.statistics().unwrap().clone(),
self.output_ordering.clone(),
);
}
let schema = self.projected_schema();
let constraints = self.projected_constraints();
let stats = self.projected_stats();
let output_ordering = get_projected_output_ordering(self, &schema);
(schema, constraints, stats, output_ordering)
}
pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
self.projection.as_ref().map(|p| {
p.iter()
.filter(|col_idx| **col_idx < self.file_schema.fields().len())
.map(|col_idx| self.file_schema.field(*col_idx).name())
.cloned()
.collect()
})
}
pub fn projected_file_schema(&self) -> SchemaRef {
let fields = self.file_column_projection_indices().map(|indices| {
indices
.iter()
.map(|col_idx| self.file_schema.field(*col_idx))
.cloned()
.collect::<Vec<_>>()
});
fields.map_or_else(
|| Arc::clone(&self.file_schema),
|f| {
Arc::new(Schema::new_with_metadata(
f,
self.file_schema.metadata.clone(),
))
},
)
}
pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
self.projection.as_ref().map(|p| {
p.iter()
.filter(|col_idx| **col_idx < self.file_schema.fields().len())
.copied()
.collect()
})
}
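    /// Split files into new groups using their min/max statistics on
    /// `sort_order`, aiming to fill exactly `target_partitions` groups.
    ///
    /// Files are visited in ascending order of their minimum sort values; each
    /// file is appended to the smallest group it does not overlap, and a new
    /// group is opened only when every existing group overlaps the file.
    /// Returns an error if `target_partitions` is zero or if min/max
    /// statistics cannot be collected for the sort columns.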
pub fn split_groups_by_statistics_with_target_partitions(
table_schema: &SchemaRef,
file_groups: &[FileGroup],
sort_order: &LexOrdering,
target_partitions: usize,
) -> Result<Vec<FileGroup>> {
if target_partitions == 0 {
return Err(DataFusionError::Internal(
"target_partitions must be greater than 0".to_string(),
));
}
let flattened_files = file_groups
.iter()
.flat_map(FileGroup::iter)
.collect::<Vec<_>>();
if flattened_files.is_empty() {
return Ok(vec![]);
}
let statistics = MinMaxStatistics::new_from_files(
sort_order,
table_schema,
None,
flattened_files.iter().copied(),
)?;
let indices_sorted_by_min = statistics.min_values_sorted();
let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];
for (idx, min) in indices_sorted_by_min {
if let Some((_, group)) = file_groups_indices
.iter_mut()
.enumerate()
.filter(|(_, group)| {
group.is_empty()
|| min
> statistics
.max(*group.last().expect("groups should not be empty"))
})
.min_by_key(|(_, group)| group.len())
{
group.push(idx);
} else {
file_groups_indices.push(vec![idx]);
}
}
file_groups_indices.retain(|group| !group.is_empty());
Ok(file_groups_indices
.into_iter()
.map(|file_group_indices| {
FileGroup::new(
file_group_indices
.into_iter()
.map(|idx| flattened_files[idx].clone())
.collect(),
)
})
.collect())
}
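    /// Regroup files so that, within each returned group, the files are
    /// non-overlapping on `sort_order` and ordered by their minimum values.
    ///
    /// Unlike [`Self::split_groups_by_statistics_with_target_partitions`],
    /// each file is appended to the first compatible group, so the number of
    /// groups produced is unbounded. Requires min/max statistics for all sort
    /// columns of every file.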
pub fn split_groups_by_statistics(
table_schema: &SchemaRef,
file_groups: &[FileGroup],
sort_order: &LexOrdering,
) -> Result<Vec<FileGroup>> {
let flattened_files = file_groups
.iter()
.flat_map(FileGroup::iter)
.collect::<Vec<_>>();
if flattened_files.is_empty() {
return Ok(vec![]);
}
let statistics = MinMaxStatistics::new_from_files(
sort_order,
table_schema,
None,
flattened_files.iter().copied(),
)
.map_err(|e| {
e.context("construct min/max statistics for split_groups_by_statistics")
})?;
let indices_sorted_by_min = statistics.min_values_sorted();
let mut file_groups_indices: Vec<Vec<usize>> = vec![];
for (idx, min) in indices_sorted_by_min {
let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
min > statistics.max(
*group
.last()
.expect("groups should be nonempty at construction"),
)
});
match file_group_to_insert {
Some(group) => group.push(idx),
None => file_groups_indices.push(vec![idx]),
}
}
Ok(file_groups_indices
.into_iter()
.map(|file_group_indices| {
file_group_indices
.into_iter()
.map(|idx| flattened_files[idx].clone())
.collect()
})
.collect())
}
#[deprecated(since = "47.0.0", note = "use DataSourceExec::new instead")]
pub fn build(self) -> Arc<DataSourceExec> {
DataSourceExec::from_data_source(self)
}
fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
write!(f, ", file_type={}", self.file_source.file_type())?;
self.file_source.fmt_extra(t, f)
}
pub fn file_source(&self) -> &Arc<dyn FileSource> {
&self.file_source
}
}
impl Debug for FileScanConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "FileScanConfig {{")?;
write!(f, "object_store_url={:?}, ", self.object_store_url)?;
write!(
f,
"statistics={:?}, ",
self.file_source.statistics().unwrap()
)?;
DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
write!(f, "}}")
}
}
impl DisplayAs for FileScanConfig {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
let schema = self.projected_schema();
let orderings = get_projected_output_ordering(self, &schema);
write!(f, "file_groups=")?;
FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
if !schema.fields().is_empty() {
write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
}
if let Some(limit) = self.limit {
write!(f, ", limit={limit}")?;
}
display_orderings(f, &orderings)?;
if !self.constraints.is_empty() {
write!(f, ", {}", self.constraints)?;
}
Ok(())
}
}
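/// Inserts the values of table partition columns into record batches read
/// from files, at the positions required by the projected schema.
///
/// A sketch of how it is driven (assuming `projected_schema` ends with a
/// dictionary-encoded `date` partition column):
///
/// ```rust,ignore
/// let mut projector =
///     PartitionColumnProjector::new(projected_schema, &["date".to_string()]);
/// let batch = projector.project(
///     file_batch,
///     &[wrap_partition_value_in_dict(ScalarValue::from("2021-01-01"))],
/// )?;
/// ```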
pub struct PartitionColumnProjector {
key_buffer_cache: ZeroBufferGenerators,
projected_partition_indexes: Vec<(usize, usize)>,
projected_schema: SchemaRef,
}
impl PartitionColumnProjector {
pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self {
let mut idx_map = HashMap::new();
for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() {
if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
idx_map.insert(partition_idx, schema_idx);
}
}
let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect();
projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));
Self {
projected_partition_indexes,
key_buffer_cache: Default::default(),
projected_schema,
}
}
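    /// Transform `file_batch` by inserting the partition columns built from
    /// `partition_values` at the positions expected by the projected schema,
    /// dictionary-encoding values when the target field is a dictionary type.
    /// Errors if the batch has an unexpected number of columns or a partition
    /// value is missing.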
pub fn project(
&mut self,
file_batch: RecordBatch,
partition_values: &[ScalarValue],
) -> Result<RecordBatch> {
let expected_cols =
self.projected_schema.fields().len() - self.projected_partition_indexes.len();
if file_batch.columns().len() != expected_cols {
return exec_err!(
"Unexpected batch schema from file, expected {} cols but got {}",
expected_cols,
file_batch.columns().len()
);
}
let mut cols = file_batch.columns().to_vec();
for &(pidx, sidx) in &self.projected_partition_indexes {
let p_value =
partition_values
.get(pidx)
.ok_or(DataFusionError::Execution(
"Invalid partitioning found on disk".to_string(),
))?;
let mut partition_value = Cow::Borrowed(p_value);
let field = self.projected_schema.field(sidx);
let expected_data_type = field.data_type();
let actual_data_type = partition_value.data_type();
if let DataType::Dictionary(key_type, _) = expected_data_type {
if !matches!(actual_data_type, DataType::Dictionary(_, _)) {
warn!("Partition value for column {} was not dictionary-encoded, applied auto-fix.", field.name());
partition_value = Cow::Owned(ScalarValue::Dictionary(
key_type.clone(),
Box::new(partition_value.as_ref().clone()),
));
}
}
cols.insert(
sidx,
create_output_array(
&mut self.key_buffer_cache,
partition_value.as_ref(),
file_batch.num_rows(),
)?,
)
}
RecordBatch::try_new_with_options(
Arc::clone(&self.projected_schema),
cols,
&RecordBatchOptions::new().with_row_count(Some(file_batch.num_rows())),
)
.map_err(Into::into)
}
}
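/// Per-key-type cache of zero-filled [`Buffer`]s reused as dictionary keys
/// when materializing partition column arrays.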
#[derive(Debug, Default)]
struct ZeroBufferGenerators {
gen_i8: ZeroBufferGenerator<i8>,
gen_i16: ZeroBufferGenerator<i16>,
gen_i32: ZeroBufferGenerator<i32>,
gen_i64: ZeroBufferGenerator<i64>,
gen_u8: ZeroBufferGenerator<u8>,
gen_u16: ZeroBufferGenerator<u16>,
gen_u32: ZeroBufferGenerator<u32>,
gen_u64: ZeroBufferGenerator<u64>,
}
#[derive(Debug, Default)]
struct ZeroBufferGenerator<T>
where
T: ArrowNativeType,
{
cache: Option<Buffer>,
_t: PhantomData<T>,
}
impl<T> ZeroBufferGenerator<T>
where
T: ArrowNativeType,
{
const SIZE: usize = size_of::<T>();
fn get_buffer(&mut self, n_vals: usize) -> Buffer {
match &mut self.cache {
Some(buf) if buf.len() >= n_vals * Self::SIZE => {
buf.slice_with_length(0, n_vals * Self::SIZE)
}
_ => {
let mut key_buffer_builder = BufferBuilder::<T>::new(n_vals);
                key_buffer_builder.advance(n_vals); // `advance` zero-fills the newly added keys
                self.cache.insert(key_buffer_builder.finish()).clone()
}
}
}
}
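/// Build a dictionary array of `len` identical entries of `dict_val`, backed
/// by a cached zero-filled key buffer whose native key type is `T`.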
fn create_dict_array<T>(
buffer_gen: &mut ZeroBufferGenerator<T>,
dict_val: &ScalarValue,
len: usize,
data_type: DataType,
) -> Result<ArrayRef>
where
T: ArrowNativeType,
{
let dict_vals = dict_val.to_array()?;
let sliced_key_buffer = buffer_gen.get_buffer(len);
let mut builder = ArrayData::builder(data_type)
.len(len)
.add_buffer(sliced_key_buffer);
builder = builder.add_child_data(dict_vals.to_data());
Ok(Arc::new(DictionaryArray::<UInt16Type>::from(
builder.build().unwrap(),
)))
}
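/// Create an array of `len` copies of `val` for a partition column. Dictionary
/// scalars with integer keys reuse a cached zero-filled key buffer; all other
/// scalars fall back to [`ScalarValue::to_array_of_size`].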
fn create_output_array(
key_buffer_cache: &mut ZeroBufferGenerators,
val: &ScalarValue,
len: usize,
) -> Result<ArrayRef> {
if let ScalarValue::Dictionary(key_type, dict_val) = &val {
match key_type.as_ref() {
DataType::Int8 => {
return create_dict_array(
&mut key_buffer_cache.gen_i8,
dict_val,
len,
val.data_type(),
);
}
DataType::Int16 => {
return create_dict_array(
&mut key_buffer_cache.gen_i16,
dict_val,
len,
val.data_type(),
);
}
DataType::Int32 => {
return create_dict_array(
&mut key_buffer_cache.gen_i32,
dict_val,
len,
val.data_type(),
);
}
DataType::Int64 => {
return create_dict_array(
&mut key_buffer_cache.gen_i64,
dict_val,
len,
val.data_type(),
);
}
DataType::UInt8 => {
return create_dict_array(
&mut key_buffer_cache.gen_u8,
dict_val,
len,
val.data_type(),
);
}
DataType::UInt16 => {
return create_dict_array(
&mut key_buffer_cache.gen_u16,
dict_val,
len,
val.data_type(),
);
}
DataType::UInt32 => {
return create_dict_array(
&mut key_buffer_cache.gen_u32,
dict_val,
len,
val.data_type(),
);
}
DataType::UInt64 => {
return create_dict_array(
&mut key_buffer_cache.gen_u64,
dict_val,
len,
val.data_type(),
);
}
_ => {}
}
}
val.to_array_of_size(len)
}
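/// Map the configured output orderings onto the projected schema.
///
/// Each ordering is truncated at the first sort expression whose column is not
/// in the projection, empty results are dropped, and an ordering is skipped
/// entirely when any multi-file group cannot be shown (via per-file min/max
/// statistics) to actually be sorted on it.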
fn get_projected_output_ordering(
base_config: &FileScanConfig,
projected_schema: &SchemaRef,
) -> Vec<LexOrdering> {
let mut all_orderings = vec![];
for output_ordering in &base_config.output_ordering {
let mut new_ordering = LexOrdering::default();
for PhysicalSortExpr { expr, options } in output_ordering.iter() {
if let Some(col) = expr.as_any().downcast_ref::<Column>() {
let name = col.name();
if let Some((idx, _)) = projected_schema.column_with_name(name) {
new_ordering.push(PhysicalSortExpr {
expr: Arc::new(Column::new(name, idx)),
options: *options,
});
continue;
}
}
break;
}
if new_ordering.is_empty() {
continue;
}
if base_config.file_groups.iter().any(|group| {
if group.len() <= 1 {
return false;
}
let statistics = match MinMaxStatistics::new_from_files(
&new_ordering,
projected_schema,
base_config.projection.as_deref(),
group.iter(),
) {
Ok(statistics) => statistics,
Err(e) => {
log::trace!("Error fetching statistics for file group: {e}");
return true;
}
};
!statistics.is_sorted()
}) {
debug!(
"Skipping specified output ordering {:?}. \
Some file groups couldn't be determined to be sorted: {:?}",
base_config.output_ordering[0], base_config.file_groups
);
continue;
}
all_orderings.push(new_ordering);
}
all_orderings
}
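/// Wrap a partition column data type in the dictionary type used to store
/// partition values: `Dictionary(UInt16, val_type)`.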
pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
}
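/// Wrap a partition value in the corresponding `UInt16`-keyed dictionary
/// [`ScalarValue`].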
pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
}
#[cfg(test)]
mod tests {
use crate::{
generate_test_files, test_util::MockSource, tests::aggr_test_schema,
verify_sort_integrity,
};
use super::*;
use arrow::{
array::{Int32Array, RecordBatch},
compute::SortOptions,
};
use datafusion_common::stats::Precision;
use datafusion_common::{assert_batches_eq, DFSchema};
use datafusion_expr::{execution_props::ExecutionProps, SortExpr};
use datafusion_physical_expr::create_physical_expr;
use std::collections::HashMap;
fn create_physical_sort_expr(
e: &SortExpr,
input_dfschema: &DFSchema,
execution_props: &ExecutionProps,
) -> Result<PhysicalSortExpr> {
let SortExpr {
expr,
asc,
nulls_first,
} = e;
Ok(PhysicalSortExpr {
expr: create_physical_expr(expr, input_dfschema, execution_props)?,
options: SortOptions {
descending: !asc,
nulls_first: *nulls_first,
},
})
}
pub fn columns(schema: &Schema) -> Vec<String> {
schema.fields().iter().map(|f| f.name().clone()).collect()
}
#[test]
fn physical_plan_config_no_projection() {
let file_schema = aggr_test_schema();
let conf = config_for_projection(
Arc::clone(&file_schema),
None,
Statistics::new_unknown(&file_schema),
to_partition_cols(vec![(
"date".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
)]),
);
let (proj_schema, _, proj_statistics, _) = conf.project();
assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
assert_eq!(
proj_schema.field(file_schema.fields().len()).name(),
"date",
"partition columns are the last columns"
);
assert_eq!(
proj_statistics.column_statistics.len(),
file_schema.fields().len() + 1
);
let col_names = conf.projected_file_column_names();
assert_eq!(col_names, None);
let col_indices = conf.file_column_projection_indices();
assert_eq!(col_indices, None);
}
#[test]
fn physical_plan_config_no_projection_tab_cols_as_field() {
let file_schema = aggr_test_schema();
let table_partition_col =
Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
.with_metadata(HashMap::from_iter(vec![(
"key_whatever".to_owned(),
"value_whatever".to_owned(),
)]));
let conf = config_for_projection(
Arc::clone(&file_schema),
None,
Statistics::new_unknown(&file_schema),
vec![table_partition_col.clone()],
);
let proj_schema = conf.projected_schema();
assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
assert_eq!(
*proj_schema.field(file_schema.fields().len()),
table_partition_col,
"partition columns are the last columns and ust have all values defined in created field"
);
}
#[test]
fn physical_plan_config_with_projection() {
let file_schema = aggr_test_schema();
let conf = config_for_projection(
Arc::clone(&file_schema),
Some(vec![file_schema.fields().len(), 0]),
Statistics {
num_rows: Precision::Inexact(10),
column_statistics: (0..file_schema.fields().len())
.map(|i| ColumnStatistics {
distinct_count: Precision::Inexact(i),
..Default::default()
})
.collect(),
total_byte_size: Precision::Absent,
},
to_partition_cols(vec![(
"date".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
)]),
);
let (proj_schema, _, proj_statistics, _) = conf.project();
assert_eq!(
columns(&proj_schema),
vec!["date".to_owned(), "c1".to_owned()]
);
let proj_stat_cols = proj_statistics.column_statistics;
assert_eq!(proj_stat_cols.len(), 2);
assert_eq!(proj_stat_cols[1].distinct_count, Precision::Inexact(0));
let col_names = conf.projected_file_column_names();
assert_eq!(col_names, Some(vec!["c1".to_owned()]));
let col_indices = conf.file_column_projection_indices();
assert_eq!(col_indices, Some(vec![0]));
}
#[test]
fn partition_column_projector() {
let file_batch = build_table_i32(
("a", &vec![0, 1, 2]),
("b", &vec![-2, -1, 0]),
("c", &vec![10, 11, 12]),
);
let partition_cols = vec![
(
"year".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
),
(
"month".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
),
(
"day".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
),
];
let statistics = Statistics {
num_rows: Precision::Inexact(3),
total_byte_size: Precision::Absent,
column_statistics: Statistics::unknown_column(&file_batch.schema()),
};
let conf = config_for_projection(
file_batch.schema(),
Some(vec![
0,
1,
2,
file_batch.schema().fields().len(),
file_batch.schema().fields().len() + 2,
]),
statistics.clone(),
to_partition_cols(partition_cols.clone()),
);
let source_statistics = conf.file_source.statistics().unwrap();
let conf_stats = conf.statistics().unwrap();
assert_eq!(conf_stats.num_rows, Precision::Inexact(3));
assert_eq!(conf_stats.column_statistics.len(), 5);
assert_eq!(source_statistics, statistics);
assert_eq!(source_statistics.column_statistics.len(), 3);
let proj_schema = conf.projected_schema();
let mut proj = PartitionColumnProjector::new(
proj_schema,
&partition_cols
.iter()
.map(|x| x.0.clone())
.collect::<Vec<_>>(),
);
let projected_batch = proj
.project(
file_batch,
&[
wrap_partition_value_in_dict(ScalarValue::from("2021")),
wrap_partition_value_in_dict(ScalarValue::from("10")),
wrap_partition_value_in_dict(ScalarValue::from("26")),
],
)
.expect("Projection of partition columns into record batch failed");
let expected = [
"+---+----+----+------+-----+",
"| a | b | c | year | day |",
"+---+----+----+------+-----+",
"| 0 | -2 | 10 | 2021 | 26 |",
"| 1 | -1 | 11 | 2021 | 26 |",
"| 2 | 0 | 12 | 2021 | 26 |",
"+---+----+----+------+-----+",
];
assert_batches_eq!(expected, &[projected_batch]);
let file_batch = build_table_i32(
("a", &vec![5, 6, 7, 8, 9]),
("b", &vec![-10, -9, -8, -7, -6]),
("c", &vec![12, 13, 14, 15, 16]),
);
let projected_batch = proj
.project(
file_batch,
&[
wrap_partition_value_in_dict(ScalarValue::from("2021")),
wrap_partition_value_in_dict(ScalarValue::from("10")),
wrap_partition_value_in_dict(ScalarValue::from("27")),
],
)
.expect("Projection of partition columns into record batch failed");
let expected = [
"+---+-----+----+------+-----+",
"| a | b | c | year | day |",
"+---+-----+----+------+-----+",
"| 5 | -10 | 12 | 2021 | 27 |",
"| 6 | -9 | 13 | 2021 | 27 |",
"| 7 | -8 | 14 | 2021 | 27 |",
"| 8 | -7 | 15 | 2021 | 27 |",
"| 9 | -6 | 16 | 2021 | 27 |",
"+---+-----+----+------+-----+",
];
assert_batches_eq!(expected, &[projected_batch]);
let file_batch = build_table_i32(
("a", &vec![0, 1, 3]),
("b", &vec![2, 3, 4]),
("c", &vec![4, 5, 6]),
);
let projected_batch = proj
.project(
file_batch,
&[
wrap_partition_value_in_dict(ScalarValue::from("2021")),
wrap_partition_value_in_dict(ScalarValue::from("10")),
wrap_partition_value_in_dict(ScalarValue::from("28")),
],
)
.expect("Projection of partition columns into record batch failed");
let expected = [
"+---+---+---+------+-----+",
"| a | b | c | year | day |",
"+---+---+---+------+-----+",
"| 0 | 2 | 4 | 2021 | 28 |",
"| 1 | 3 | 5 | 2021 | 28 |",
"| 3 | 4 | 6 | 2021 | 28 |",
"+---+---+---+------+-----+",
];
assert_batches_eq!(expected, &[projected_batch]);
let file_batch = build_table_i32(
("a", &vec![0, 1, 2]),
("b", &vec![-2, -1, 0]),
("c", &vec![10, 11, 12]),
);
let projected_batch = proj
.project(
file_batch,
&[
ScalarValue::from("2021"),
ScalarValue::from("10"),
ScalarValue::from("26"),
],
)
.expect("Projection of partition columns into record batch failed");
let expected = [
"+---+----+----+------+-----+",
"| a | b | c | year | day |",
"+---+----+----+------+-----+",
"| 0 | -2 | 10 | 2021 | 26 |",
"| 1 | -1 | 11 | 2021 | 26 |",
"| 2 | 0 | 12 | 2021 | 26 |",
"+---+----+----+------+-----+",
];
assert_batches_eq!(expected, &[projected_batch]);
}
#[test]
fn test_projected_file_schema_with_partition_col() {
let schema = aggr_test_schema();
let partition_cols = vec![
(
"part1".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
),
(
"part2".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
),
];
let projection = config_for_projection(
schema.clone(),
Some(vec![0, 3, 5, schema.fields().len()]),
Statistics::new_unknown(&schema),
to_partition_cols(partition_cols),
)
.projected_file_schema();
let expected_columns = vec!["c1", "c4", "c6"];
let actual_columns = projection
.fields()
.iter()
.map(|f| f.name().clone())
.collect::<Vec<_>>();
assert_eq!(expected_columns, actual_columns);
}
#[test]
fn test_projected_file_schema_without_projection() {
let schema = aggr_test_schema();
let partition_cols = vec![
(
"part1".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
),
(
"part2".to_owned(),
wrap_partition_type_in_dict(DataType::Utf8),
),
];
let projection = config_for_projection(
schema.clone(),
None,
Statistics::new_unknown(&schema),
to_partition_cols(partition_cols),
)
.projected_file_schema();
assert_eq!(projection.fields(), schema.fields());
}
#[test]
fn test_split_groups_by_statistics() -> Result<()> {
use chrono::TimeZone;
use datafusion_common::DFSchema;
use datafusion_expr::execution_props::ExecutionProps;
use object_store::{path::Path, ObjectMeta};
struct File {
name: &'static str,
date: &'static str,
statistics: Vec<Option<(f64, f64)>>,
}
impl File {
fn new(
name: &'static str,
date: &'static str,
statistics: Vec<Option<(f64, f64)>>,
) -> Self {
Self {
name,
date,
statistics,
}
}
}
struct TestCase {
name: &'static str,
file_schema: Schema,
files: Vec<File>,
sort: Vec<SortExpr>,
expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
}
use datafusion_expr::col;
let cases = vec![
TestCase {
name: "test sort",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
},
TestCase {
name: "test sort with files ordered differently",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
},
TestCase {
name: "reverse sort",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
],
sort: vec![col("value").sort(false, true)],
expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
},
TestCase {
name: "no nullable sort columns",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
                    true, // nullable sort column, which should be rejected
                )]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\nbuild min rows\ncaused by\ncreate sorting columns\ncaused by\nError during planning: cannot sort by nullable column")
},
TestCase {
name: "all three non-overlapping",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0", "1", "2"]]),
},
TestCase {
name: "all three overlapping",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
},
TestCase {
name: "empty input",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![]),
},
TestCase {
name: "one file missing statistics",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("2", "2023-01-02", vec![None]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
},
];
for case in cases {
let table_schema = Arc::new(Schema::new(
case.file_schema
.fields()
.clone()
.into_iter()
.cloned()
.chain(Some(Arc::new(Field::new(
"date".to_string(),
DataType::Utf8,
false,
))))
.collect::<Vec<_>>(),
));
let sort_order = LexOrdering::from(
case.sort
.into_iter()
.map(|expr| {
create_physical_sort_expr(
&expr,
&DFSchema::try_from(table_schema.as_ref().clone())?,
&ExecutionProps::default(),
)
})
.collect::<Result<Vec<_>>>()?,
);
let partitioned_files = FileGroup::new(
case.files.into_iter().map(From::from).collect::<Vec<_>>(),
);
let result = FileScanConfig::split_groups_by_statistics(
&table_schema,
&[partitioned_files.clone()],
&sort_order,
);
let results_by_name = result
.as_ref()
.map(|file_groups| {
file_groups
.iter()
.map(|file_group| {
file_group
.iter()
.map(|file| {
partitioned_files
.iter()
.find_map(|f| {
if f.object_meta == file.object_meta {
Some(
f.object_meta
.location
.as_ref()
.rsplit('/')
.next()
.unwrap()
.trim_end_matches(".parquet"),
)
} else {
None
}
})
.unwrap()
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>()
})
.map_err(|e| e.strip_backtrace().leak() as &'static str);
assert_eq!(results_by_name, case.expected_result, "{}", case.name);
}
return Ok(());
impl From<File> for PartitionedFile {
fn from(file: File) -> Self {
PartitionedFile {
object_meta: ObjectMeta {
location: Path::from(format!(
"data/date={}/{}.parquet",
file.date, file.name
)),
last_modified: chrono::Utc.timestamp_nanos(0),
size: 0,
e_tag: None,
version: None,
},
partition_values: vec![ScalarValue::from(file.date)],
range: None,
statistics: Some(Arc::new(Statistics {
num_rows: Precision::Absent,
total_byte_size: Precision::Absent,
column_statistics: file
.statistics
.into_iter()
.map(|stats| {
stats
.map(|(min, max)| ColumnStatistics {
min_value: Precision::Exact(ScalarValue::from(
min,
)),
max_value: Precision::Exact(ScalarValue::from(
max,
)),
..Default::default()
})
.unwrap_or_default()
})
.collect::<Vec<_>>(),
})),
extensions: None,
metadata_size_hint: None,
}
}
}
}
fn config_for_projection(
file_schema: SchemaRef,
projection: Option<Vec<usize>>,
statistics: Statistics,
table_partition_cols: Vec<Field>,
) -> FileScanConfig {
FileScanConfigBuilder::new(
ObjectStoreUrl::parse("test:///").unwrap(),
file_schema,
Arc::new(MockSource::default()),
)
.with_projection(projection)
.with_statistics(statistics)
.with_table_partition_cols(table_partition_cols)
.build()
}
fn to_partition_cols(table_partition_cols: Vec<(String, DataType)>) -> Vec<Field> {
table_partition_cols
.iter()
.map(|(name, dtype)| Field::new(name, dtype.clone(), false))
.collect::<Vec<_>>()
}
pub fn build_table_i32(
a: (&str, &Vec<i32>),
b: (&str, &Vec<i32>),
c: (&str, &Vec<i32>),
) -> RecordBatch {
let schema = Schema::new(vec![
Field::new(a.0, DataType::Int32, false),
Field::new(b.0, DataType::Int32, false),
Field::new(c.0, DataType::Int32, false),
]);
RecordBatch::try_new(
Arc::new(schema),
vec![
Arc::new(Int32Array::from(a.1.clone())),
Arc::new(Int32Array::from(b.1.clone())),
Arc::new(Int32Array::from(c.1.clone())),
],
)
.unwrap()
}
#[test]
fn test_file_scan_config_builder() {
let file_schema = aggr_test_schema();
let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
let builder = FileScanConfigBuilder::new(
object_store_url.clone(),
Arc::clone(&file_schema),
Arc::clone(&file_source),
);
let config = builder
.with_limit(Some(1000))
.with_projection(Some(vec![0, 1]))
.with_table_partition_cols(vec![Field::new(
"date",
wrap_partition_type_in_dict(DataType::Utf8),
false,
)])
.with_constraints(Constraints::empty())
.with_statistics(Statistics::new_unknown(&file_schema))
.with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
"test.parquet".to_string(),
1024,
)])])
.with_output_ordering(vec![LexOrdering::default()])
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
.with_newlines_in_values(true)
.build();
assert_eq!(config.object_store_url, object_store_url);
assert_eq!(config.file_schema, file_schema);
assert_eq!(config.limit, Some(1000));
assert_eq!(config.projection, Some(vec![0, 1]));
assert_eq!(config.table_partition_cols.len(), 1);
assert_eq!(config.table_partition_cols[0].name(), "date");
assert_eq!(config.file_groups.len(), 1);
assert_eq!(config.file_groups[0].len(), 1);
assert_eq!(
config.file_groups[0][0].object_meta.location.as_ref(),
"test.parquet"
);
assert_eq!(
config.file_compression_type,
FileCompressionType::UNCOMPRESSED
);
assert!(config.new_lines_in_values);
assert_eq!(config.output_ordering.len(), 1);
}
#[test]
fn test_file_scan_config_builder_defaults() {
let file_schema = aggr_test_schema();
let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
let config = FileScanConfigBuilder::new(
object_store_url.clone(),
Arc::clone(&file_schema),
Arc::clone(&file_source),
)
.build();
assert_eq!(config.object_store_url, object_store_url);
assert_eq!(config.file_schema, file_schema);
assert_eq!(config.limit, None);
assert_eq!(config.projection, None);
assert!(config.table_partition_cols.is_empty());
assert!(config.file_groups.is_empty());
assert_eq!(
config.file_compression_type,
FileCompressionType::UNCOMPRESSED
);
assert!(!config.new_lines_in_values);
assert!(config.output_ordering.is_empty());
assert!(config.constraints.is_empty());
assert_eq!(
config.file_source.statistics().unwrap().num_rows,
Precision::Absent
);
assert_eq!(
config.file_source.statistics().unwrap().total_byte_size,
Precision::Absent
);
assert_eq!(
config
.file_source
.statistics()
.unwrap()
.column_statistics
.len(),
file_schema.fields().len()
);
for stat in config.file_source.statistics().unwrap().column_statistics {
assert_eq!(stat.distinct_count, Precision::Absent);
assert_eq!(stat.min_value, Precision::Absent);
assert_eq!(stat.max_value, Precision::Absent);
assert_eq!(stat.null_count, Precision::Absent);
}
}
#[test]
fn test_file_scan_config_builder_new_from() {
let schema = aggr_test_schema();
let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
let file_source: Arc<dyn FileSource> = Arc::new(MockSource::default());
let partition_cols = vec![Field::new(
"date",
wrap_partition_type_in_dict(DataType::Utf8),
false,
)];
let file = PartitionedFile::new("test_file.parquet", 100);
let original_config = FileScanConfigBuilder::new(
object_store_url.clone(),
Arc::clone(&schema),
Arc::clone(&file_source),
)
.with_projection(Some(vec![0, 2]))
.with_limit(Some(10))
.with_table_partition_cols(partition_cols.clone())
.with_file(file.clone())
.with_constraints(Constraints::default())
.with_newlines_in_values(true)
.build();
let new_builder = FileScanConfigBuilder::from(original_config);
let new_config = new_builder.build();
assert_eq!(new_config.object_store_url, object_store_url);
assert_eq!(new_config.file_schema, schema);
assert_eq!(new_config.projection, Some(vec![0, 2]));
assert_eq!(new_config.limit, Some(10));
assert_eq!(new_config.table_partition_cols, partition_cols);
assert_eq!(new_config.file_groups.len(), 1);
assert_eq!(new_config.file_groups[0].len(), 1);
assert_eq!(
new_config.file_groups[0][0].object_meta.location.as_ref(),
"test_file.parquet"
);
assert_eq!(new_config.constraints, Constraints::default());
assert!(new_config.new_lines_in_values);
}
#[test]
fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
use datafusion_common::DFSchema;
use datafusion_expr::{col, execution_props::ExecutionProps};
let schema = Arc::new(Schema::new(vec![Field::new(
"value",
DataType::Float64,
false,
)]));
let exec_props = ExecutionProps::new();
let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
let sort_expr = [col("value").sort(true, false)];
let physical_sort_exprs: Vec<_> = sort_expr
.iter()
.map(|expr| create_physical_sort_expr(expr, &df_schema, &exec_props).unwrap())
.collect();
let sort_ordering = LexOrdering::from(physical_sort_exprs);
struct TestCase {
name: String,
file_count: usize,
overlap_factor: f64,
target_partitions: usize,
expected_partition_count: usize,
}
let test_cases = vec![
TestCase {
name: "no_overlap_10_files_4_partitions".to_string(),
file_count: 10,
overlap_factor: 0.0,
target_partitions: 4,
expected_partition_count: 4,
},
TestCase {
name: "medium_overlap_20_files_5_partitions".to_string(),
file_count: 20,
overlap_factor: 0.5,
target_partitions: 5,
expected_partition_count: 5,
},
TestCase {
name: "high_overlap_30_files_3_partitions".to_string(),
file_count: 30,
overlap_factor: 0.8,
target_partitions: 3,
expected_partition_count: 7,
},
TestCase {
name: "fewer_files_than_partitions".to_string(),
file_count: 3,
overlap_factor: 0.0,
target_partitions: 10,
                expected_partition_count: 3, // limited by the number of files
            },
TestCase {
name: "single_file".to_string(),
file_count: 1,
overlap_factor: 0.0,
target_partitions: 5,
                expected_partition_count: 1, // a single file yields a single partition
            },
TestCase {
name: "empty_files".to_string(),
file_count: 0,
overlap_factor: 0.0,
target_partitions: 3,
                expected_partition_count: 0, // no files, no partitions
            },
];
for case in test_cases {
println!("Running test case: {}", case.name);
let file_groups = generate_test_files(case.file_count, case.overlap_factor);
let result =
FileScanConfig::split_groups_by_statistics_with_target_partitions(
&schema,
&file_groups,
&sort_ordering,
case.target_partitions,
)?;
println!(
"Created {} partitions (target was {})",
result.len(),
case.target_partitions
);
assert_eq!(
result.len(),
case.expected_partition_count,
"Case '{}': Unexpected partition count",
case.name
);
assert!(
verify_sort_integrity(&result),
"Case '{}': Files within partitions are not properly ordered",
case.name
);
if case.file_count > 1 && case.expected_partition_count > 1 {
let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
let max_size = *group_sizes.iter().max().unwrap();
let min_size = *group_sizes.iter().min().unwrap();
let avg_files_per_partition =
case.file_count as f64 / case.expected_partition_count as f64;
assert!(
(max_size as f64) < 2.0 * avg_files_per_partition,
"Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
case.name,
max_size,
avg_files_per_partition
);
println!("Distribution - min files: {min_size}, max files: {max_size}");
}
}
let empty_groups: Vec<FileGroup> = vec![];
let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
&schema,
&empty_groups,
&sort_ordering,
0,
)
.unwrap_err();
assert!(
err.to_string()
.contains("target_partitions must be greater than 0"),
"Expected error for zero target partitions"
);
Ok(())
}
}