use super::FileScanConfig;
use crate::file::FileSource;
use crate::file_groups::FileGroup;
use crate::source::DataSource;
use crate::statistics::MinMaxStatistics;
use arrow::datatypes::SchemaRef;
use datafusion_common::Result;
use datafusion_common::stats::Precision;
use datafusion_physical_expr::equivalence::project_orderings;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::SortOrderPushdownResult;
use log::debug;
use std::sync::Arc;
/// Outcome of reordering the files inside each [`FileGroup`] by sort-key statistics.
#[derive(Debug)]
pub(crate) struct SortedFileGroups {
    /// The file groups after reordering; groups that could not be sorted keep
    /// their original file order.
    file_groups: Vec<FileGroup>,
    /// `true` if the file order of at least one group was changed.
    any_reordered: bool,
    /// `true` if every group was confirmed (via min/max statistics) to be
    /// non-overlapping after reordering.
    all_non_overlapping: bool,
}
impl FileScanConfig {
    /// Returns a copy of this config that scans with `new_file_source`, with the file
    /// layout and declared output ordering adjusted for the requested sort `order`.
    ///
    /// - If `order` is the reverse of one of the (projected) declared output orderings,
    ///   the files inside every group are reversed.
    /// - Otherwise, the files inside every group are sorted by their min statistics for
    ///   `order`.
    ///
    /// The declared `output_ordering` is kept only when every group was confirmed
    /// non-overlapping and either `is_exact` is set, or no file reports nulls in the
    /// sort columns and the new equivalence properties satisfy `order`. In every other
    /// case (including the reversed case) it is cleared.
    ///
    /// # Errors
    /// Propagates errors from computing the projected schema or checking the ordering.
    pub(crate) fn rebuild_with_source(
        &self,
        new_file_source: Arc<dyn FileSource>,
        is_exact: bool,
        order: &[PhysicalSortExpr],
    ) -> Result<FileScanConfig> {
        let requested = LexOrdering::new(order.iter().cloned());
        let mut new_config = self.clone();

        // Is the requested order the reverse of an ordering this scan already declares?
        let reverse_file_groups = match &requested {
            Some(req) if !self.output_ordering.is_empty() => {
                let projected_schema = self.projected_schema()?;
                project_orderings(&self.output_ordering, &projected_schema)
                    .iter()
                    .any(|ordering| ordering.is_reverse(req))
            }
            _ => false,
        };

        if reverse_file_groups {
            let groups = std::mem::take(&mut new_config.file_groups);
            new_config.file_groups = groups
                .into_iter()
                .map(|group| {
                    let mut files = group.into_inner();
                    files.reverse();
                    files.into()
                })
                .collect();
        }

        new_config.file_source = new_file_source;

        // When not reversing, reorder files by statistics using the new file source.
        let all_non_overlapping = match (&requested, reverse_file_groups) {
            (Some(sort_order), false) => {
                let projected_schema = new_config.projected_schema()?;
                let column_indices = projection_column_indices(&new_config);
                let sorted = sort_files_within_groups_by_statistics(
                    &new_config.file_groups,
                    sort_order,
                    &projected_schema,
                    column_indices.as_deref(),
                );
                new_config.file_groups = sorted.file_groups;
                sorted.all_non_overlapping
            }
            _ => false,
        };

        let keep_ordering = if !all_non_overlapping {
            false
        } else if is_exact {
            true
        } else {
            let projected_schema = new_config.projected_schema()?;
            let column_indices = projection_column_indices(&new_config);
            let nulls_present = any_file_has_nulls_in_sort_columns(
                &new_config.file_groups,
                order,
                &projected_schema,
                column_indices.as_deref(),
            );
            // Short-circuit: only consult the equivalence properties when no nulls are seen.
            !nulls_present
                && new_config
                    .eq_properties()
                    .ordering_satisfy(order.iter().cloned())?
        };

        if !keep_ordering {
            new_config.output_ordering.clear();
        }
        Ok(new_config)
    }

