use crate::{
db::{
access::{
AccessPlan, SemanticIndexAccessContract, SemanticIndexKeyItemRef,
SemanticIndexKeyItemsRef, SemanticIndexRangeSpec,
},
index::{TextPrefixBoundMode, starts_with_component_bounds},
predicate::{CoercionId, CompareOp, ComparePredicate},
query::plan::{
OrderSpec,
key_item_match::{eq_lookup_value_for_key_item, starts_with_lookup_value_for_key_item},
planner::{
AccessCandidateScore, access_candidate_score_from_index_contract,
access_candidate_score_outranks, index_literal_matches_schema,
prefix::{index_multi_lookup_for_in, index_prefix_for_eq},
range_bound_count,
},
},
schema::{FieldType, SchemaInfo, literal_matches_type},
},
model::entity::EntityModel,
value::Value,
};
use std::ops::Bound;
pub(super) fn plan_compare(
model: &EntityModel,
candidate_indexes: &[SemanticIndexAccessContract],
schema: &SchemaInfo,
cmp: &ComparePredicate,
order: Option<&OrderSpec>,
grouped: bool,
) -> AccessPlan<Value> {
if cmp.coercion.id == CoercionId::Strict
&& cmp.field == model.primary_key.name
&& let Some(field_type) = schema.field(model.primary_key.name)
&& let Some(path) = plan_pk_compare(field_type, &cmp.value, cmp.op)
{
return path;
}
match cmp.op {
CompareOp::Eq => {
if !coercion_supports_index_lookup(cmp.coercion.id) {
return AccessPlan::full_scan();
}
if let Some(paths) = index_prefix_for_eq(
model,
candidate_indexes,
schema,
&cmp.field,
&cmp.value,
cmp.coercion.id,
order,
grouped,
) {
return paths;
}
}
CompareOp::In => {
if !coercion_supports_index_lookup(cmp.coercion.id) {
return AccessPlan::full_scan();
}
if let Value::List(items) = &cmp.value {
if items.is_empty() {
return AccessPlan::by_keys(Vec::new());
}
if let Some(paths) = index_multi_lookup_for_in(
model,
candidate_indexes,
schema,
&cmp.field,
items,
cmp.coercion.id,
) {
return AccessPlan::union(paths);
}
}
}
CompareOp::Gt | CompareOp::Gte | CompareOp::Lt | CompareOp::Lte => {
if !coercion_supports_index_lookup(cmp.coercion.id) {
return AccessPlan::full_scan();
}
let Some(field_type) = schema.field(&cmp.field) else {
return AccessPlan::full_scan();
};
if !field_supports_ordered_compare(field_type, cmp.coercion.id) {
return AccessPlan::full_scan();
}
if let Some(path) =
plan_ordered_compare(model, candidate_indexes, schema, cmp, order, grouped)
{
return path;
}
}
CompareOp::StartsWith => {
if !coercion_supports_index_lookup(cmp.coercion.id) {
return AccessPlan::full_scan();
}
if let Some(path) =
plan_starts_with_compare(model, candidate_indexes, schema, cmp, order, grouped)
{
return path;
}
}
_ => {
}
}
AccessPlan::full_scan()
}
const fn coercion_supports_index_lookup(coercion: CoercionId) -> bool {
matches!(coercion, CoercionId::Strict | CoercionId::TextCasefold)
}
const fn field_supports_ordered_compare(field_type: &FieldType, coercion: CoercionId) -> bool {
match coercion {
CoercionId::Strict => field_type.is_orderable(),
CoercionId::TextCasefold => field_type.is_text(),
_ => false,
}
}
fn plan_pk_compare(
field_type: &FieldType,
value: &Value,
op: CompareOp,
) -> Option<AccessPlan<Value>> {
if !field_type.is_keyable() {
return None;
}
match op {
CompareOp::Eq => {
if !literal_matches_type(value, field_type) {
return None;
}
Some(AccessPlan::by_key(value.clone()))
}
CompareOp::In => {
let Value::List(items) = value else {
return None;
};
for item in items {
if !literal_matches_type(item, field_type) {
return None;
}
}
Some(AccessPlan::by_keys(items.clone()))
}
_ => {
None
}
}
}
fn plan_starts_with_compare(
model: &EntityModel,
candidate_indexes: &[SemanticIndexAccessContract],
schema: &SchemaInfo,
cmp: &ComparePredicate,
order: Option<&OrderSpec>,
grouped: bool,
) -> Option<AccessPlan<Value>> {
let field_type = schema.field(&cmp.field)?;
if !field_type.is_text() {
return None;
}
let literal_compatible = index_literal_matches_schema(schema, &cmp.field, &cmp.value);
let mut best: Option<(
AccessCandidateScore,
SemanticIndexAccessContract,
Bound<Value>,
Bound<Value>,
)> = None;
for index in candidate_indexes {
let Some(leading_key_item) = index.key_item_at(0) else {
continue;
};
let Some(prefix) = starts_with_lookup_value_for_key_item(
leading_key_item,
cmp.field.as_str(),
&cmp.value,
cmp.coercion.id,
literal_compatible,
) else {
continue;
};
let (lower, upper) = starts_with_component_bounds(
&prefix,
if matches!(leading_key_item, SemanticIndexKeyItemRef::Expression(_)) {
TextPrefixBoundMode::LowerOnly
} else {
TextPrefixBoundMode::Strict
},
)?;
let score = access_candidate_score_from_index_contract(
model,
order,
index.clone(),
0,
false,
range_bound_count(&lower, &upper),
grouped,
);
match best {
None => best = Some((score, index.clone(), lower, upper)),
Some((best_score, best_index, _, _))
if access_candidate_score_outranks(score, best_score, false)
|| (score == best_score && index.name() < best_index.name()) =>
{
best = Some((score, index.clone(), lower, upper));
}
_ => {}
}
}
best.map(|(_, index, lower, upper)| {
AccessPlan::index_range(SemanticIndexRangeSpec::from_access_contract(
index,
vec![0usize],
Vec::new(),
lower,
upper,
))
})
}
fn plan_ordered_compare(
model: &EntityModel,
candidate_indexes: &[SemanticIndexAccessContract],
schema: &SchemaInfo,
cmp: &ComparePredicate,
order: Option<&OrderSpec>,
grouped: bool,
) -> Option<AccessPlan<Value>> {
let literal_compatible = index_literal_matches_schema(schema, &cmp.field, &cmp.value);
let mut best: Option<(
AccessCandidateScore,
SemanticIndexAccessContract,
Bound<Value>,
Bound<Value>,
)> = None;
for index in candidate_indexes {
let Some(leading_key_item) = index.key_item_at(0) else {
continue;
};
if index.key_arity() != 1 {
continue;
}
let Some(bound_value) = eq_lookup_value_for_key_item(
leading_key_item,
cmp.field.as_str(),
&cmp.value,
cmp.coercion.id,
literal_compatible,
) else {
continue;
};
match leading_key_item {
SemanticIndexKeyItemRef::Field(_) => {
if cmp.coercion.id != CoercionId::Strict
|| !matches!(
index.key_item_at(0),
Some(SemanticIndexKeyItemRef::Field(field))
if field == cmp.field.as_str()
)
|| !field_key_contract_supports_operator(index, cmp.field.as_str(), cmp.op)
{
continue;
}
}
SemanticIndexKeyItemRef::Expression(_) => {
if cmp.coercion.id != CoercionId::TextCasefold {
continue;
}
}
}
let (lower, upper) = match cmp.op {
CompareOp::Gt => (Bound::Excluded(bound_value), Bound::Unbounded),
CompareOp::Gte => (Bound::Included(bound_value), Bound::Unbounded),
CompareOp::Lt => (Bound::Unbounded, Bound::Excluded(bound_value)),
CompareOp::Lte => (Bound::Unbounded, Bound::Included(bound_value)),
_ => unreachable!("ordered compare helper must receive one of Gt/Gte/Lt/Lte"),
};
let score = access_candidate_score_from_index_contract(
model,
order,
index.clone(),
0,
false,
range_bound_count(&lower, &upper),
grouped,
);
match best {
None => best = Some((score, index.clone(), lower, upper)),
Some((best_score, best_index, _, _))
if access_candidate_score_outranks(score, best_score, false)
|| (score == best_score && index.name() < best_index.name()) =>
{
best = Some((score, index.clone(), lower, upper));
}
_ => {}
}
}
best.map(|(_, index, lower, upper)| {
AccessPlan::index_range(SemanticIndexRangeSpec::from_access_contract(
index,
vec![0usize],
Vec::new(),
lower,
upper,
))
})
}
fn field_key_contract_supports_operator(
index_contract: &SemanticIndexAccessContract,
field: &str,
op: CompareOp,
) -> bool {
if index_contract.has_expression_key_items() {
return false;
}
if !contract_contains_field_key(index_contract, field) {
return false;
}
matches!(
op,
CompareOp::Eq
| CompareOp::In
| CompareOp::Gt
| CompareOp::Gte
| CompareOp::Lt
| CompareOp::Lte
| CompareOp::StartsWith
)
}
fn contract_contains_field_key(index_contract: &SemanticIndexAccessContract, field: &str) -> bool {
match index_contract.key_items() {
SemanticIndexKeyItemsRef::Fields(fields) => fields.iter().any(|key_field| key_field == field),
SemanticIndexKeyItemsRef::Static(crate::model::index::IndexKeyItemsRef::Fields(fields)) => {
fields.contains(&field)
}
SemanticIndexKeyItemsRef::Static(crate::model::index::IndexKeyItemsRef::Items(items)) => items.iter().any(|item| {
matches!(item, crate::model::index::IndexKeyItem::Field(key_field) if key_field == &field)
}),
}
}