use crate::file_groups::FileGroup;
use crate::{
PartitionedFile, display::FileGroupsDisplay, file::FileSource,
file_compression_type::FileCompressionType, file_stream::FileStream,
source::DataSource, statistics::MinMaxStatistics,
};
use arrow::datatypes::FieldRef;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{
Constraints, Result, ScalarValue, Statistics, internal_datafusion_err, internal_err,
};
use datafusion_execution::{
SendableRecordBatchStream, TaskContext, object_store::ObjectStoreUrl,
};
use datafusion_expr::Operator;
use datafusion_physical_expr::equivalence::project_orderings;
use datafusion_physical_expr::expressions::{BinaryExpr, Column};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, split_conjunction};
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::coop::cooperative;
use datafusion_physical_plan::execution_plan::SchedulingType;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType,
display::{ProjectSchemaDisplay, display_orderings},
filter_pushdown::FilterPushdownPropagation,
metrics::ExecutionPlanMetricsSet,
};
use log::{debug, warn};
use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
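/// Describes a scan of a set of files ("file groups") for a [`DataSource`]:
/// which object store and files to read, the expected output ordering,
/// compression, optional limit, and table-level statistics.
/// Instances are constructed via [`FileScanConfigBuilder`].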
#[derive(Clone)]
pub struct FileScanConfig {
pub object_store_url: ObjectStoreUrl,
pub file_groups: Vec<FileGroup>,
pub constraints: Constraints,
pub limit: Option<usize>,
    /// Whether the order of rows within each file group must be preserved
    /// (set automatically in [`FileScanConfigBuilder::build`] when an output
    /// ordering is declared).
    pub preserve_order: bool,
pub output_ordering: Vec<LexOrdering>,
pub file_compression_type: FileCompressionType,
pub file_source: Arc<dyn FileSource>,
pub batch_size: Option<usize>,
pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
pub(crate) statistics: Statistics,
    /// Whether each file group corresponds to a distinct set of partition
    /// column values; when true, the scan reports `Partitioning::Hash` on the
    /// partition columns and refuses further repartitioning.
    pub partitioned_by_file_group: bool,
}
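/// Builder for [`FileScanConfig`]. Collects the pieces of a scan and applies
/// defaults for anything left unset when [`FileScanConfigBuilder::build`] is
/// called.
///
/// A minimal usage sketch; `object_store_url` and `file_source` stand for
/// values created elsewhere (`file_source` is any concrete [`FileSource`]
/// implementation, e.g. Parquet or CSV) and the file path is hypothetical:
///
/// ```ignore
/// let config = FileScanConfigBuilder::new(object_store_url, file_source)
///     .with_limit(Some(1000))
///     .with_file(PartitionedFile::new("data/file1.parquet".to_string(), 1234))
///     .build();
/// ```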
#[derive(Clone)]
pub struct FileScanConfigBuilder {
object_store_url: ObjectStoreUrl,
file_source: Arc<dyn FileSource>,
limit: Option<usize>,
preserve_order: bool,
constraints: Option<Constraints>,
file_groups: Vec<FileGroup>,
statistics: Option<Statistics>,
output_ordering: Vec<LexOrdering>,
file_compression_type: Option<FileCompressionType>,
batch_size: Option<usize>,
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
partitioned_by_file_group: bool,
}
impl FileScanConfigBuilder {
pub fn new(
object_store_url: ObjectStoreUrl,
file_source: Arc<dyn FileSource>,
) -> Self {
Self {
object_store_url,
file_source,
file_groups: vec![],
statistics: None,
output_ordering: vec![],
file_compression_type: None,
limit: None,
preserve_order: false,
constraints: None,
batch_size: None,
expr_adapter_factory: None,
partitioned_by_file_group: false,
}
}
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
self
}
pub fn with_preserve_order(mut self, order_sensitive: bool) -> Self {
self.preserve_order = order_sensitive;
self
}
pub fn with_source(mut self, file_source: Arc<dyn FileSource>) -> Self {
self.file_source = file_source;
self
}
pub fn table_schema(&self) -> &SchemaRef {
self.file_source.table_schema().table_schema()
}
#[deprecated(since = "51.0.0", note = "Use with_projection_indices instead")]
pub fn with_projection(self, indices: Option<Vec<usize>>) -> Self {
match self.clone().with_projection_indices(indices) {
Ok(builder) => builder,
Err(e) => {
warn!(
"Failed to push down projection in FileScanConfigBuilder::with_projection: {e}"
);
self
}
}
}
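    /// Set the projection as indices into the table schema and push it down
    /// into the underlying [`FileSource`]. Errors if the source does not
    /// support projection pushdown.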
pub fn with_projection_indices(
mut self,
indices: Option<Vec<usize>>,
) -> Result<Self> {
let projection_exprs = indices.map(|indices| {
ProjectionExprs::from_indices(
&indices,
self.file_source.table_schema().table_schema(),
)
});
let Some(projection_exprs) = projection_exprs else {
return Ok(self);
};
let new_source = self
.file_source
.try_pushdown_projection(&projection_exprs)
.map_err(|e| {
                internal_datafusion_err!(
                    "Failed to push down projection in FileScanConfigBuilder::with_projection_indices: {e}"
                )
})?;
        if let Some(new_source) = new_source {
            self.file_source = new_source;
        } else {
            return internal_err!(
                "FileSource {} does not support projection pushdown",
                self.file_source.file_type()
            );
        }
Ok(self)
}
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
self.constraints = Some(constraints);
self
}
pub fn with_statistics(mut self, statistics: Statistics) -> Self {
self.statistics = Some(statistics);
self
}
pub fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
self.file_groups = file_groups;
self
}
pub fn with_file_group(mut self, file_group: FileGroup) -> Self {
self.file_groups.push(file_group);
self
}
pub fn with_file(self, partitioned_file: PartitionedFile) -> Self {
self.with_file_group(FileGroup::new(vec![partitioned_file]))
}
pub fn with_output_ordering(mut self, output_ordering: Vec<LexOrdering>) -> Self {
self.output_ordering = output_ordering;
self
}
pub fn with_file_compression_type(
mut self,
file_compression_type: FileCompressionType,
) -> Self {
self.file_compression_type = Some(file_compression_type);
self
}
pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
self.batch_size = batch_size;
self
}
pub fn with_expr_adapter(
mut self,
expr_adapter: Option<Arc<dyn PhysicalExprAdapterFactory>>,
) -> Self {
self.expr_adapter_factory = expr_adapter;
self
}
pub fn with_partitioned_by_file_group(
mut self,
partitioned_by_file_group: bool,
) -> Self {
self.partitioned_by_file_group = partitioned_by_file_group;
self
}
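    /// Build the final [`FileScanConfig`], applying defaults for unset fields:
    /// unknown statistics, no constraints, uncompressed files, and
    /// `preserve_order` inferred from a non-empty output ordering.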
pub fn build(self) -> FileScanConfig {
let Self {
object_store_url,
file_source,
limit,
preserve_order,
constraints,
file_groups,
statistics,
output_ordering,
file_compression_type,
batch_size,
expr_adapter_factory: expr_adapter,
partitioned_by_file_group,
} = self;
let constraints = constraints.unwrap_or_default();
let statistics = statistics.unwrap_or_else(|| {
Statistics::new_unknown(file_source.table_schema().table_schema())
});
let file_compression_type =
file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
let preserve_order = preserve_order || !output_ordering.is_empty();
FileScanConfig {
object_store_url,
file_source,
limit,
preserve_order,
constraints,
file_groups,
output_ordering,
file_compression_type,
batch_size,
expr_adapter_factory: expr_adapter,
statistics,
partitioned_by_file_group,
}
}
}
impl From<FileScanConfig> for FileScanConfigBuilder {
fn from(config: FileScanConfig) -> Self {
Self {
object_store_url: config.object_store_url,
file_source: Arc::<dyn FileSource>::clone(&config.file_source),
file_groups: config.file_groups,
statistics: Some(config.statistics),
output_ordering: config.output_ordering,
file_compression_type: Some(config.file_compression_type),
limit: config.limit,
preserve_order: config.preserve_order,
constraints: Some(config.constraints),
batch_size: config.batch_size,
expr_adapter_factory: config.expr_adapter_factory,
partitioned_by_file_group: config.partitioned_by_file_group,
}
}
}
impl DataSource for FileScanConfig {
fn open(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
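        // Resolve the object store, pick the batch size (falling back to the
        // session default), and wrap the stream so it yields cooperatively.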
let object_store = context.runtime_env().object_store(&self.object_store_url)?;
let batch_size = self
.batch_size
.unwrap_or_else(|| context.session_config().batch_size());
let source = self.file_source.with_batch_size(batch_size);
let opener = source.create_file_opener(object_store, self, partition)?;
let stream = FileStream::new(self, partition, opener, source.metrics())?;
Ok(Box::pin(cooperative(stream)))
}
fn as_any(&self) -> &dyn Any {
self
}
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
let orderings = get_projected_output_ordering(self, &schema);
write!(f, "file_groups=")?;
FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
if !schema.fields().is_empty() {
if let Some(projection) = self.file_source.projection() {
let expr: Vec<String> = projection
.as_ref()
.iter()
.map(|proj_expr| {
if let Some(column) =
proj_expr.expr.as_any().downcast_ref::<Column>()
{
if column.name() == proj_expr.alias {
column.name().to_string()
} else {
format!(
"{} as {}",
proj_expr.expr, proj_expr.alias
)
}
} else {
format!("{} as {}", proj_expr.expr, proj_expr.alias)
}
})
.collect();
write!(f, ", projection=[{}]", expr.join(", "))?;
} else {
write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
}
}
if let Some(limit) = self.limit {
write!(f, ", limit={limit}")?;
}
display_orderings(f, &orderings)?;
if !self.constraints.is_empty() {
write!(f, ", {}", self.constraints)?;
}
self.fmt_file_source(t, f)
}
DisplayFormatType::TreeRender => {
writeln!(f, "format={}", self.file_source.file_type())?;
self.file_source.fmt_extra(t, f)?;
let num_files = self.file_groups.iter().map(|fg| fg.len()).sum::<usize>();
writeln!(f, "files={num_files}")?;
Ok(())
}
}
}
fn repartitioned(
&self,
target_partitions: usize,
repartition_file_min_size: usize,
output_ordering: Option<LexOrdering>,
) -> Result<Option<Arc<dyn DataSource>>> {
if self.partitioned_by_file_group {
return Ok(None);
}
let source = self.file_source.repartitioned(
target_partitions,
repartition_file_min_size,
output_ordering,
self,
)?;
Ok(source.map(|s| Arc::new(s) as _))
}
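    /// Reports `Partitioning::Hash` on the partition columns when files were
    /// grouped by partition values and every partition column survives the
    /// projection; otherwise the partitioning is unknown.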
fn output_partitioning(&self) -> Partitioning {
if self.partitioned_by_file_group {
let partition_cols = self.table_partition_cols();
if !partition_cols.is_empty() {
let projected_schema = match self.projected_schema() {
Ok(schema) => schema,
Err(_) => {
debug!(
"Could not get projected schema, falling back to UnknownPartitioning."
);
return Partitioning::UnknownPartitioning(self.file_groups.len());
}
};
let mut exprs: Vec<Arc<dyn PhysicalExpr>> = Vec::new();
for partition_col in partition_cols {
if let Some((idx, _)) = projected_schema
.fields()
.iter()
.enumerate()
.find(|(_, f)| f.name() == partition_col.name())
{
exprs.push(Arc::new(Column::new(partition_col.name(), idx)));
}
}
if exprs.len() == partition_cols.len() {
return Partitioning::Hash(exprs, self.file_groups.len());
}
}
}
Partitioning::UnknownPartitioning(self.file_groups.len())
}
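    /// Equivalence properties start from the validated output orderings and
    /// table constraints; equality conjuncts from a pushed-down filter and
    /// the pushed-down projection are then folded in.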
fn eq_properties(&self) -> EquivalenceProperties {
let schema = self.file_source.table_schema().table_schema();
let mut eq_properties = EquivalenceProperties::new_with_orderings(
Arc::clone(schema),
self.validated_output_ordering(),
)
.with_constraints(self.constraints.clone());
if let Some(filter) = self.file_source.filter() {
match Self::add_filter_equivalence_info(&filter, &mut eq_properties, schema) {
Ok(()) => {}
                Err(e) => {
                    // Log and continue in release builds, but fail fast in
                    // debug builds to surface the bug during development.
                    warn!("Failed to add filter equivalence info: {e}");
                    #[cfg(debug_assertions)]
                    panic!("Failed to add filter equivalence info: {e}");
                }
}
if let Some(projection) = self.file_source.projection() {
match (
projection.project_schema(schema),
projection.projection_mapping(schema),
) {
(Ok(output_schema), Ok(mapping)) => {
eq_properties =
eq_properties.project(&mapping, Arc::new(output_schema));
}
                (Err(e), _) | (_, Err(e)) => {
                    // As above: warn in release builds, panic in debug builds.
                    warn!("Failed to project equivalence properties: {e}");
                    #[cfg(debug_assertions)]
                    panic!("Failed to project equivalence properties: {e}");
                }
}
}
eq_properties
}
fn scheduling_type(&self) -> SchedulingType {
SchedulingType::Cooperative
}
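    /// Per-partition statistics come from the corresponding file group when
    /// available; otherwise unknown statistics are returned. Table-wide
    /// statistics are degraded to inexact when a filter has been pushed down
    /// (see [`FileScanConfig::statistics`]).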
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
if let Some(partition) = partition {
if let Some(file_group) = self.file_groups.get(partition)
&& let Some(stat) = file_group.file_statistics(None)
{
let output_schema = self.projected_schema()?;
return if let Some(projection) = self.file_source.projection() {
projection.project_statistics(stat.clone(), &output_schema)
} else {
Ok(stat.clone())
};
}
Ok(Statistics::new_unknown(self.projected_schema()?.as_ref()))
} else {
let statistics = self.statistics();
let projection = self.file_source.projection();
let output_schema = self.projected_schema()?;
if let Some(projection) = &projection {
projection.project_statistics(statistics.clone(), &output_schema)
} else {
Ok(statistics)
}
}
}
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
let source = FileScanConfigBuilder::from(self.clone())
.with_limit(limit)
.build();
Some(Arc::new(source))
}
fn fetch(&self) -> Option<usize> {
self.limit
}
fn metrics(&self) -> ExecutionPlanMetricsSet {
self.file_source.metrics().clone()
}
fn try_swapping_with_projection(
&self,
projection: &ProjectionExprs,
) -> Result<Option<Arc<dyn DataSource>>> {
match self.file_source.try_pushdown_projection(projection)? {
Some(new_source) => {
let mut new_file_scan_config = self.clone();
new_file_scan_config.file_source = new_source;
Ok(Some(Arc::new(new_file_scan_config) as Arc<dyn DataSource>))
}
None => Ok(None),
}
}
fn try_pushdown_filters(
&self,
filters: Vec<Arc<dyn PhysicalExpr>>,
config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
let table_schema = self.file_source.table_schema().table_schema();
let filters_to_remap = if let Some(projection) = self.file_source.projection() {
filters
.into_iter()
.map(|filter| projection.unproject_expr(&filter))
.collect::<Result<Vec<_>>>()?
} else {
filters
};
let remapped_filters = filters_to_remap
.into_iter()
.map(|filter| reassign_expr_columns(filter, table_schema))
.collect::<Result<Vec<_>>>()?;
let result = self
.file_source
.try_pushdown_filters(remapped_filters, config)?;
match result.updated_node {
Some(new_file_source) => {
let mut new_file_scan_config = self.clone();
new_file_scan_config.file_source = new_file_source;
Ok(FilterPushdownPropagation {
filters: result.filters,
updated_node: Some(Arc::new(new_file_scan_config) as _),
})
}
None => {
Ok(FilterPushdownPropagation {
filters: result.filters,
updated_node: None,
})
}
}
}
fn try_pushdown_sort(
&self,
order: &[PhysicalSortExpr],
) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
let pushdown_result = self
.file_source
.try_pushdown_sort(order, &self.eq_properties())?;
match pushdown_result {
SortOrderPushdownResult::Exact { inner } => {
Ok(SortOrderPushdownResult::Exact {
inner: self.rebuild_with_source(inner, true, order)?,
})
}
SortOrderPushdownResult::Inexact { inner } => {
Ok(SortOrderPushdownResult::Inexact {
inner: self.rebuild_with_source(inner, false, order)?,
})
}
SortOrderPushdownResult::Unsupported => {
Ok(SortOrderPushdownResult::Unsupported)
}
}
}
fn with_preserve_order(&self, preserve_order: bool) -> Option<Arc<dyn DataSource>> {
if self.preserve_order == preserve_order {
return Some(Arc::new(self.clone()));
}
let new_config = FileScanConfig {
preserve_order,
..self.clone()
};
Some(Arc::new(new_config))
}
}
impl FileScanConfig {
fn validated_output_ordering(&self) -> Vec<LexOrdering> {
let schema = self.file_source.table_schema().table_schema();
validate_orderings(&self.output_ordering, schema, &self.file_groups, None)
}
pub fn file_schema(&self) -> &SchemaRef {
self.file_source.table_schema().file_schema()
}
pub fn table_partition_cols(&self) -> &Vec<FieldRef> {
self.file_source.table_schema().table_partition_cols()
}
pub fn statistics(&self) -> Statistics {
if self.file_source.filter().is_some() {
self.statistics.clone().to_inexact()
} else {
self.statistics.clone()
}
}
pub fn projected_schema(&self) -> Result<Arc<Schema>> {
let schema = self.file_source.table_schema().table_schema();
match self.file_source.projection() {
Some(proj) => Ok(Arc::new(proj.project_schema(schema)?)),
None => Ok(Arc::clone(schema)),
}
}
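    /// Extract equality conjuncts (`a = b`) from a pushed-down filter and
    /// record them as equal conditions in `eq_properties`.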
fn add_filter_equivalence_info(
filter: &Arc<dyn PhysicalExpr>,
eq_properties: &mut EquivalenceProperties,
schema: &Schema,
) -> Result<()> {
let equal_pairs = split_conjunction(filter).into_iter().filter_map(|expr| {
reassign_expr_columns(Arc::clone(expr), schema)
.ok()
.and_then(|expr| match expr.as_any().downcast_ref::<BinaryExpr>() {
Some(expr) if expr.op() == &Operator::Eq => {
Some((Arc::clone(expr.left()), Arc::clone(expr.right())))
}
_ => None,
})
});
for (lhs, rhs) in equal_pairs {
eq_properties.add_equal_conditions(lhs, rhs)?
}
Ok(())
}
#[deprecated(
since = "52.0.0",
note = "newlines_in_values has moved to CsvSource. Access it via CsvSource::csv_options().newlines_in_values instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
)]
pub fn newlines_in_values(&self) -> bool {
false
}
#[deprecated(
since = "52.0.0",
note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
)]
pub fn projected_constraints(&self) -> Constraints {
let props = self.eq_properties();
props.constraints().clone()
}
#[deprecated(
since = "52.0.0",
note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
)]
pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
#[expect(deprecated)]
self.file_source.projection().as_ref().map(|p| {
p.ordered_column_indices()
.into_iter()
.filter(|&i| i < self.file_schema().fields().len())
.collect::<Vec<_>>()
})
}
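    /// Distribute files across `target_partitions` groups so that the files
    /// within a group do not overlap on `sort_order`, placing each file into
    /// the smallest eligible group (best-fit). A group beyond the target
    /// count is created only when a file overlaps every existing group;
    /// empty groups are dropped.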
pub fn split_groups_by_statistics_with_target_partitions(
table_schema: &SchemaRef,
file_groups: &[FileGroup],
sort_order: &LexOrdering,
target_partitions: usize,
) -> Result<Vec<FileGroup>> {
if target_partitions == 0 {
return Err(internal_datafusion_err!(
"target_partitions must be greater than 0"
));
}
let flattened_files = file_groups
.iter()
.flat_map(FileGroup::iter)
.collect::<Vec<_>>();
if flattened_files.is_empty() {
return Ok(vec![]);
}
let statistics = MinMaxStatistics::new_from_files(
sort_order,
table_schema,
None,
flattened_files.iter().copied(),
)?;
let indices_sorted_by_min = statistics.min_values_sorted();
let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];
for (idx, min) in indices_sorted_by_min {
if let Some((_, group)) = file_groups_indices
.iter_mut()
.enumerate()
.filter(|(_, group)| {
group.is_empty()
|| min
> statistics
.max(*group.last().expect("groups should not be empty"))
})
.min_by_key(|(_, group)| group.len())
{
group.push(idx);
} else {
file_groups_indices.push(vec![idx]);
}
}
file_groups_indices.retain(|group| !group.is_empty());
Ok(file_groups_indices
.into_iter()
.map(|file_group_indices| {
FileGroup::new(
file_group_indices
.into_iter()
.map(|idx| flattened_files[idx].clone())
.collect(),
)
})
.collect())
}
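    /// Split files into groups of non-overlapping `sort_order` ranges using
    /// first-fit: files are visited in order of their minimum sort value and
    /// appended to the first group whose current maximum lies below that
    /// minimum. For example (mirroring the test below), files covering
    /// `[0.00, 0.49]`, `[0.50, 1.00]`, and `[0.00, 1.00]` split into two
    /// groups: the first two chain together and the third, which overlaps
    /// both, starts a new group.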
pub fn split_groups_by_statistics(
table_schema: &SchemaRef,
file_groups: &[FileGroup],
sort_order: &LexOrdering,
) -> Result<Vec<FileGroup>> {
let flattened_files = file_groups
.iter()
.flat_map(FileGroup::iter)
.collect::<Vec<_>>();
if flattened_files.is_empty() {
return Ok(vec![]);
}
let statistics = MinMaxStatistics::new_from_files(
sort_order,
table_schema,
None,
flattened_files.iter().copied(),
)
.map_err(|e| {
e.context("construct min/max statistics for split_groups_by_statistics")
})?;
let indices_sorted_by_min = statistics.min_values_sorted();
let mut file_groups_indices: Vec<Vec<usize>> = vec![];
for (idx, min) in indices_sorted_by_min {
let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
min > statistics.max(
*group
.last()
.expect("groups should be nonempty at construction"),
)
});
match file_group_to_insert {
Some(group) => group.push(idx),
None => file_groups_indices.push(vec![idx]),
}
}
Ok(file_groups_indices
.into_iter()
.map(|file_group_indices| {
file_group_indices
.into_iter()
.map(|idx| flattened_files[idx].clone())
.collect()
})
.collect())
}
fn fmt_file_source(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
write!(f, ", file_type={}", self.file_source.file_type())?;
self.file_source.fmt_extra(t, f)
}
pub fn file_source(&self) -> &Arc<dyn FileSource> {
&self.file_source
}
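    /// Rebuild this config around a file source returned from sort pushdown.
    /// If the requested ordering is the reverse of a declared output ordering,
    /// the files within each group are reversed; for inexact pushdown the
    /// declared output ordering is dropped.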
fn rebuild_with_source(
&self,
new_file_source: Arc<dyn FileSource>,
is_exact: bool,
order: &[PhysicalSortExpr],
) -> Result<Arc<dyn DataSource>> {
let mut new_config = self.clone();
let reverse_file_groups = if self.output_ordering.is_empty() {
false
} else if let Some(requested) = LexOrdering::new(order.iter().cloned()) {
let projected_schema = self.projected_schema()?;
let orderings = project_orderings(&self.output_ordering, &projected_schema);
orderings
.iter()
.any(|ordering| ordering.is_reverse(&requested))
} else {
false
};
if reverse_file_groups {
new_config.file_groups = new_config
.file_groups
.into_iter()
.map(|group| {
let mut files = group.into_inner();
files.reverse();
files.into()
})
.collect();
}
new_config.file_source = new_file_source;
if !is_exact {
new_config.output_ordering = vec![];
}
Ok(Arc::new(new_config))
}
}
impl Debug for FileScanConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(f, "FileScanConfig {{")?;
write!(f, "object_store_url={:?}, ", self.object_store_url)?;
write!(f, "statistics={:?}, ", self.statistics())?;
DisplayAs::fmt_as(self, DisplayFormatType::Verbose, f)?;
write!(f, "}}")
}
}
impl DisplayAs for FileScanConfig {
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> FmtResult {
let schema = self.projected_schema().map_err(|_| std::fmt::Error {})?;
let orderings = get_projected_output_ordering(self, &schema);
write!(f, "file_groups=")?;
FileGroupsDisplay(&self.file_groups).fmt_as(t, f)?;
if !schema.fields().is_empty() {
write!(f, ", projection={}", ProjectSchemaDisplay(&schema))?;
}
if let Some(limit) = self.limit {
write!(f, ", limit={limit}")?;
}
display_orderings(f, &orderings)?;
if !self.constraints.is_empty() {
write!(f, ", {}", self.constraints)?;
}
Ok(())
}
}
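/// Return the column indices referenced by a projection, or `None` if any
/// projection expression is not a plain column reference.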
fn ordered_column_indices_from_projection(
projection: &ProjectionExprs,
) -> Option<Vec<usize>> {
projection
.expr_iter()
.map(|e| {
let index = e.as_any().downcast_ref::<Column>()?.index();
Some(index)
})
.collect::<Option<Vec<usize>>>()
}
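/// Check whether a file group can be trusted to satisfy `ordering`: groups
/// with more than one file must have min/max statistics proving the files
/// are sorted; groups of zero or one file trivially qualify.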
fn is_ordering_valid_for_file_groups(
file_groups: &[FileGroup],
ordering: &LexOrdering,
schema: &SchemaRef,
projection: Option<&[usize]>,
) -> bool {
file_groups.iter().all(|group| {
        if group.len() <= 1 {
            return true;
        }
        match MinMaxStatistics::new_from_files(ordering, schema, projection, group.iter())
        {
            Ok(stats) => stats.is_sorted(),
            Err(_) => false,
        }
})
}
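/// Keep only the orderings that every file group actually satisfies.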
fn validate_orderings(
orderings: &[LexOrdering],
schema: &SchemaRef,
file_groups: &[FileGroup],
projection: Option<&[usize]>,
) -> Vec<LexOrdering> {
orderings
.iter()
.filter(|ordering| {
is_ordering_valid_for_file_groups(file_groups, ordering, schema, projection)
})
.cloned()
.collect()
}
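/// Project the declared output orderings through the scan's projection, then
/// validate them against the file groups. A projection containing non-column
/// expressions (no recoverable indices) is only trusted when every file group
/// holds at most one file.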
fn get_projected_output_ordering(
base_config: &FileScanConfig,
projected_schema: &SchemaRef,
) -> Vec<LexOrdering> {
let projected_orderings =
project_orderings(&base_config.output_ordering, projected_schema);
let indices = base_config
.file_source
.projection()
.as_ref()
.map(|p| ordered_column_indices_from_projection(p));
match indices {
Some(Some(indices)) => {
validate_orderings(
&projected_orderings,
projected_schema,
&base_config.file_groups,
Some(indices.as_slice()),
)
}
None => {
validate_orderings(
&projected_orderings,
projected_schema,
&base_config.file_groups,
None,
)
}
Some(None) => {
if base_config.file_groups.iter().all(|g| g.len() <= 1) {
projected_orderings
} else {
debug!(
"Skipping specified output orderings. \
Some file groups couldn't be determined to be sorted: {:?}",
base_config.file_groups
);
vec![]
}
}
}
}
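/// Partition column values repeat for every row of a file, so they are
/// represented as a dictionary of the value type keyed by `UInt16` to keep
/// the repeated values cheap in memory.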
pub fn wrap_partition_type_in_dict(val_type: DataType) -> DataType {
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
}
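/// Wrap a partition value in the dictionary representation matching
/// [`wrap_partition_type_in_dict`].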
pub fn wrap_partition_value_in_dict(val: ScalarValue) -> ScalarValue {
ScalarValue::Dictionary(Box::new(DataType::UInt16), Box::new(val))
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use super::*;
use crate::TableSchema;
use crate::test_util::col;
use crate::{
generate_test_files, test_util::MockSource, tests::aggr_test_schema,
verify_sort_integrity,
};
use arrow::datatypes::Field;
use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, internal_err};
use datafusion_expr::{Operator, SortExpr};
use datafusion_physical_expr::create_physical_sort_expr;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion_physical_expr::projection::ProjectionExpr;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
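    /// A [`FileSource`] stub whose sort pushdown always reports `Inexact`,
    /// used to exercise file-group reversal in `try_pushdown_sort`.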
#[derive(Clone)]
struct InexactSortPushdownSource {
metrics: ExecutionPlanMetricsSet,
table_schema: TableSchema,
}
impl InexactSortPushdownSource {
fn new(table_schema: TableSchema) -> Self {
Self {
metrics: ExecutionPlanMetricsSet::new(),
table_schema,
}
}
}
impl FileSource for InexactSortPushdownSource {
fn create_file_opener(
&self,
_object_store: Arc<dyn object_store::ObjectStore>,
_base_config: &FileScanConfig,
_partition: usize,
) -> Result<Arc<dyn crate::file_stream::FileOpener>> {
unimplemented!()
}
fn as_any(&self) -> &dyn Any {
self
}
fn table_schema(&self) -> &TableSchema {
&self.table_schema
}
fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
Arc::new(self.clone())
}
fn metrics(&self) -> &ExecutionPlanMetricsSet {
&self.metrics
}
fn file_type(&self) -> &str {
"mock"
}
fn try_pushdown_sort(
&self,
_order: &[PhysicalSortExpr],
_eq_properties: &EquivalenceProperties,
) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
Ok(SortOrderPushdownResult::Inexact {
inner: Arc::new(self.clone()) as Arc<dyn FileSource>,
})
}
}
#[test]
fn physical_plan_config_no_projection_tab_cols_as_field() {
let file_schema = aggr_test_schema();
let table_partition_col =
Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), true)
.with_metadata(HashMap::from_iter(vec![(
"key_whatever".to_owned(),
"value_whatever".to_owned(),
)]));
let conf = config_for_projection(
Arc::clone(&file_schema),
None,
Statistics::new_unknown(&file_schema),
vec![table_partition_col.clone()],
);
let proj_schema = conf.projected_schema().unwrap();
assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
assert_eq!(
*proj_schema.field(file_schema.fields().len()),
table_partition_col,
"partition columns are the last columns and ust have all values defined in created field"
);
}
#[test]
fn test_split_groups_by_statistics() -> Result<()> {
use chrono::TimeZone;
use datafusion_common::DFSchema;
use datafusion_expr::execution_props::ExecutionProps;
use object_store::{ObjectMeta, path::Path};
struct File {
name: &'static str,
date: &'static str,
statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
}
impl File {
fn new(
name: &'static str,
date: &'static str,
statistics: Vec<Option<(f64, f64)>>,
) -> Self {
Self::new_nullable(
name,
date,
statistics
.into_iter()
.map(|opt| opt.map(|(min, max)| (Some(min), Some(max))))
.collect(),
)
}
fn new_nullable(
name: &'static str,
date: &'static str,
statistics: Vec<Option<(Option<f64>, Option<f64>)>>,
) -> Self {
Self {
name,
date,
statistics,
}
}
}
struct TestCase {
name: &'static str,
file_schema: Schema,
files: Vec<File>,
sort: Vec<SortExpr>,
expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
}
use datafusion_expr::col;
let cases = vec![
TestCase {
name: "test sort",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
},
TestCase {
name: "test sort with files ordered differently",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
},
TestCase {
name: "reverse sort",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
],
sort: vec![col("value").sort(false, true)],
expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
},
TestCase {
name: "nullable sort columns, nulls last",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
true,
)]),
files: vec![
File::new_nullable(
"0",
"2023-01-01",
vec![Some((Some(0.00), Some(0.49)))],
),
File::new_nullable("1", "2023-01-01", vec![Some((Some(0.50), None))]),
File::new_nullable("2", "2023-01-02", vec![Some((Some(0.00), None))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
},
TestCase {
name: "nullable sort columns, nulls first",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
true,
)]),
files: vec![
File::new_nullable("0", "2023-01-01", vec![Some((None, Some(0.49)))]),
File::new_nullable(
"1",
"2023-01-01",
vec![Some((Some(0.50), Some(1.00)))],
),
File::new_nullable("2", "2023-01-02", vec![Some((None, Some(1.00)))]),
],
sort: vec![col("value").sort(true, true)],
expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
},
TestCase {
name: "all three non-overlapping",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0", "1", "2"]]),
},
TestCase {
name: "all three overlapping",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
},
TestCase {
name: "empty input",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![],
sort: vec![col("value").sort(true, false)],
expected_result: Ok(vec![]),
},
TestCase {
name: "one file missing statistics",
file_schema: Schema::new(vec![Field::new(
"value".to_string(),
DataType::Float64,
false,
)]),
files: vec![
File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
File::new("2", "2023-01-02", vec![None]),
],
sort: vec![col("value").sort(true, false)],
expected_result: Err(
"construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found",
),
},
];
for case in cases {
let table_schema = Arc::new(Schema::new(
case.file_schema
.fields()
.clone()
.into_iter()
.cloned()
.chain(Some(Arc::new(Field::new(
"date".to_string(),
DataType::Utf8,
false,
))))
.collect::<Vec<_>>(),
));
let Some(sort_order) = LexOrdering::new(
case.sort
.into_iter()
.map(|expr| {
create_physical_sort_expr(
&expr,
&DFSchema::try_from(Arc::clone(&table_schema))?,
&ExecutionProps::default(),
)
})
.collect::<Result<Vec<_>>>()?,
) else {
return internal_err!("This test should always use an ordering");
};
let partitioned_files = FileGroup::new(
case.files.into_iter().map(From::from).collect::<Vec<_>>(),
);
let result = FileScanConfig::split_groups_by_statistics(
&table_schema,
std::slice::from_ref(&partitioned_files),
&sort_order,
);
let results_by_name = result
.as_ref()
.map(|file_groups| {
file_groups
.iter()
.map(|file_group| {
file_group
.iter()
.map(|file| {
partitioned_files
.iter()
.find_map(|f| {
if f.object_meta == file.object_meta {
Some(
f.object_meta
.location
.as_ref()
.rsplit('/')
.next()
.unwrap()
.trim_end_matches(".parquet"),
)
} else {
None
}
})
.unwrap()
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>()
})
.map_err(|e| e.strip_backtrace().leak() as &'static str);
assert_eq!(results_by_name, case.expected_result, "{}", case.name);
}
        // The conversion helper below is a function-local item: it is used by
        // the cases above even though it appears after `return`.
        return Ok(());
impl From<File> for PartitionedFile {
fn from(file: File) -> Self {
let object_meta = ObjectMeta {
location: Path::from(format!(
"data/date={}/{}.parquet",
file.date, file.name
)),
last_modified: chrono::Utc.timestamp_nanos(0),
size: 0,
e_tag: None,
version: None,
};
let statistics = Arc::new(Statistics {
num_rows: Precision::Absent,
total_byte_size: Precision::Absent,
column_statistics: file
.statistics
.into_iter()
.map(|stats| {
stats
.map(|(min, max)| ColumnStatistics {
min_value: Precision::Exact(ScalarValue::Float64(
min,
)),
max_value: Precision::Exact(ScalarValue::Float64(
max,
)),
..Default::default()
})
.unwrap_or_default()
})
.collect::<Vec<_>>(),
});
PartitionedFile::new_from_meta(object_meta)
.with_partition_values(vec![ScalarValue::from(file.date)])
.with_statistics(statistics)
}
}
}
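    /// Build a [`FileScanConfig`] over a [`MockSource`] with the given schema,
    /// optional projection indices, statistics, and partition columns.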
fn config_for_projection(
file_schema: SchemaRef,
projection: Option<Vec<usize>>,
statistics: Statistics,
table_partition_cols: Vec<Field>,
) -> FileScanConfig {
let table_schema = TableSchema::new(
file_schema,
table_partition_cols.into_iter().map(Arc::new).collect(),
);
FileScanConfigBuilder::new(
ObjectStoreUrl::parse("test:///").unwrap(),
Arc::new(MockSource::new(table_schema.clone())),
)
.with_projection_indices(projection)
.unwrap()
.with_statistics(statistics)
.build()
}
#[test]
fn test_file_scan_config_builder() {
let file_schema = aggr_test_schema();
let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
let table_schema = TableSchema::new(
Arc::clone(&file_schema),
vec![Arc::new(Field::new(
"date",
wrap_partition_type_in_dict(DataType::Utf8),
false,
))],
);
let file_source: Arc<dyn FileSource> =
Arc::new(MockSource::new(table_schema.clone()));
let builder = FileScanConfigBuilder::new(
object_store_url.clone(),
Arc::clone(&file_source),
);
let config = builder
.with_limit(Some(1000))
.with_projection_indices(Some(vec![0, 1]))
.unwrap()
.with_statistics(Statistics::new_unknown(&file_schema))
.with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
"test.parquet".to_string(),
1024,
)])])
.with_output_ordering(vec![
[PhysicalSortExpr::new_default(Arc::new(Column::new(
"date", 0,
)))]
.into(),
])
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
.build();
assert_eq!(config.object_store_url, object_store_url);
assert_eq!(*config.file_schema(), file_schema);
assert_eq!(config.limit, Some(1000));
assert_eq!(
config
.file_source
.projection()
.as_ref()
.map(|p| p.column_indices()),
Some(vec![0, 1])
);
assert_eq!(config.table_partition_cols().len(), 1);
assert_eq!(config.table_partition_cols()[0].name(), "date");
assert_eq!(config.file_groups.len(), 1);
assert_eq!(config.file_groups[0].len(), 1);
assert_eq!(
config.file_groups[0][0].object_meta.location.as_ref(),
"test.parquet"
);
assert_eq!(
config.file_compression_type,
FileCompressionType::UNCOMPRESSED
);
assert_eq!(config.output_ordering.len(), 1);
}
#[test]
fn equivalence_properties_after_schema_change() {
let file_schema = aggr_test_schema();
let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
let file_source: Arc<dyn FileSource> = Arc::new(
MockSource::new(table_schema.clone()).with_filter(Arc::new(BinaryExpr::new(
col("c2", &file_schema).unwrap(),
Operator::Eq,
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
))),
);
let config = FileScanConfigBuilder::new(
object_store_url.clone(),
Arc::clone(&file_source),
)
.with_projection_indices(Some(vec![0, 1, 2]))
.unwrap()
.build();
let exprs = ProjectionExprs::new(vec![ProjectionExpr::new(
col("c1", &file_schema).unwrap(),
"c1",
)]);
let data_source = config
.try_swapping_with_projection(&exprs)
.unwrap()
.unwrap();
let eq_properties = data_source.eq_properties();
let eq_group = eq_properties.eq_group();
for class in eq_group.iter() {
for expr in class.iter() {
if let Some(col) = expr.as_any().downcast_ref::<Column>() {
assert_ne!(
col.name(),
"c2",
"c2 should not be present in any equivalence class"
);
}
}
}
}
#[test]
fn test_file_scan_config_builder_defaults() {
let file_schema = aggr_test_schema();
let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
let file_source: Arc<dyn FileSource> =
Arc::new(MockSource::new(table_schema.clone()));
let config = FileScanConfigBuilder::new(
object_store_url.clone(),
Arc::clone(&file_source),
)
.build();
assert_eq!(config.object_store_url, object_store_url);
assert_eq!(*config.file_schema(), file_schema);
assert_eq!(config.limit, None);
let expected_projection: Vec<usize> = (0..file_schema.fields().len()).collect();
assert_eq!(
config
.file_source
.projection()
.as_ref()
.map(|p| p.column_indices()),
Some(expected_projection)
);
assert!(config.table_partition_cols().is_empty());
assert!(config.file_groups.is_empty());
assert_eq!(
config.file_compression_type,
FileCompressionType::UNCOMPRESSED
);
assert!(config.output_ordering.is_empty());
assert!(config.constraints.is_empty());
assert_eq!(config.statistics().num_rows, Precision::Absent);
assert_eq!(config.statistics().total_byte_size, Precision::Absent);
assert_eq!(
config.statistics().column_statistics.len(),
file_schema.fields().len()
);
for stat in config.statistics().column_statistics {
assert_eq!(stat.distinct_count, Precision::Absent);
assert_eq!(stat.min_value, Precision::Absent);
assert_eq!(stat.max_value, Precision::Absent);
assert_eq!(stat.null_count, Precision::Absent);
}
}
#[test]
fn test_file_scan_config_builder_new_from() {
let schema = aggr_test_schema();
let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
let partition_cols = vec![Field::new(
"date",
wrap_partition_type_in_dict(DataType::Utf8),
false,
)];
let file = PartitionedFile::new("test_file.parquet", 100);
let table_schema = TableSchema::new(
Arc::clone(&schema),
partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
);
let file_source: Arc<dyn FileSource> =
Arc::new(MockSource::new(table_schema.clone()));
let original_config = FileScanConfigBuilder::new(
object_store_url.clone(),
Arc::clone(&file_source),
)
.with_projection_indices(Some(vec![0, 2]))
.unwrap()
.with_limit(Some(10))
.with_file(file.clone())
.with_constraints(Constraints::default())
.build();
let new_builder = FileScanConfigBuilder::from(original_config);
let new_config = new_builder.build();
let partition_cols = partition_cols.into_iter().map(Arc::new).collect::<Vec<_>>();
assert_eq!(new_config.object_store_url, object_store_url);
assert_eq!(*new_config.file_schema(), schema);
assert_eq!(
new_config
.file_source
.projection()
.as_ref()
.map(|p| p.column_indices()),
Some(vec![0, 2])
);
assert_eq!(new_config.limit, Some(10));
assert_eq!(*new_config.table_partition_cols(), partition_cols);
assert_eq!(new_config.file_groups.len(), 1);
assert_eq!(new_config.file_groups[0].len(), 1);
assert_eq!(
new_config.file_groups[0][0].object_meta.location.as_ref(),
"test_file.parquet"
);
assert_eq!(new_config.constraints, Constraints::default());
}
#[test]
fn test_split_groups_by_statistics_with_target_partitions() -> Result<()> {
use datafusion_common::DFSchema;
use datafusion_expr::{col, execution_props::ExecutionProps};
let schema = Arc::new(Schema::new(vec![Field::new(
"value",
DataType::Float64,
false,
)]));
let exec_props = ExecutionProps::new();
let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?;
let sort_expr = [col("value").sort(true, false)];
let sort_ordering = sort_expr
.map(|expr| {
create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap()
})
.into();
struct TestCase {
name: String,
file_count: usize,
overlap_factor: f64,
target_partitions: usize,
expected_partition_count: usize,
}
let test_cases = vec![
TestCase {
name: "no_overlap_10_files_4_partitions".to_string(),
file_count: 10,
overlap_factor: 0.0,
target_partitions: 4,
expected_partition_count: 4,
},
TestCase {
name: "medium_overlap_20_files_5_partitions".to_string(),
file_count: 20,
overlap_factor: 0.5,
target_partitions: 5,
expected_partition_count: 5,
},
TestCase {
name: "high_overlap_30_files_3_partitions".to_string(),
file_count: 30,
overlap_factor: 0.8,
target_partitions: 3,
expected_partition_count: 7,
},
TestCase {
name: "fewer_files_than_partitions".to_string(),
file_count: 3,
overlap_factor: 0.0,
target_partitions: 10,
                expected_partition_count: 3,
            },
TestCase {
name: "single_file".to_string(),
file_count: 1,
overlap_factor: 0.0,
target_partitions: 5,
                expected_partition_count: 1,
            },
TestCase {
name: "empty_files".to_string(),
file_count: 0,
overlap_factor: 0.0,
target_partitions: 3,
                expected_partition_count: 0,
            },
];
for case in test_cases {
println!("Running test case: {}", case.name);
let file_groups = generate_test_files(case.file_count, case.overlap_factor);
let result =
FileScanConfig::split_groups_by_statistics_with_target_partitions(
&schema,
&file_groups,
&sort_ordering,
case.target_partitions,
)?;
println!(
"Created {} partitions (target was {})",
result.len(),
case.target_partitions
);
assert_eq!(
result.len(),
case.expected_partition_count,
"Case '{}': Unexpected partition count",
case.name
);
assert!(
verify_sort_integrity(&result),
"Case '{}': Files within partitions are not properly ordered",
case.name
);
if case.file_count > 1 && case.expected_partition_count > 1 {
let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect();
let max_size = *group_sizes.iter().max().unwrap();
let min_size = *group_sizes.iter().min().unwrap();
let avg_files_per_partition =
case.file_count as f64 / case.expected_partition_count as f64;
assert!(
(max_size as f64) < 2.0 * avg_files_per_partition,
"Case '{}': Unbalanced distribution. Max partition size {} exceeds twice the average {}",
case.name,
max_size,
avg_files_per_partition
);
println!("Distribution - min files: {min_size}, max files: {max_size}");
}
}
let empty_groups: Vec<FileGroup> = vec![];
let err = FileScanConfig::split_groups_by_statistics_with_target_partitions(
&schema,
&empty_groups,
&sort_ordering,
0,
)
.unwrap_err();
assert!(
err.to_string()
.contains("target_partitions must be greater than 0"),
"Expected error for zero target partitions"
);
Ok(())
}
#[test]
fn test_partition_statistics_projection() {
use crate::source::DataSourceExec;
use datafusion_physical_plan::ExecutionPlan;
let schema = Arc::new(Schema::new(vec![
Field::new("col0", DataType::Int32, false),
Field::new("col1", DataType::Int32, false),
Field::new("col2", DataType::Int32, false),
Field::new("col3", DataType::Int32, false),
]));
let file_group_stats = Statistics {
num_rows: Precision::Exact(100),
total_byte_size: Precision::Exact(1024),
column_statistics: vec![
ColumnStatistics {
null_count: Precision::Exact(0),
..ColumnStatistics::new_unknown()
},
ColumnStatistics {
null_count: Precision::Exact(5),
..ColumnStatistics::new_unknown()
},
ColumnStatistics {
null_count: Precision::Exact(10),
..ColumnStatistics::new_unknown()
},
ColumnStatistics {
null_count: Precision::Exact(15),
..ColumnStatistics::new_unknown()
},
],
};
let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)])
.with_statistics(Arc::new(file_group_stats));
let table_schema = TableSchema::new(Arc::clone(&schema), vec![]);
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::parse("test:///").unwrap(),
Arc::new(MockSource::new(table_schema.clone())),
)
.with_projection_indices(Some(vec![0, 2]))
        .unwrap()
        .with_file_groups(vec![file_group])
.build();
let exec = DataSourceExec::from_data_source(config);
let partition_stats = exec.partition_statistics(Some(0)).unwrap();
assert_eq!(
partition_stats.column_statistics.len(),
2,
"Expected 2 column statistics (projected), but got {}",
partition_stats.column_statistics.len()
);
assert_eq!(
partition_stats.column_statistics[0].null_count,
Precision::Exact(0),
"First projected column should be col0 with 0 nulls"
);
assert_eq!(
partition_stats.column_statistics[1].null_count,
Precision::Exact(10),
"Second projected column should be col2 with 10 nulls"
);
assert_eq!(partition_stats.num_rows, Precision::Exact(100));
assert_eq!(partition_stats.total_byte_size, Precision::Exact(800));
}
#[test]
fn test_output_partitioning_not_partitioned_by_file_group() {
let file_schema = aggr_test_schema();
let partition_col =
Field::new("date", wrap_partition_type_in_dict(DataType::Utf8), false);
let config = config_for_projection(
Arc::clone(&file_schema),
None,
Statistics::new_unknown(&file_schema),
vec![partition_col],
);
let partitioning = config.output_partitioning();
assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
}
#[test]
fn test_output_partitioning_no_partition_columns() {
let file_schema = aggr_test_schema();
let mut config = config_for_projection(
Arc::clone(&file_schema),
None,
Statistics::new_unknown(&file_schema),
            vec![],
        );
config.partitioned_by_file_group = true;
let partitioning = config.output_partitioning();
assert!(matches!(partitioning, Partitioning::UnknownPartitioning(_)));
}
#[test]
fn test_output_partitioning_with_partition_columns() {
let file_schema = aggr_test_schema();
let single_partition_col = vec![Field::new(
"date",
wrap_partition_type_in_dict(DataType::Utf8),
false,
)];
let mut config = config_for_projection(
Arc::clone(&file_schema),
None,
Statistics::new_unknown(&file_schema),
single_partition_col,
);
config.partitioned_by_file_group = true;
config.file_groups = vec![
FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
FileGroup::new(vec![PartitionedFile::new("f3.parquet".to_string(), 1024)]),
];
let partitioning = config.output_partitioning();
match partitioning {
Partitioning::Hash(exprs, num_partitions) => {
assert_eq!(num_partitions, 3);
assert_eq!(exprs.len(), 1);
assert_eq!(
exprs[0].as_any().downcast_ref::<Column>().unwrap().name(),
"date"
);
}
_ => panic!("Expected Hash partitioning"),
}
let multiple_partition_cols = vec![
Field::new("year", wrap_partition_type_in_dict(DataType::Utf8), false),
Field::new("month", wrap_partition_type_in_dict(DataType::Utf8), false),
];
config = config_for_projection(
Arc::clone(&file_schema),
None,
Statistics::new_unknown(&file_schema),
multiple_partition_cols,
);
config.partitioned_by_file_group = true;
config.file_groups = vec![
FileGroup::new(vec![PartitionedFile::new("f1.parquet".to_string(), 1024)]),
FileGroup::new(vec![PartitionedFile::new("f2.parquet".to_string(), 1024)]),
];
let partitioning = config.output_partitioning();
match partitioning {
Partitioning::Hash(exprs, num_partitions) => {
assert_eq!(num_partitions, 2);
assert_eq!(exprs.len(), 2);
let col_names: Vec<_> = exprs
.iter()
.map(|e| e.as_any().downcast_ref::<Column>().unwrap().name())
.collect();
assert_eq!(col_names, vec!["year", "month"]);
}
_ => panic!("Expected Hash partitioning"),
}
}
#[test]
fn try_pushdown_sort_reverses_file_groups_only_when_requested_is_reverse()
-> Result<()> {
let file_schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
let table_schema = TableSchema::new(Arc::clone(&file_schema), vec![]);
let file_source = Arc::new(InexactSortPushdownSource::new(table_schema));
let file_groups = vec![FileGroup::new(vec![
PartitionedFile::new("file1", 1),
PartitionedFile::new("file2", 1),
])];
let sort_expr_asc = PhysicalSortExpr::new_default(Arc::new(Column::new("a", 0)));
let config =
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
.with_file_groups(file_groups)
.with_output_ordering(vec![
LexOrdering::new(vec![sort_expr_asc.clone()]).unwrap(),
])
.build();
let requested_asc = vec![sort_expr_asc.clone()];
let result = config.try_pushdown_sort(&requested_asc)?;
let SortOrderPushdownResult::Inexact { inner } = result else {
panic!("Expected Inexact result");
};
let pushed_config = inner
.as_any()
.downcast_ref::<FileScanConfig>()
.expect("Expected FileScanConfig");
let pushed_files = pushed_config.file_groups[0].files();
assert_eq!(pushed_files[0].object_meta.location.as_ref(), "file1");
assert_eq!(pushed_files[1].object_meta.location.as_ref(), "file2");
let requested_desc = vec![sort_expr_asc.reverse()];
let result = config.try_pushdown_sort(&requested_desc)?;
let SortOrderPushdownResult::Inexact { inner } = result else {
panic!("Expected Inexact result");
};
let pushed_config = inner
.as_any()
.downcast_ref::<FileScanConfig>()
.expect("Expected FileScanConfig");
let pushed_files = pushed_config.file_groups[0].files();
assert_eq!(pushed_files[0].object_meta.location.as_ref(), "file2");
assert_eq!(pushed_files[1].object_meta.location.as_ref(), "file1");
Ok(())
}
}