use crate::db::predicate::{CompareOp, ComparePredicate, Predicate, compare_eq, compare_order};
use std::cmp::Ordering;
/// Result of examining one ordered pair of compare predicates (first, second) while
/// simplifying a conjunction. The variants tell `simplify_and_compare_constraints` how to
/// rewrite its predicate list.
#[derive(Clone)]
enum ComparePairSimplification {
/// The pair cannot be simplified; both predicates stay as they are.
NoChange,
/// The pair can never hold together; the driver gives up and returns `None`.
Contradiction,
/// The second predicate is redundant; the driver removes it and keeps the first.
KeepFirst,
/// The first predicate is redundant; the driver removes it and keeps the second.
KeepSecond,
/// The driver overwrites the first predicate with the payload and removes the second.
ReplaceFirst(ComparePredicate),
/// The driver overwrites the second predicate with the payload and removes the first.
ReplaceSecond(ComparePredicate),
}
/// Simplifies the compare clauses of a conjunction (predicates combined with AND).
///
/// Every pair of `Predicate::Compare` entries that share the same `field` and `coercion` is
/// handed to `simplify_compare_pair_for_and`. Depending on the verdict, one clause of the
/// pair is dropped (optionally after the other one was overwritten with a replacement), and
/// the pairwise scan is restarted from the beginning because indices have shifted. The
/// process stops once a full scan finds nothing left to rewrite. Entries that are not
/// `Predicate::Compare` are never touched.
///
/// Returns `None` as soon as any pair is reported as contradictory; otherwise returns
/// `Some` with the reduced list.
#[must_use]
pub(in crate::db::predicate) fn simplify_and_compare_constraints(
    mut predicates: Vec<Predicate>,
) -> Option<Vec<Predicate>> {
    'rescan: loop {
        for first in 0..predicates.len() {
            for second in first.saturating_add(1)..predicates.len() {
                let (Predicate::Compare(lhs), Predicate::Compare(rhs)) =
                    (&predicates[first], &predicates[second])
                else {
                    continue;
                };
                // Only clauses over the same field under the same coercion are comparable.
                if lhs.field != rhs.field {
                    continue;
                }
                if lhs.coercion != rhs.coercion {
                    continue;
                }

                // Translate the verdict into "which index to drop" plus an optional
                // "(index, replacement)" to write before dropping.
                let (discard, substitution) = match simplify_compare_pair_for_and(lhs, rhs) {
                    ComparePairSimplification::NoChange => continue,
                    ComparePairSimplification::Contradiction => return None,
                    ComparePairSimplification::KeepFirst => (second, None),
                    ComparePairSimplification::KeepSecond => (first, None),
                    ComparePairSimplification::ReplaceFirst(merged) => {
                        (second, Some((first, merged)))
                    }
                    ComparePairSimplification::ReplaceSecond(merged) => {
                        (first, Some((second, merged)))
                    }
                };

                // Write the replacement first: the indices are only valid before removal.
                if let Some((slot, merged)) = substitution {
                    predicates[slot] = Predicate::Compare(merged);
                }
                predicates.remove(discard);

                // The list changed, so start the pairwise scan over.
                continue 'rescan;
            }
        }

        // A complete scan produced no rewrite: the list is fully simplified.
        return Some(predicates);
    }
}
/// Chooses the pairwise rule for two compare clauses based on which sides use
/// `CompareOp::Eq`.
///
/// * both equalities: `simplify_eq_eq_pair`
/// * exactly one equality: `simplify_eq_with_constraint_pair`, always passing the equality
///   as the first argument and a flag saying whether it was the first clause of the pair
/// * no equality: `simplify_constraint_constraint_pair`
///
/// The returned verdict is always expressed relative to the original `(left, right)` order.
fn simplify_compare_pair_for_and(
    left: &ComparePredicate,
    right: &ComparePredicate,
) -> ComparePairSimplification {
    let left_is_eq = matches!(left.op, CompareOp::Eq);
    let right_is_eq = matches!(right.op, CompareOp::Eq);

    match (left_is_eq, right_is_eq) {
        (true, true) => simplify_eq_eq_pair(left, right),
        (true, false) => simplify_eq_with_constraint_pair(left, right, true),
        (false, true) => simplify_eq_with_constraint_pair(right, left, false),
        (false, false) => simplify_constraint_constraint_pair(left, right),
    }
}
/// Simplifies two equality clauses on the same field.
///
/// The values are compared with `compare_eq` under the left clause's coercion:
/// equal values make the second clause redundant (`KeepFirst`), different values can never
/// both hold (`Contradiction`), and an undecidable comparison (`None`) leaves the pair
/// untouched (`NoChange`).
fn simplify_eq_eq_pair(
    left: &ComparePredicate,
    right: &ComparePredicate,
) -> ComparePairSimplification {
    let Some(values_match) = compare_eq(&left.value, &right.value, &left.coercion) else {
        return ComparePairSimplification::NoChange;
    };

    if values_match {
        ComparePairSimplification::KeepFirst
    } else {
        ComparePairSimplification::Contradiction
    }
}
/// Simplifies an equality clause paired with another clause on the same field.
///
/// `eq` is the equality clause and `constraint` is the other one; `eq_is_first` records
/// whether `eq` was the first clause of the original pair so the verdict can be expressed
/// in the original order.
///
/// The equality value is ordered against the constraint value with `compare_order` (using
/// the equality's coercion). If the ordering is undecidable, or the constraint is not one of
/// `Gt`/`Gte`/`Lt`/`Lte`, the pair is left alone (`NoChange`). Otherwise:
///
/// * the equality value violates the range constraint: `Contradiction`
/// * the equality value satisfies it: the constraint is redundant, so the equality is kept
///   (`KeepFirst` or `KeepSecond` depending on `eq_is_first`)
fn simplify_eq_with_constraint_pair(
    eq: &ComparePredicate,
    constraint: &ComparePredicate,
    eq_is_first: bool,
) -> ComparePairSimplification {
    let Some(ordering) = compare_order(&eq.value, &constraint.value, &eq.coercion) else {
        return ComparePairSimplification::NoChange;
    };

    // `ordering` is "eq value relative to constraint value".
    let within_bound = match constraint.op {
        CompareOp::Gt => ordering.is_gt(),
        CompareOp::Gte => ordering.is_ge(),
        CompareOp::Lt => ordering.is_lt(),
        CompareOp::Lte => ordering.is_le(),
        // Non-range operators are not reasoned about here.
        CompareOp::Eq
        | CompareOp::Ne
        | CompareOp::In
        | CompareOp::NotIn
        | CompareOp::Contains
        | CompareOp::StartsWith
        | CompareOp::EndsWith => return ComparePairSimplification::NoChange,
    };

    match (within_bound, eq_is_first) {
        (false, _) => ComparePairSimplification::Contradiction,
        (true, true) => ComparePairSimplification::KeepFirst,
        (true, false) => ComparePairSimplification::KeepSecond,
    }
}
/// Simplifies a pair of clauses where neither side is an equality.
///
/// Each side is classified as a lower bound (`Gt`/`Gte`) or an upper bound (`Lt`/`Lte`) via
/// `lower_bound_inclusive` / `upper_bound_inclusive`, and the pair is routed accordingly:
///
/// * lower + lower: `simplify_two_lower_bounds`
/// * upper + upper: `simplify_two_upper_bounds`
/// * lower + upper: `simplify_lower_upper_pair`
/// * upper + lower: `simplify_lower_upper_pair` with the arguments swapped, and the verdict
///   mirrored back so it refers to the original `(left, right)` order
///
/// Any other combination (at least one side is not a range operator) yields `NoChange`.
fn simplify_constraint_constraint_pair(
    left: &ComparePredicate,
    right: &ComparePredicate,
) -> ComparePairSimplification {
    let left_is_lower = lower_bound_inclusive(left.op).is_some();
    let left_is_upper = upper_bound_inclusive(left.op).is_some();
    let right_is_lower = lower_bound_inclusive(right.op).is_some();
    let right_is_upper = upper_bound_inclusive(right.op).is_some();

    match (left_is_lower, left_is_upper, right_is_lower, right_is_upper) {
        (true, _, true, _) => simplify_two_lower_bounds(left, right),
        (_, true, _, true) => simplify_two_upper_bounds(left, right),
        (true, _, _, true) => simplify_lower_upper_pair(left, right),
        (_, true, true, _) => {
            // The helper saw (right, left); flip first/second in its answer.
            match simplify_lower_upper_pair(right, left) {
                ComparePairSimplification::KeepFirst => ComparePairSimplification::KeepSecond,
                ComparePairSimplification::KeepSecond => ComparePairSimplification::KeepFirst,
                ComparePairSimplification::ReplaceFirst(merged) => {
                    ComparePairSimplification::ReplaceSecond(merged)
                }
                ComparePairSimplification::ReplaceSecond(merged) => {
                    ComparePairSimplification::ReplaceFirst(merged)
                }
                symmetric @ (ComparePairSimplification::NoChange
                | ComparePairSimplification::Contradiction) => symmetric,
            }
        }
        _ => ComparePairSimplification::NoChange,
    }
}
/// Keeps the tighter of two lower bounds (`Gt`/`Gte`) on the same field.
///
/// The bound with the larger value wins. When both values are equal, an exclusive bound
/// (`Gt`) beats an inclusive one (`Gte`); if both have the same inclusivity the first clause
/// is kept. Returns `NoChange` when the values cannot be ordered by `compare_order` or when
/// either operator is not a lower bound.
fn simplify_two_lower_bounds(
    left: &ComparePredicate,
    right: &ComparePredicate,
) -> ComparePairSimplification {
    let Some(ordering) = compare_order(&left.value, &right.value, &left.coercion) else {
        return ComparePairSimplification::NoChange;
    };
    let (Some(left_inclusive), Some(right_inclusive)) = (
        lower_bound_inclusive(left.op),
        lower_bound_inclusive(right.op),
    ) else {
        return ComparePairSimplification::NoChange;
    };

    match (ordering, left_inclusive, right_inclusive) {
        // The right threshold is higher, or equal but exclusive while the left is inclusive.
        (Ordering::Less, _, _) | (Ordering::Equal, true, false) => {
            ComparePairSimplification::KeepSecond
        }
        // The left threshold is higher, or the tie does not favour the right clause.
        (Ordering::Greater | Ordering::Equal, _, _) => ComparePairSimplification::KeepFirst,
    }
}
/// Keeps the tighter of two upper bounds (`Lt`/`Lte`) on the same field.
///
/// The bound with the smaller value wins. When both values are equal, an exclusive bound
/// (`Lt`) beats an inclusive one (`Lte`); if both have the same inclusivity the first clause
/// is kept. Returns `NoChange` when the values cannot be ordered by `compare_order` or when
/// either operator is not an upper bound.
fn simplify_two_upper_bounds(
    left: &ComparePredicate,
    right: &ComparePredicate,
) -> ComparePairSimplification {
    let Some(ordering) = compare_order(&left.value, &right.value, &left.coercion) else {
        return ComparePairSimplification::NoChange;
    };
    let (Some(left_inclusive), Some(right_inclusive)) = (
        upper_bound_inclusive(left.op),
        upper_bound_inclusive(right.op),
    ) else {
        return ComparePairSimplification::NoChange;
    };

    match (ordering, left_inclusive, right_inclusive) {
        // The right threshold is lower, or equal but exclusive while the left is inclusive.
        (Ordering::Greater, _, _) | (Ordering::Equal, true, false) => {
            ComparePairSimplification::KeepSecond
        }
        // The left threshold is lower, or the tie does not favour the right clause.
        (Ordering::Less | Ordering::Equal, _, _) => ComparePairSimplification::KeepFirst,
    }
}
/// Simplifies a lower bound (`Gt`/`Gte`) paired with an upper bound (`Lt`/`Lte`).
///
/// `lower` must be the lower-bound clause and `upper` the upper-bound clause; the verdict
/// treats `lower` as "first" and `upper` as "second".
///
/// * lower value below upper value: a genuine range, nothing to simplify (`NoChange`)
/// * lower value above upper value: empty range (`Contradiction`)
/// * equal values, both bounds inclusive: the range is a single point, so the first clause
///   is replaced by an equality on that value (`ReplaceFirst`)
/// * equal values, at least one bound exclusive: empty range (`Contradiction`)
///
/// Returns `NoChange` when the values cannot be ordered by `compare_order` or when an
/// operator is not of the expected bound kind.
fn simplify_lower_upper_pair(
    lower: &ComparePredicate,
    upper: &ComparePredicate,
) -> ComparePairSimplification {
    let Some(ordering) = compare_order(&lower.value, &upper.value, &lower.coercion) else {
        return ComparePairSimplification::NoChange;
    };
    let Some(lower_inclusive) = lower_bound_inclusive(lower.op) else {
        return ComparePairSimplification::NoChange;
    };
    let Some(upper_inclusive) = upper_bound_inclusive(upper.op) else {
        return ComparePairSimplification::NoChange;
    };

    match ordering {
        Ordering::Less => ComparePairSimplification::NoChange,
        // `x >= v AND x <= v` pins the field to exactly `v`.
        Ordering::Equal if lower_inclusive && upper_inclusive => {
            ComparePairSimplification::ReplaceFirst(ComparePredicate {
                field: lower.field.clone(),
                op: CompareOp::Eq,
                value: lower.value.clone(),
                coercion: lower.coercion.clone(),
            })
        }
        Ordering::Equal | Ordering::Greater => ComparePairSimplification::Contradiction,
    }
}
/// Classifies `op` as a lower bound.
///
/// Returns `Some(true)` for `Gte` (inclusive), `Some(false)` for `Gt` (exclusive), and
/// `None` for every operator that is not a lower bound.
const fn lower_bound_inclusive(op: CompareOp) -> Option<bool> {
    let inclusive = match op {
        CompareOp::Gte => true,
        CompareOp::Gt => false,
        CompareOp::Lt
        | CompareOp::Lte
        | CompareOp::Eq
        | CompareOp::Ne
        | CompareOp::In
        | CompareOp::NotIn
        | CompareOp::Contains
        | CompareOp::StartsWith
        | CompareOp::EndsWith => return None,
    };
    Some(inclusive)
}
/// Classifies `op` as an upper bound.
///
/// Returns `Some(true)` for `Lte` (inclusive), `Some(false)` for `Lt` (exclusive), and
/// `None` for every operator that is not an upper bound.
const fn upper_bound_inclusive(op: CompareOp) -> Option<bool> {
    let inclusive = match op {
        CompareOp::Lte => true,
        CompareOp::Lt => false,
        CompareOp::Gt
        | CompareOp::Gte
        | CompareOp::Eq
        | CompareOp::Ne
        | CompareOp::In
        | CompareOp::NotIn
        | CompareOp::Contains
        | CompareOp::StartsWith
        | CompareOp::EndsWith => return None,
    };
    Some(inclusive)
}