use crate::{
db::{
index::{
IndexCompareOp, IndexLiteral, IndexPredicateProgram, next_text_prefix,
predicate::literal_index_component_bytes,
},
predicate::{
CompareOp, ExecutableComparePredicate, ExecutablePredicate, IndexCompileTarget,
IndexPredicateCapability, PredicateCapabilityContext, classify_index_compare_component,
classify_index_compare_target, classify_predicate_capabilities,
classify_predicate_capabilities_for_targets, lower_index_compare_literal_for_target,
lower_index_starts_with_prefix_for_target,
},
},
value::Value,
};
#[derive(Clone, Copy)]
pub(crate) enum IndexCompilePolicy {
ConservativeSubset,
StrictAllOrNone,
}
#[must_use]
pub(crate) fn compile_index_program(
predicate: &ExecutablePredicate,
index_slots: &[usize],
mode: IndexCompilePolicy,
) -> Option<IndexPredicateProgram> {
match mode {
IndexCompilePolicy::ConservativeSubset => {
compile_index_program_from_resolved(predicate, index_slots)
}
IndexCompilePolicy::StrictAllOrNone => {
let capabilities = classify_predicate_capabilities(
predicate,
PredicateCapabilityContext::index_compile(index_slots),
);
match capabilities.index() {
IndexPredicateCapability::FullyIndexable => {
compile_index_program_from_resolved_full(predicate, index_slots)
}
IndexPredicateCapability::PartiallyIndexable
| IndexPredicateCapability::RequiresFullScan => None,
}
}
}
}
#[must_use]
pub(crate) fn compile_index_program_for_targets(
predicate: &ExecutablePredicate,
compile_targets: &[IndexCompileTarget],
mode: IndexCompilePolicy,
) -> Option<IndexPredicateProgram> {
match mode {
IndexCompilePolicy::ConservativeSubset => {
compile_index_program_from_resolved_for_targets(predicate, compile_targets)
}
IndexCompilePolicy::StrictAllOrNone => {
let capabilities =
classify_predicate_capabilities_for_targets(predicate, compile_targets);
match capabilities.index() {
IndexPredicateCapability::FullyIndexable => {
compile_index_program_from_resolved_full_for_targets(predicate, compile_targets)
}
IndexPredicateCapability::PartiallyIndexable
| IndexPredicateCapability::RequiresFullScan => None,
}
}
}
}
fn compile_index_program_from_resolved(
predicate: &ExecutablePredicate,
index_slots: &[usize],
) -> Option<IndexPredicateProgram> {
if let ExecutablePredicate::And(children) = predicate {
return compile_index_program_and_subset(children, index_slots);
}
compile_index_program_from_resolved_full(predicate, index_slots)
}
fn compile_index_program_from_resolved_for_targets(
predicate: &ExecutablePredicate,
compile_targets: &[IndexCompileTarget],
) -> Option<IndexPredicateProgram> {
if let ExecutablePredicate::And(children) = predicate {
return compile_index_program_and_subset_for_targets(children, compile_targets);
}
compile_index_program_from_resolved_full_for_targets(predicate, compile_targets)
}
fn compile_index_program_and_subset(
children: &[ExecutablePredicate],
index_slots: &[usize],
) -> Option<IndexPredicateProgram> {
let mut compiled = Vec::new();
for child in children {
let child_program = if let ExecutablePredicate::And(nested) = child {
compile_index_program_and_subset(nested, index_slots)
} else {
let capabilities = classify_predicate_capabilities(
child,
PredicateCapabilityContext::index_compile(index_slots),
);
match capabilities.index() {
IndexPredicateCapability::FullyIndexable => {
compile_index_program_from_resolved_full(child, index_slots)
}
IndexPredicateCapability::PartiallyIndexable
| IndexPredicateCapability::RequiresFullScan => None,
}
};
let Some(child_program) = child_program else {
continue;
};
match child_program {
IndexPredicateProgram::True => {}
IndexPredicateProgram::False => return Some(IndexPredicateProgram::False),
other => compiled.push(other),
}
}
match compiled.len() {
0 => None,
1 => compiled.pop(),
_ => Some(IndexPredicateProgram::And(compiled)),
}
}
fn compile_index_program_and_subset_for_targets(
children: &[ExecutablePredicate],
compile_targets: &[IndexCompileTarget],
) -> Option<IndexPredicateProgram> {
let mut compiled = Vec::new();
for child in children {
let child_program = if let ExecutablePredicate::And(nested) = child {
compile_index_program_and_subset_for_targets(nested, compile_targets)
} else {
let capabilities = classify_predicate_capabilities_for_targets(child, compile_targets);
match capabilities.index() {
IndexPredicateCapability::FullyIndexable => {
compile_index_program_from_resolved_full_for_targets(child, compile_targets)
}
IndexPredicateCapability::PartiallyIndexable
| IndexPredicateCapability::RequiresFullScan => None,
}
};
let Some(child_program) = child_program else {
continue;
};
match child_program {
IndexPredicateProgram::True => {}
IndexPredicateProgram::False => return Some(IndexPredicateProgram::False),
other => compiled.push(other),
}
}
match compiled.len() {
0 => None,
1 => compiled.pop(),
_ => Some(IndexPredicateProgram::And(compiled)),
}
}
fn compile_index_program_from_resolved_full(
predicate: &ExecutablePredicate,
index_slots: &[usize],
) -> Option<IndexPredicateProgram> {
match predicate {
ExecutablePredicate::True => Some(IndexPredicateProgram::True),
ExecutablePredicate::False => Some(IndexPredicateProgram::False),
ExecutablePredicate::And(children) => Some(IndexPredicateProgram::And(
children
.iter()
.map(|child| compile_index_program_from_resolved_full(child, index_slots))
.collect::<Option<Vec<_>>>()?,
)),
ExecutablePredicate::Or(children) => Some(IndexPredicateProgram::Or(
children
.iter()
.map(|child| compile_index_program_from_resolved_full(child, index_slots))
.collect::<Option<Vec<_>>>()?,
)),
ExecutablePredicate::Not(inner) => Some(IndexPredicateProgram::Not(Box::new(
compile_index_program_from_resolved_full(inner, index_slots)?,
))),
ExecutablePredicate::Compare(cmp) => compile_compare_index_node(cmp, index_slots),
ExecutablePredicate::IsNull { .. }
| ExecutablePredicate::IsNotNull { .. }
| ExecutablePredicate::IsMissing { .. }
| ExecutablePredicate::IsEmpty { .. }
| ExecutablePredicate::IsNotEmpty { .. }
| ExecutablePredicate::TextContains { .. }
| ExecutablePredicate::TextContainsCi { .. } => None,
}
}
fn compile_index_program_from_resolved_full_for_targets(
predicate: &ExecutablePredicate,
compile_targets: &[IndexCompileTarget],
) -> Option<IndexPredicateProgram> {
match predicate {
ExecutablePredicate::True => Some(IndexPredicateProgram::True),
ExecutablePredicate::False => Some(IndexPredicateProgram::False),
ExecutablePredicate::And(children) => Some(IndexPredicateProgram::And(
children
.iter()
.map(|child| {
compile_index_program_from_resolved_full_for_targets(child, compile_targets)
})
.collect::<Option<Vec<_>>>()?,
)),
ExecutablePredicate::Or(children) => Some(IndexPredicateProgram::Or(
children
.iter()
.map(|child| {
compile_index_program_from_resolved_full_for_targets(child, compile_targets)
})
.collect::<Option<Vec<_>>>()?,
)),
ExecutablePredicate::Not(inner) => Some(IndexPredicateProgram::Not(Box::new(
compile_index_program_from_resolved_full_for_targets(inner, compile_targets)?,
))),
ExecutablePredicate::Compare(cmp) => {
compile_compare_index_node_for_targets(cmp, compile_targets)
}
ExecutablePredicate::IsNull { .. }
| ExecutablePredicate::IsNotNull { .. }
| ExecutablePredicate::IsMissing { .. }
| ExecutablePredicate::IsEmpty { .. }
| ExecutablePredicate::IsNotEmpty { .. }
| ExecutablePredicate::TextContains { .. }
| ExecutablePredicate::TextContainsCi { .. } => None,
}
}
fn compile_compare_index_node(
cmp: &ExecutableComparePredicate,
index_slots: &[usize],
) -> Option<IndexPredicateProgram> {
let component_index = classify_index_compare_component(cmp, index_slots)?;
let literal_value = cmp.right_literal()?;
match cmp.op {
CompareOp::Eq
| CompareOp::Ne
| CompareOp::Lt
| CompareOp::Lte
| CompareOp::Gt
| CompareOp::Gte => {
let literal = literal_index_component_bytes(literal_value)?;
let op = match cmp.op {
CompareOp::Eq => IndexCompareOp::Eq,
CompareOp::Ne => IndexCompareOp::Ne,
CompareOp::Lt => IndexCompareOp::Lt,
CompareOp::Lte => IndexCompareOp::Lte,
CompareOp::Gt => IndexCompareOp::Gt,
CompareOp::Gte => IndexCompareOp::Gte,
CompareOp::In
| CompareOp::NotIn
| CompareOp::Contains
| CompareOp::StartsWith
| CompareOp::EndsWith => {
unreachable!("op branch must match index compare subset")
}
};
Some(IndexPredicateProgram::Compare {
component_index,
op,
literal: IndexLiteral::One(literal),
})
}
CompareOp::In | CompareOp::NotIn => {
let Value::List(items) = literal_value else {
return None;
};
if items.is_empty() {
return None;
}
let literals = items
.iter()
.map(literal_index_component_bytes)
.collect::<Option<Vec<_>>>()?;
let op = match cmp.op {
CompareOp::In => IndexCompareOp::In,
CompareOp::NotIn => IndexCompareOp::NotIn,
CompareOp::Eq
| CompareOp::Ne
| CompareOp::Lt
| CompareOp::Lte
| CompareOp::Gt
| CompareOp::Gte
| CompareOp::Contains
| CompareOp::StartsWith
| CompareOp::EndsWith => unreachable!("op branch must match index compare subset"),
};
Some(IndexPredicateProgram::Compare {
component_index,
op,
literal: IndexLiteral::Many(literals),
})
}
CompareOp::StartsWith => compile_starts_with_index_node(component_index, literal_value),
CompareOp::Contains | CompareOp::EndsWith => None,
}
}
fn compile_compare_index_node_for_targets(
cmp: &ExecutableComparePredicate,
compile_targets: &[IndexCompileTarget],
) -> Option<IndexPredicateProgram> {
let target = classify_index_compare_target(cmp, compile_targets)?;
let literal_value = cmp.right_literal()?;
match cmp.op {
CompareOp::Eq
| CompareOp::Ne
| CompareOp::Lt
| CompareOp::Lte
| CompareOp::Gt
| CompareOp::Gte => {
let lowered =
lower_index_compare_literal_for_target(target, literal_value, cmp.coercion.id)?;
let literal = literal_index_component_bytes(&lowered)?;
let op = match cmp.op {
CompareOp::Eq => IndexCompareOp::Eq,
CompareOp::Ne => IndexCompareOp::Ne,
CompareOp::Lt => IndexCompareOp::Lt,
CompareOp::Lte => IndexCompareOp::Lte,
CompareOp::Gt => IndexCompareOp::Gt,
CompareOp::Gte => IndexCompareOp::Gte,
CompareOp::In
| CompareOp::NotIn
| CompareOp::Contains
| CompareOp::StartsWith
| CompareOp::EndsWith => unreachable!("handled in other compile branches"),
};
Some(IndexPredicateProgram::Compare {
component_index: target.component_index,
op,
literal: IndexLiteral::One(literal),
})
}
CompareOp::In | CompareOp::NotIn => {
let Value::List(values) = literal_value else {
return None;
};
let literals = values
.iter()
.map(|value| {
let lowered =
lower_index_compare_literal_for_target(target, value, cmp.coercion.id)?;
literal_index_component_bytes(&lowered)
})
.collect::<Option<Vec<_>>>()?;
if literals.is_empty() {
return None;
}
let op = match cmp.op {
CompareOp::In => IndexCompareOp::In,
CompareOp::NotIn => IndexCompareOp::NotIn,
CompareOp::Eq
| CompareOp::Ne
| CompareOp::Lt
| CompareOp::Lte
| CompareOp::Gt
| CompareOp::Gte
| CompareOp::Contains
| CompareOp::StartsWith
| CompareOp::EndsWith => unreachable!("handled in other compile branches"),
};
Some(IndexPredicateProgram::Compare {
component_index: target.component_index,
op,
literal: IndexLiteral::Many(literals),
})
}
CompareOp::StartsWith => {
compile_starts_with_index_node_for_target(literal_value, cmp.coercion.id, target)
}
CompareOp::Contains | CompareOp::EndsWith => None,
}
}
fn compile_starts_with_index_node(
component_index: usize,
value: &Value,
) -> Option<IndexPredicateProgram> {
let Value::Text(prefix) = value else {
return None;
};
if prefix.is_empty() {
return None;
}
let lower_literal = literal_index_component_bytes(&Value::Text(prefix.clone()))?;
let lower = IndexPredicateProgram::Compare {
component_index,
op: IndexCompareOp::Gte,
literal: IndexLiteral::One(lower_literal),
};
let Some(upper_prefix) = next_text_prefix(prefix) else {
return Some(lower);
};
let upper_literal = literal_index_component_bytes(&Value::Text(upper_prefix))?;
let upper = IndexPredicateProgram::Compare {
component_index,
op: IndexCompareOp::Lt,
literal: IndexLiteral::One(upper_literal),
};
Some(IndexPredicateProgram::And(vec![lower, upper]))
}
fn compile_starts_with_index_node_for_target(
value: &Value,
coercion: crate::db::predicate::CoercionId,
target: IndexCompileTarget,
) -> Option<IndexPredicateProgram> {
let prefix = lower_index_starts_with_prefix_for_target(target, value, coercion)?;
let lower_literal = literal_index_component_bytes(&Value::Text(prefix.clone()))?;
let lower = IndexPredicateProgram::Compare {
component_index: target.component_index,
op: IndexCompareOp::Gte,
literal: IndexLiteral::One(lower_literal),
};
let Some(upper_prefix) = next_text_prefix(&prefix) else {
return Some(lower);
};
let upper_literal = literal_index_component_bytes(&Value::Text(upper_prefix))?;
let upper = IndexPredicateProgram::Compare {
component_index: target.component_index,
op: IndexCompareOp::Lt,
literal: IndexLiteral::One(upper_literal),
};
Some(IndexPredicateProgram::And(vec![lower, upper]))
}