    /// Tries to satisfy `order` by reordering the files inside each group using
    /// their min/max statistics.
    ///
    /// Returns:
    /// - `Unsupported` when `order` cannot be turned into a [`LexOrdering`], or when
    ///   no group's file order changed;
    /// - `Exact` when every group is non-overlapping, this scan declares an output
    ///   ordering, no file reports nulls in the sort columns, and the reordered
    ///   config's equivalence properties satisfy `order`;
    /// - `Inexact` otherwise, with the declared output ordering cleared.
    ///
    /// # Errors
    /// Propagates errors from computing the projected schema or checking the ordering.
    pub(crate) fn try_sort_file_groups_by_statistics(
        &self,
        order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn DataSource>>> {
        let Some(sort_order) = LexOrdering::new(order.iter().cloned()) else {
            return Ok(SortOrderPushdownResult::Unsupported);
        };

        let projected_schema = self.projected_schema()?;
        let column_indices = projection_column_indices(self);
        let sorted = sort_files_within_groups_by_statistics(
            &self.file_groups,
            &sort_order,
            &projected_schema,
            column_indices.as_deref(),
        );
        if !sorted.any_reordered {
            return Ok(SortOrderPushdownResult::Unsupported);
        }

        let mut new_config = self.clone();
        new_config.file_groups = sorted.file_groups;

        let exact_candidate = sorted.all_non_overlapping
            && !self.output_ordering.is_empty()
            && !any_file_has_nulls_in_sort_columns(
                &new_config.file_groups,
                order,
                &projected_schema,
                column_indices.as_deref(),
            );
        if exact_candidate
            && new_config
                .eq_properties()
                .ordering_satisfy(order.iter().cloned())?
        {
            return Ok(SortOrderPushdownResult::Exact {
                inner: Arc::new(new_config),
            });
        }

        new_config.output_ordering.clear();
        Ok(SortOrderPushdownResult::Inexact {
            inner: Arc::new(new_config),
        })
    }
}

/// Column indices of `config`'s file-source projection, or `None` when there is no
/// projection or it contains anything other than plain column references.
fn projection_column_indices(config: &FileScanConfig) -> Option<Vec<usize>> {
    config
        .file_source
        .projection()
        .as_ref()
        .and_then(|p| ordered_column_indices_from_projection(p))
}
/// Sorts the files inside each group by the minimum value of the sort key, using
/// per-file min/max statistics.
///
/// - Groups with zero or one file are kept unchanged and count as non-overlapping.
/// - Groups whose statistics cannot be built are kept in their original order and
///   count as possibly overlapping.
/// - Otherwise the group is reordered by min values (if not already in that order),
///   and statistics are rebuilt over the new order to check it is non-overlapping.
///
/// Returns the resulting groups, whether any group changed order, and whether every
/// group was confirmed non-overlapping.
pub(crate) fn sort_files_within_groups_by_statistics(
    file_groups: &[FileGroup],
    sort_order: &LexOrdering,
    projected_schema: &SchemaRef,
    projection_indices: Option<&[usize]>,
) -> SortedFileGroups {
    let mut reordered_any = false;
    let mut every_group_disjoint = true;
    let mut output = Vec::with_capacity(file_groups.len());

    for group in file_groups {
        if group.len() < 2 {
            output.push(group.clone());
            continue;
        }

        let members: Vec<_> = group.iter().collect();
        let stats = match MinMaxStatistics::new_from_files(
            sort_order,
            projected_schema,
            projection_indices,
            members.iter().copied(),
        ) {
            Ok(stats) => stats,
            Err(e) => {
                log::trace!(
                    "Cannot sort file group by statistics: {e}. Keeping original order."
                );
                output.push(group.clone());
                every_group_disjoint = false;
                continue;
            }
        };

        let order_by_min = stats.min_values_sorted();
        let unchanged = order_by_min
            .iter()
            .enumerate()
            .all(|(pos, (idx, _))| pos == *idx);

        let arranged: FileGroup = if unchanged {
            group.clone()
        } else {
            reordered_any = true;
            order_by_min
                .iter()
                .map(|(idx, _)| members[*idx].clone())
                .collect()
        };

        // Re-derive statistics over the new order to confirm adjacent files don't overlap.
        let disjoint = MinMaxStatistics::new_from_files(
            sort_order,
            projected_schema,
            projection_indices,
            arranged.iter(),
        )
        .map(|sorted_stats| sorted_stats.is_sorted())
        .unwrap_or(false);
        if !disjoint {
            every_group_disjoint = false;
        }

        output.push(arranged);
    }

    SortedFileGroups {
        file_groups: output,
        any_reordered: reordered_any,
        all_non_overlapping: every_group_disjoint,
    }
}
/// Conservatively reports whether any file may contain nulls in the sort columns.
///
/// Returns `false` only when every sort expression is a plain [`Column`] and every
/// file has statistics that record an exact null count of zero for each sort column.
/// It returns `true` (assume nulls) when:
/// - a sort expression is not a plain column;
/// - a file has no statistics;
/// - a column cannot be mapped to a statistics entry;
/// - a null count is not known to be exactly zero.
///
/// `projection_indices`, when present, maps a projected column index to the index
/// of that column's entry in the file statistics.
pub(crate) fn any_file_has_nulls_in_sort_columns(
    file_groups: &[FileGroup],
    order: &[PhysicalSortExpr],
    projected_schema: &SchemaRef,
    projection_indices: Option<&[usize]>,
) -> bool {
    let Some(sort_columns) =
        sort_columns_from_physical_sort_exprs_nullable(order, projected_schema)
    else {
        // Non-column sort expressions: nulls cannot be ruled out from statistics.
        return true;
    };

    file_groups
        .iter()
        .flat_map(|group| group.iter())
        .any(|file| {
            let Some(stats) = file.statistics.as_ref() else {
                // No statistics for this file: assume nulls may be present.
                return true;
            };
            sort_columns.iter().any(|col| {
                // Checked lookup: an unmappable column is treated as "may have nulls"
                // instead of panicking.
                let stat_idx = match projection_indices {
                    Some(indices) => match indices.get(col.index()) {
                        Some(&idx) => idx,
                        None => return true,
                    },
                    None => col.index(),
                };
                let Some(col_stats) = stats.column_statistics.get(stat_idx) else {
                    return true;
                };
                !matches!(col_stats.null_count, Precision::Exact(0))
            })
        })
}
/// Returns the column index of every projected expression, in projection order.
///
/// Returns `None` as soon as any projected expression is not a plain [`Column`].
pub(crate) fn ordered_column_indices_from_projection(
    projection: &ProjectionExprs,
) -> Option<Vec<usize>> {
    let mut indices = Vec::new();
    for expr in projection.expr_iter() {
        let column = expr.downcast_ref::<Column>()?;
        indices.push(column.index());
    }
    Some(indices)
}
/// Extracts the [`Column`] referenced by each sort expression, in order.
///
/// Returns `None` if any sort expression is not a plain column. The schema argument
/// is currently unused.
fn sort_columns_from_physical_sort_exprs_nullable(
    order: &[PhysicalSortExpr],
    _schema: &SchemaRef,
) -> Option<Vec<Column>> {
    let mut columns = Vec::with_capacity(order.len());
    for sort_expr in order {
        let column = sort_expr.expr.downcast_ref::<Column>()?;
        columns.push(column.clone());
    }
    Some(columns)
}
/// Checks whether `ordering` holds across the files of every group, using
/// min/max statistics.
///
/// A group with at most one file is always valid. A larger group is valid only if
/// its statistics can be built and report the files as sorted. Failing to build
/// statistics makes the group invalid.
pub(crate) fn is_ordering_valid_for_file_groups(
    file_groups: &[FileGroup],
    ordering: &LexOrdering,
    schema: &SchemaRef,
    projection: Option<&[usize]>,
) -> bool {
    !file_groups.iter().any(|group| {
        group.len() > 1
            && !MinMaxStatistics::new_from_files(ordering, schema, projection, group.iter())
                .is_ok_and(|stats| stats.is_sorted())
    })
}
/// Returns the subset of `orderings` that hold across the files of every group,
/// preserving their original relative order.
pub(crate) fn validate_orderings(
    orderings: &[LexOrdering],
    schema: &SchemaRef,
    file_groups: &[FileGroup],
    projection: Option<&[usize]>,
) -> Vec<LexOrdering> {
    let mut valid = Vec::new();
    for ordering in orderings {
        if is_ordering_valid_for_file_groups(file_groups, ordering, schema, projection) {
            valid.push(ordering.clone());
        }
    }
    valid
}
/// Projects the config's declared output orderings onto `projected_schema` and keeps
/// only those that can be confirmed against the file groups.
///
/// - With no file-source projection, each ordering is validated without index
///   remapping.
/// - With a projection made only of plain columns, orderings are validated using
///   that projection's column indices.
/// - With a projection containing other expressions, the orderings are kept only
///   if every group has at most one file. Otherwise none are kept, and this is
///   logged at debug level.
pub(crate) fn get_projected_output_ordering(
    base_config: &FileScanConfig,
    projected_schema: &SchemaRef,
) -> Vec<LexOrdering> {
    let projected_orderings =
        project_orderings(&base_config.output_ordering, projected_schema);

    let projection_lookup = base_config
        .file_source
        .projection()
        .as_ref()
        .map(|p| ordered_column_indices_from_projection(p));

    let column_indices = match projection_lookup {
        None => None,
        Some(Some(indices)) => Some(indices),
        Some(None) => {
            if base_config.file_groups.iter().any(|g| g.len() > 1) {
                debug!(
                    "Skipping specified output orderings. \
                     Some file groups couldn't be determined to be sorted: {:?}",
                    base_config.file_groups
                );
                return vec![];
            }
            return projected_orderings;
        }
    };

    validate_orderings(
        &projected_orderings,
        projected_schema,
        &base_config.file_groups,
        column_indices.as_deref(),
    )
}