use crate::{
db::{
index::{
IndexCompareOp, IndexLiteral, IndexPredicateProgram, TextPrefixBoundMode,
predicate::literal_index_component_bytes, starts_with_component_bounds,
},
predicate::{
CompareOp, ExecutableComparePredicate, ExecutablePredicate, IndexCompileTarget,
IndexPredicateCapability, PredicateCapabilityContext, classify_index_compare_component,
classify_index_compare_target, classify_predicate_capabilities,
classify_predicate_capabilities_for_targets, lower_index_compare_literal_for_target,
lower_index_starts_with_prefix_for_target,
},
},
value::Value,
};
use std::ops::Bound;
/// Policy selecting how strictly a predicate must be index-compilable.
///
/// Consumed by `compile_index_program` and `compile_index_program_for_targets`.
/// `Debug`, `Eq` and `PartialEq` are derived so the policy can be compared,
/// asserted on, and logged.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum IndexCompilePolicy {
    /// Compile whichever children of a top-level `And` are fully indexable and
    /// skip the rest; non-`And` predicates are compiled as a whole.
    ConservativeSubset,
    /// Compile only when the whole predicate classifies as fully indexable;
    /// otherwise no program is produced.
    StrictAllOrNone,
}
/// Result of compiling one side of a component range bound.
enum ComponentBoundProgram {
/// The bound was `Bound::Unbounded`; there is no comparison to emit for that side.
Unbounded,
/// The bound compiled into an index predicate program.
Program(IndexPredicateProgram),
}
impl ComponentBoundProgram {
    /// Consumes the bound outcome and returns the compiled program, if any.
    ///
    /// `Unbounded` maps to `None`; `Program` yields its payload.
    fn into_program(self) -> Option<IndexPredicateProgram> {
        if let Self::Program(program) = self {
            Some(program)
        } else {
            None
        }
    }
}
/// Compiles `predicate` into an index predicate program for `index_slots`.
///
/// Behavior depends on `mode`:
/// - `StrictAllOrNone`: the predicate is classified first and compiled only
///   when it is `FullyIndexable`; any other classification yields `None`.
/// - `ConservativeSubset`: delegates to `compile_index_program_from_resolved`.
///
/// Returns `None` when no program can be produced.
#[must_use]
pub(crate) fn compile_index_program(
    predicate: &ExecutablePredicate,
    index_slots: &[usize],
    mode: IndexCompilePolicy,
) -> Option<IndexPredicateProgram> {
    match mode {
        IndexCompilePolicy::StrictAllOrNone => {
            let context = PredicateCapabilityContext::index_compile(index_slots);
            let capabilities = classify_predicate_capabilities(predicate, context);
            match capabilities.index() {
                // Anything short of full indexability is rejected outright.
                IndexPredicateCapability::PartiallyIndexable
                | IndexPredicateCapability::RequiresFullScan => None,
                IndexPredicateCapability::FullyIndexable => {
                    compile_index_program_from_resolved_full(predicate, index_slots)
                }
            }
        }
        IndexCompilePolicy::ConservativeSubset => {
            compile_index_program_from_resolved(predicate, index_slots)
        }
    }
}
/// Compiles `predicate` into an index predicate program against explicit
/// `compile_targets`.
///
/// Behavior depends on `mode`:
/// - `StrictAllOrNone`: the predicate is classified against the targets and
///   compiled only when it is `FullyIndexable`; otherwise `None`.
/// - `ConservativeSubset`: delegates to
///   `compile_index_program_from_resolved_for_targets`.
///
/// Returns `None` when no program can be produced.
#[must_use]
pub(crate) fn compile_index_program_for_targets(
    predicate: &ExecutablePredicate,
    compile_targets: &[IndexCompileTarget],
    mode: IndexCompilePolicy,
) -> Option<IndexPredicateProgram> {
    match mode {
        IndexCompilePolicy::StrictAllOrNone => {
            let capabilities =
                classify_predicate_capabilities_for_targets(predicate, compile_targets);
            match capabilities.index() {
                // Anything short of full indexability is rejected outright.
                IndexPredicateCapability::PartiallyIndexable
                | IndexPredicateCapability::RequiresFullScan => None,
                IndexPredicateCapability::FullyIndexable => {
                    compile_index_program_from_resolved_full_for_targets(
                        predicate,
                        compile_targets,
                    )
                }
            }
        }
        IndexCompilePolicy::ConservativeSubset => {
            compile_index_program_from_resolved_for_targets(predicate, compile_targets)
        }
    }
}
/// Maps a predicate-level comparison operator onto its index-level operator.
///
/// Returns `None` for `Contains`, `StartsWith` and `EndsWith`, which have no
/// direct `IndexCompareOp` counterpart in this mapping.
const fn index_compare_op(op: CompareOp) -> Option<IndexCompareOp> {
    let mapped = match op {
        // No one-to-one index operator exists for these; bail out early.
        CompareOp::Contains | CompareOp::StartsWith | CompareOp::EndsWith => return None,
        CompareOp::Eq => IndexCompareOp::Eq,
        CompareOp::Ne => IndexCompareOp::Ne,
        CompareOp::Lt => IndexCompareOp::Lt,
        CompareOp::Lte => IndexCompareOp::Lte,
        CompareOp::Gt => IndexCompareOp::Gt,
        CompareOp::Gte => IndexCompareOp::Gte,
        CompareOp::In => IndexCompareOp::In,
        CompareOp::NotIn => IndexCompareOp::NotIn,
    };
    Some(mapped)
}
/// Conservative compilation entry point for slot-based index compilation.
///
/// A top-level `And` is compiled child-by-child, keeping only the indexable
/// subset; every other predicate shape must compile in full or yields `None`.
fn compile_index_program_from_resolved(
    predicate: &ExecutablePredicate,
    index_slots: &[usize],
) -> Option<IndexPredicateProgram> {
    match predicate {
        ExecutablePredicate::And(children) => {
            compile_index_program_and_subset(children, index_slots)
        }
        other => compile_index_program_from_resolved_full(other, index_slots),
    }
}
/// Conservative compilation entry point for target-based index compilation.
///
/// A top-level `And` is compiled child-by-child, keeping only the indexable
/// subset; every other predicate shape must compile in full or yields `None`.
fn compile_index_program_from_resolved_for_targets(
    predicate: &ExecutablePredicate,
    compile_targets: &[IndexCompileTarget],
) -> Option<IndexPredicateProgram> {
    match predicate {
        ExecutablePredicate::And(children) => {
            compile_index_program_and_subset_for_targets(children, compile_targets)
        }
        other => compile_index_program_from_resolved_full_for_targets(other, compile_targets),
    }
}
/// Compiles the indexable subset of the children of an `And` predicate.
///
/// - Nested `And` children are handled recursively.
/// - Any other child is compiled only when it classifies as `FullyIndexable`.
/// - Children that produce no program are skipped.
/// - A `True` program is dropped; a `False` program makes the whole result
///   `False` immediately.
///
/// Returns `None` when no child program survives, the single surviving program
/// when there is exactly one, and an `And` of the survivors otherwise.
fn compile_index_program_and_subset(
    children: &[ExecutablePredicate],
    index_slots: &[usize],
) -> Option<IndexPredicateProgram> {
    let mut kept = Vec::new();

    for child in children {
        let compiled_child = match child {
            ExecutablePredicate::And(nested) => {
                compile_index_program_and_subset(nested, index_slots)
            }
            leaf => {
                let context = PredicateCapabilityContext::index_compile(index_slots);
                let capabilities = classify_predicate_capabilities(leaf, context);
                match capabilities.index() {
                    IndexPredicateCapability::PartiallyIndexable
                    | IndexPredicateCapability::RequiresFullScan => None,
                    IndexPredicateCapability::FullyIndexable => {
                        compile_index_program_from_resolved_full(leaf, index_slots)
                    }
                }
            }
        };

        match compiled_child {
            // Uncompilable children and trivially-true programs add nothing.
            None | Some(IndexPredicateProgram::True) => {}
            // A false conjunct decides the whole conjunction.
            Some(IndexPredicateProgram::False) => return Some(IndexPredicateProgram::False),
            Some(program) => kept.push(program),
        }
    }

    if kept.len() > 1 {
        Some(IndexPredicateProgram::And(kept))
    } else {
        // Zero survivors -> `None`; exactly one -> that program unwrapped.
        kept.pop()
    }
}
/// Compiles the indexable subset of the children of an `And` predicate against
/// explicit compile targets.
///
/// - Nested `And` children are handled recursively.
/// - Any other child is compiled only when it classifies as `FullyIndexable`.
/// - Children that produce no program are skipped.
/// - A `True` program is dropped; a `False` program makes the whole result
///   `False` immediately.
///
/// Returns `None` when no child program survives, the single surviving program
/// when there is exactly one, and an `And` of the survivors otherwise.
fn compile_index_program_and_subset_for_targets(
    children: &[ExecutablePredicate],
    compile_targets: &[IndexCompileTarget],
) -> Option<IndexPredicateProgram> {
    let mut kept = Vec::new();

    for child in children {
        let compiled_child = match child {
            ExecutablePredicate::And(nested) => {
                compile_index_program_and_subset_for_targets(nested, compile_targets)
            }
            leaf => {
                let capabilities =
                    classify_predicate_capabilities_for_targets(leaf, compile_targets);
                match capabilities.index() {
                    IndexPredicateCapability::PartiallyIndexable
                    | IndexPredicateCapability::RequiresFullScan => None,
                    IndexPredicateCapability::FullyIndexable => {
                        compile_index_program_from_resolved_full_for_targets(
                            leaf,
                            compile_targets,
                        )
                    }
                }
            }
        };

        match compiled_child {
            // Uncompilable children and trivially-true programs add nothing.
            None | Some(IndexPredicateProgram::True) => {}
            // A false conjunct decides the whole conjunction.
            Some(IndexPredicateProgram::False) => return Some(IndexPredicateProgram::False),
            Some(program) => kept.push(program),
        }
    }

    if kept.len() > 1 {
        Some(IndexPredicateProgram::And(kept))
    } else {
        // Zero survivors -> `None`; exactly one -> that program unwrapped.
        kept.pop()
    }
}
/// Compiles an entire predicate tree into an index predicate program.
///
/// Every node must compile: if any child of an `And`/`Or`, or the operand of a
/// `Not`, fails to compile, the whole result is `None`. Null, missing,
/// emptiness and text-contains checks are never compiled here.
fn compile_index_program_from_resolved_full(
    predicate: &ExecutablePredicate,
    index_slots: &[usize],
) -> Option<IndexPredicateProgram> {
    match predicate {
        ExecutablePredicate::True => Some(IndexPredicateProgram::True),
        ExecutablePredicate::False => Some(IndexPredicateProgram::False),
        ExecutablePredicate::And(children) => {
            let mut programs = Vec::with_capacity(children.len());
            for child in children.iter() {
                // `?` aborts on the first child that cannot be compiled.
                programs.push(compile_index_program_from_resolved_full(child, index_slots)?);
            }
            Some(IndexPredicateProgram::And(programs))
        }
        ExecutablePredicate::Or(children) => {
            let mut programs = Vec::with_capacity(children.len());
            for child in children.iter() {
                programs.push(compile_index_program_from_resolved_full(child, index_slots)?);
            }
            Some(IndexPredicateProgram::Or(programs))
        }
        ExecutablePredicate::Not(inner) => {
            let negated = compile_index_program_from_resolved_full(inner, index_slots)?;
            Some(IndexPredicateProgram::Not(Box::new(negated)))
        }
        ExecutablePredicate::Compare(cmp) => compile_compare_index_node(cmp, index_slots),
        // These predicate kinds have no index program form in this compiler.
        ExecutablePredicate::IsNull { .. }
        | ExecutablePredicate::IsNotNull { .. }
        | ExecutablePredicate::IsMissing { .. }
        | ExecutablePredicate::IsEmpty { .. }
        | ExecutablePredicate::IsNotEmpty { .. }
        | ExecutablePredicate::TextContains { .. }
        | ExecutablePredicate::TextContainsCi { .. } => None,
    }
}
/// Compiles an entire predicate tree into an index predicate program against
/// explicit compile targets.
///
/// Every node must compile: if any child of an `And`/`Or`, or the operand of a
/// `Not`, fails to compile, the whole result is `None`. Null, missing,
/// emptiness and text-contains checks are never compiled here.
fn compile_index_program_from_resolved_full_for_targets(
    predicate: &ExecutablePredicate,
    compile_targets: &[IndexCompileTarget],
) -> Option<IndexPredicateProgram> {
    match predicate {
        ExecutablePredicate::True => Some(IndexPredicateProgram::True),
        ExecutablePredicate::False => Some(IndexPredicateProgram::False),
        ExecutablePredicate::And(children) => children
            .iter()
            .map(|child| {
                compile_index_program_from_resolved_full_for_targets(child, compile_targets)
            })
            // Collecting into `Option<Vec<_>>` stops at the first `None`.
            .collect::<Option<Vec<_>>>()
            .map(IndexPredicateProgram::And),
        ExecutablePredicate::Or(children) => children
            .iter()
            .map(|child| {
                compile_index_program_from_resolved_full_for_targets(child, compile_targets)
            })
            .collect::<Option<Vec<_>>>()
            .map(IndexPredicateProgram::Or),
        ExecutablePredicate::Not(inner) => {
            compile_index_program_from_resolved_full_for_targets(inner, compile_targets)
                .map(|negated| IndexPredicateProgram::Not(Box::new(negated)))
        }
        ExecutablePredicate::Compare(cmp) => {
            compile_compare_index_node_for_targets(cmp, compile_targets)
        }
        // These predicate kinds have no index program form in this compiler.
        ExecutablePredicate::IsNull { .. }
        | ExecutablePredicate::IsNotNull { .. }
        | ExecutablePredicate::IsMissing { .. }
        | ExecutablePredicate::IsEmpty { .. }
        | ExecutablePredicate::IsNotEmpty { .. }
        | ExecutablePredicate::TextContains { .. }
        | ExecutablePredicate::TextContainsCi { .. } => None,
    }
}
/// Compiles a single compare predicate into an index program node using
/// slot-based component resolution.
///
/// Returns `None` when the compare does not resolve to an index component, has
/// no right-hand literal, or uses an operator/literal combination that cannot
/// be encoded:
/// - equality/ordering operators need one encodable literal;
/// - membership operators need a non-empty `Value::List` whose items all encode;
/// - `StartsWith` is delegated to `compile_starts_with_index_node`;
/// - every other operator is rejected.
fn compile_compare_index_node(
    cmp: &ExecutableComparePredicate,
    index_slots: &[usize],
) -> Option<IndexPredicateProgram> {
    let component_index = classify_index_compare_component(cmp, index_slots)?;
    let literal_value = cmp.right_literal()?;

    if cmp.op.is_equality_family() || cmp.op.is_ordering_family() {
        let encoded = literal_index_component_bytes(literal_value)?;
        return Some(IndexPredicateProgram::Compare {
            component_index,
            op: index_compare_op(cmp.op)?,
            literal: IndexLiteral::One(encoded),
        });
    }

    if cmp.op.is_membership_family() {
        // Membership needs a list literal with at least one element.
        let items = match literal_value {
            Value::List(items) if !items.is_empty() => items,
            _ => return None,
        };
        let mut encoded = Vec::with_capacity(items.len());
        for item in items.iter() {
            encoded.push(literal_index_component_bytes(item)?);
        }
        return Some(IndexPredicateProgram::Compare {
            component_index,
            op: index_compare_op(cmp.op)?,
            literal: IndexLiteral::Many(encoded),
        });
    }

    if matches!(cmp.op, CompareOp::StartsWith) {
        return compile_starts_with_index_node(component_index, literal_value);
    }

    None
}
/// Compiles a single compare predicate into an index program node against
/// explicit compile targets.
///
/// Literals are first lowered for the resolved target (using the compare's
/// coercion id) and then encoded. Returns `None` when the compare does not
/// resolve to a target, has no right-hand literal, or any lowering/encoding
/// step fails:
/// - equality/ordering operators need one lowerable, encodable literal;
/// - membership operators need a `Value::List` that lowers and encodes to a
///   non-empty set of literals;
/// - `StartsWith` is delegated to `compile_starts_with_index_node_for_target`;
/// - every other operator is rejected.
fn compile_compare_index_node_for_targets(
    cmp: &ExecutableComparePredicate,
    compile_targets: &[IndexCompileTarget],
) -> Option<IndexPredicateProgram> {
    let target = classify_index_compare_target(cmp, compile_targets)?;
    let literal_value = cmp.right_literal()?;

    if cmp.op.is_equality_family() || cmp.op.is_ordering_family() {
        let lowered =
            lower_index_compare_literal_for_target(target, literal_value, cmp.coercion.id)?;
        let encoded = literal_index_component_bytes(&lowered)?;
        return Some(IndexPredicateProgram::Compare {
            component_index: target.component_index,
            op: index_compare_op(cmp.op)?,
            literal: IndexLiteral::One(encoded),
        });
    }

    if cmp.op.is_membership_family() {
        let Value::List(values) = literal_value else {
            return None;
        };
        let mut encoded = Vec::with_capacity(values.len());
        for value in values.iter() {
            let lowered = lower_index_compare_literal_for_target(target, value, cmp.coercion.id)?;
            encoded.push(literal_index_component_bytes(&lowered)?);
        }
        // An empty membership list is checked only after lowering, and is rejected.
        if encoded.is_empty() {
            return None;
        }
        return Some(IndexPredicateProgram::Compare {
            component_index: target.component_index,
            op: index_compare_op(cmp.op)?,
            literal: IndexLiteral::Many(encoded),
        });
    }

    if matches!(cmp.op, CompareOp::StartsWith) {
        return compile_starts_with_index_node_for_target(literal_value, cmp.coercion.id, target);
    }

    None
}
/// Compiles a `StartsWith` comparison on `component_index` into prefix bounds.
///
/// Only a non-empty `Value::Text` prefix is accepted; any other value, or an
/// empty prefix, yields `None`.
fn compile_starts_with_index_node(
    component_index: usize,
    value: &Value,
) -> Option<IndexPredicateProgram> {
    match value {
        Value::Text(prefix) if !prefix.is_empty() => {
            compile_text_prefix_bounds_for_component(component_index, prefix)
        }
        _ => None,
    }
}
/// Compiles a `StartsWith` comparison for an explicit compile target.
///
/// The prefix is lowered for `target` under `coercion`; if lowering fails the
/// result is `None`, otherwise prefix bounds are compiled for the target's
/// component index.
fn compile_starts_with_index_node_for_target(
    value: &Value,
    coercion: crate::db::predicate::CoercionId,
    target: IndexCompileTarget,
) -> Option<IndexPredicateProgram> {
    lower_index_starts_with_prefix_for_target(target, value, coercion).and_then(|lowered| {
        compile_text_prefix_bounds_for_component(target.component_index, &lowered)
    })
}
/// Compiles a text prefix into range comparisons on one index component.
///
/// The prefix is converted into a (lower, upper) bound pair using
/// `TextPrefixBoundMode::Strict`, and each side is compiled separately.
/// Returns `None` if bounds cannot be derived, if either bound fails to
/// encode, or if both sides are unbounded. A single bounded side is returned
/// as-is; two bounded sides are combined with `And`.
fn compile_text_prefix_bounds_for_component(
    component_index: usize,
    prefix: &str,
) -> Option<IndexPredicateProgram> {
    let (lower_bound, upper_bound) =
        starts_with_component_bounds(prefix, TextPrefixBoundMode::Strict)?;
    let lower_program =
        compile_component_bound(component_index, &lower_bound, true)?.into_program();
    let upper_program =
        compile_component_bound(component_index, &upper_bound, false)?.into_program();

    match (lower_program, upper_program) {
        (Some(lower), Some(upper)) => Some(IndexPredicateProgram::And(vec![lower, upper])),
        // At most one side is present here: take whichever exists, if any.
        (lower, upper) => lower.or(upper),
    }
}
/// Compiles one side of a range bound into a comparison on `component_index`.
///
/// `lower` selects which side is being compiled:
/// - lower side: `Included` -> `Gte`, `Excluded` -> `Gt`;
/// - upper side: `Included` -> `Lte`, `Excluded` -> `Lt`.
///
/// `Bound::Unbounded` yields `ComponentBoundProgram::Unbounded` on either
/// side. Returns `None` only when the bound value cannot be encoded as index
/// component bytes.
fn compile_component_bound(
    component_index: usize,
    bound: &Bound<Value>,
    lower: bool,
) -> Option<ComponentBoundProgram> {
    let (value, op) = match bound {
        Bound::Unbounded => return Some(ComponentBoundProgram::Unbounded),
        Bound::Included(value) => {
            let op = if lower {
                IndexCompareOp::Gte
            } else {
                IndexCompareOp::Lte
            };
            (value, op)
        }
        Bound::Excluded(value) => {
            let op = if lower {
                IndexCompareOp::Gt
            } else {
                IndexCompareOp::Lt
            };
            (value, op)
        }
    };

    literal_index_component_bytes(value).map(|literal| {
        ComponentBoundProgram::Program(IndexPredicateProgram::Compare {
            component_index,
            op,
            literal: IndexLiteral::One(literal),
        })
    })
}