use crate::arrow::schema_evolution::create_index_mapping;
pub(crate) use crate::predicate_stats::{predicates_may_match_with_schema, StatsAccessor};
use crate::spec::{DataField, Predicate, PredicateOperator};
/// Keeps only the predicates that reader-side pruning can evaluate.
///
/// Each predicate is tested with `predicate_supported_for_reader_pruning`;
/// those that pass are returned in their original relative order and the rest
/// are dropped.
pub(crate) fn reader_pruning_predicates(data_predicates: Vec<Predicate>) -> Vec<Predicate> {
    // The vector is already owned, so filter it in place instead of building a new one.
    let mut supported = data_predicates;
    supported.retain(predicate_supported_for_reader_pruning);
    supported
}
/// Rewrites every predicate so that its leaf indexes address `file_fields`
/// instead of `table_fields`.
///
/// The table-to-file mapping is computed once via `build_field_mapping` and
/// then applied to each predicate with `remap_predicate`. The result has one
/// entry per input predicate, in the same order.
pub(crate) fn remap_predicates_to_file(
    predicates: &[Predicate],
    table_fields: &[DataField],
    file_fields: &[DataField],
) -> Vec<Predicate> {
    let table_to_file = build_field_mapping(table_fields, file_fields);

    let mut remapped = Vec::with_capacity(predicates.len());
    for predicate in predicates {
        remapped.push(remap_predicate(predicate, &table_to_file));
    }
    remapped
}
/// Recursively rewrites `predicate` using `mapping`, simplifying constant
/// sub-expressions along the way.
///
/// `mapping` is indexed by the leaf's current `index`; a `Some(i)` entry gives
/// the replacement index. When the entry is `None` or the index is out of
/// range, the leaf is replaced by a constant: `IsNull` becomes `AlwaysTrue`
/// and every other operator becomes `AlwaysFalse`.
/// NOTE(review): presumably a `None` entry means the column does not exist in
/// the file and therefore reads as all-null — confirm against
/// `create_index_mapping`.
///
/// Compound predicates are folded after their children are remapped:
/// * `And` is `AlwaysFalse` if any child is; `AlwaysTrue` children are dropped.
/// * `Or` is `AlwaysTrue` if any child is; `AlwaysFalse` children are dropped.
/// * `And`/`Or` left with zero children collapse to their neutral constant,
///   and with exactly one child collapse to that child.
/// * `Not` of a constant is the opposite constant.
fn remap_predicate(predicate: &Predicate, mapping: &[Option<usize>]) -> Predicate {
    match predicate {
        Predicate::AlwaysTrue => Predicate::AlwaysTrue,
        Predicate::AlwaysFalse => Predicate::AlwaysFalse,
        Predicate::Leaf {
            column,
            index,
            data_type,
            op,
            literals,
        } => {
            if let Some(&Some(file_index)) = mapping.get(*index) {
                Predicate::Leaf {
                    column: column.clone(),
                    index: file_index,
                    data_type: data_type.clone(),
                    op: *op,
                    literals: literals.clone(),
                }
            } else if matches!(op, PredicateOperator::IsNull) {
                // No target index for this leaf: only IS NULL is kept as satisfiable.
                Predicate::AlwaysTrue
            } else {
                Predicate::AlwaysFalse
            }
        }
        Predicate::Not(inner) => match remap_predicate(inner, mapping) {
            Predicate::AlwaysTrue => Predicate::AlwaysFalse,
            Predicate::AlwaysFalse => Predicate::AlwaysTrue,
            negated => Predicate::Not(Box::new(negated)),
        },
        Predicate::And(children) => {
            let mut conjuncts = Vec::new();
            for child in children.iter() {
                match remap_predicate(child, mapping) {
                    // A single false conjunct decides the whole conjunction.
                    Predicate::AlwaysFalse => return Predicate::AlwaysFalse,
                    // True conjuncts contribute nothing.
                    Predicate::AlwaysTrue => {}
                    other => conjuncts.push(other),
                }
            }
            if conjuncts.len() > 1 {
                Predicate::and(conjuncts)
            } else {
                // Zero survivors -> neutral element; one survivor -> that predicate.
                conjuncts.pop().unwrap_or(Predicate::AlwaysTrue)
            }
        }
        Predicate::Or(children) => {
            let mut disjuncts = Vec::new();
            for child in children.iter() {
                match remap_predicate(child, mapping) {
                    // A single true disjunct decides the whole disjunction.
                    Predicate::AlwaysTrue => return Predicate::AlwaysTrue,
                    // False disjuncts contribute nothing.
                    Predicate::AlwaysFalse => {}
                    other => disjuncts.push(other),
                }
            }
            if disjuncts.len() > 1 {
                Predicate::or(disjuncts)
            } else {
                // Zero survivors -> neutral element; one survivor -> that predicate.
                disjuncts.pop().unwrap_or(Predicate::AlwaysFalse)
            }
        }
    }
}
/// Builds a mapping with one `Option<usize>` slot per position, derived from
/// `create_index_mapping(table_fields, file_fields)`.
///
/// The raw result is passed through `normalize_field_mapping`, which converts
/// entries that do not fit in a `usize` (e.g. negative ones) to `None` and
/// substitutes an identity mapping of length `table_fields.len()` when no raw
/// mapping is returned.
pub(crate) fn build_field_mapping(
    table_fields: &[DataField],
    file_fields: &[DataField],
) -> Vec<Option<usize>> {
    let raw_mapping = create_index_mapping(table_fields, file_fields);
    normalize_field_mapping(raw_mapping, table_fields.len())
}
/// Reports whether `predicate` is accepted for reader-side pruning.
///
/// Accepted: `AlwaysFalse`, and leaf predicates whose operator is one of
/// `IsNull`, `IsNotNull`, `Eq`, `NotEq`, `Lt`, `LtEq`, `Gt`, `GtEq`, `In`,
/// `NotIn`. `AlwaysTrue` and the compound forms (`And`, `Or`, `Not`) are
/// rejected, as is a leaf with any other operator.
fn predicate_supported_for_reader_pruning(predicate: &Predicate) -> bool {
    // Settle the non-leaf variants first; only leaves need an operator check.
    let op = match predicate {
        Predicate::Leaf { op, .. } => op,
        Predicate::AlwaysFalse => return true,
        Predicate::AlwaysTrue | Predicate::And(_) | Predicate::Or(_) | Predicate::Not(_) => {
            return false
        }
    };

    matches!(
        op,
        PredicateOperator::IsNull
            | PredicateOperator::IsNotNull
            | PredicateOperator::Eq
            | PredicateOperator::NotEq
            | PredicateOperator::Lt
            | PredicateOperator::LtEq
            | PredicateOperator::Gt
            | PredicateOperator::GtEq
            | PredicateOperator::In
            | PredicateOperator::NotIn
    )
}
/// Returns the mapping `[Some(0), Some(1), ..., Some(num_fields - 1)]`, i.e.
/// every position maps to itself. An empty vector is returned for zero fields.
fn identity_field_mapping(num_fields: usize) -> Vec<Option<usize>> {
    let mut mapping = Vec::with_capacity(num_fields);
    for position in 0..num_fields {
        mapping.push(Some(position));
    }
    mapping
}
/// Converts an optional raw `i32` index mapping into `Option<usize>` slots.
///
/// * `Some(raw)`: each entry is converted with `usize::try_from`; entries that
///   do not fit (such as negative values) become `None`. The output has the
///   same length and order as `raw`; `num_fields` is not consulted.
/// * `None`: an identity mapping over `num_fields` positions is returned.
fn normalize_field_mapping(mapping: Option<Vec<i32>>, num_fields: usize) -> Vec<Option<usize>> {
    match mapping {
        None => identity_field_mapping(num_fields),
        Some(raw_indices) => {
            let mut normalized = Vec::with_capacity(raw_indices.len());
            for raw in raw_indices {
                normalized.push(usize::try_from(raw).ok());
            }
            normalized
        }
    }
}