use crate::spec::{
field_idx_to_partition_idx, BinaryRow, DataField, DataType, Datum, Predicate, PredicateOperator,
};
use std::collections::HashSet;
pub(super) fn split_partition_and_data_predicates(
filter: Predicate,
fields: &[DataField],
partition_keys: &[String],
) -> (Option<Predicate>, Vec<Predicate>) {
let mapping = field_idx_to_partition_idx(fields, partition_keys);
let mut partition_predicates = Vec::new();
let mut data_predicates = Vec::new();
for conjunct in filter.split_and() {
let strict_partition_only = conjunct.references_only_mapped_fields(&mapping);
if let Some(projected) = conjunct.project_field_index_inclusive(&mapping) {
partition_predicates.push(projected);
}
if !strict_partition_only {
data_predicates.push(conjunct);
}
}
let partition_predicate = if partition_predicates.is_empty() {
None
} else {
Some(Predicate::and(partition_predicates))
};
(partition_predicate, data_predicates)
}
pub(super) fn extract_predicate_for_keys(
filter: &Predicate,
fields: &[DataField],
keys: &[String],
) -> Option<Predicate> {
if keys.is_empty() {
return None;
}
let mapping = field_idx_to_partition_idx(fields, keys);
let projected: Vec<Predicate> = filter
.clone()
.split_and()
.into_iter()
.filter_map(|conjunct| conjunct.project_field_index_inclusive(&mapping))
.collect();
if projected.is_empty() {
None
} else {
Some(Predicate::and(projected))
}
}
pub(super) fn compute_target_buckets(
bucket_predicate: &Predicate,
bucket_key_fields: &[DataField],
total_buckets: i32,
) -> Option<HashSet<i32>> {
if total_buckets <= 0 || bucket_key_fields.is_empty() {
return None;
}
let num_keys = bucket_key_fields.len();
let mut field_candidates: Vec<Option<Vec<Option<&Datum>>>> = vec![None; num_keys];
collect_eq_candidates(bucket_predicate, &mut field_candidates);
let candidates: Vec<&Vec<Option<&Datum>>> =
field_candidates.iter().filter_map(|c| c.as_ref()).collect();
if candidates.len() != num_keys {
return None;
}
let mut buckets = HashSet::new();
let mut combo: Vec<usize> = vec![0; num_keys];
loop {
let datums: Vec<(Option<&Datum>, &DataType)> = (0..num_keys)
.map(|i| {
let vals = field_candidates[i].as_ref().unwrap();
(vals[combo[i]], bucket_key_fields[i].data_type())
})
.collect();
let bucket = BinaryRow::compute_bucket_from_datums(&datums, total_buckets);
buckets.insert(bucket);
let mut carry = true;
for i in (0..num_keys).rev() {
if carry {
combo[i] += 1;
if combo[i] < field_candidates[i].as_ref().unwrap().len() {
carry = false;
} else {
combo[i] = 0;
}
}
}
if carry {
break;
}
}
if buckets.is_empty() {
None
} else {
Some(buckets)
}
}
fn collect_eq_candidates<'a>(
predicate: &'a Predicate,
field_candidates: &mut Vec<Option<Vec<Option<&'a Datum>>>>,
) {
match predicate {
Predicate::And(children) => {
for child in children {
collect_eq_candidates(child, field_candidates);
}
}
Predicate::Leaf {
index,
op,
literals,
..
} => {
if *index < field_candidates.len() {
match op {
PredicateOperator::Eq => {
if let Some(lit) = literals.first() {
field_candidates[*index] = Some(vec![Some(lit)]);
}
}
PredicateOperator::In => {
if !literals.is_empty() {
field_candidates[*index] = Some(literals.iter().map(Some).collect());
}
}
PredicateOperator::IsNull => {
field_candidates[*index] = Some(vec![None]);
}
_ => {}
}
}
}
_ => {}
}
}