use crate::{
db::{
access::{AccessPlan, SemanticIndexRangeSpec},
predicate::{CoercionId, CompareOp, Predicate, canonical_cmp},
query::plan::{
OrderSpec,
key_item_match::{eq_lookup_value_for_key_item, starts_with_lookup_value_for_key_item},
planner::{
AccessCandidateScore, access_candidate_score_outranks,
candidate_satisfies_secondary_order, index_literal_matches_schema,
range::{
CachedCompare, IndexFieldConstraint, RangeConstraint,
bounds::{
merge_range_constraint, merge_range_constraint_bounds,
strict_text_prefix_upper_bound,
},
},
},
},
schema::{SchemaInfo, literal_matches_type},
},
model::{
entity::EntityModel,
index::{IndexKeyItem, IndexKeyItemsRef, IndexModel},
},
value::Value,
};
use std::{cmp::Ordering, ops::Bound};
/// Builds a primary-key range access plan from a list of predicates (the
/// children of an `AND`, going by the function name).
///
/// Every child must be a strict-coercion comparison on the primary-key field
/// whose literal matches the field's schema type. Exactly one `>=` comparison
/// and exactly one `<` comparison are accepted, and the lower bound must order
/// strictly before the upper bound under `canonical_cmp`.
///
/// Returns `None` when the primary-key field is missing from the schema or is
/// not keyable, or when any of the conditions above is violated.
pub(in crate::db::query::plan::planner) fn primary_key_range_from_and(
    model: &EntityModel,
    schema: &SchemaInfo,
    children: &[Predicate],
) -> Option<AccessPlan<Value>> {
    let pk_type = schema.field(model.primary_key.name)?;
    if !pk_type.is_keyable() {
        return None;
    }

    let mut start: Option<Value> = None;
    let mut end: Option<Value> = None;

    for child in children {
        let Predicate::Compare(cmp) = child else {
            return None;
        };

        let usable = cmp.field == model.primary_key.name
            && cmp.coercion.id == CoercionId::Strict
            && literal_matches_type(&cmp.value, pk_type);
        if !usable {
            return None;
        }

        // Only `>=` may fill the lower bound and only `<` the upper bound.
        let slot = match cmp.op {
            CompareOp::Gte => &mut start,
            CompareOp::Lt => &mut end,
            _ => return None,
        };
        // A second comparison for an already-filled bound is rejected.
        if slot.is_some() {
            return None;
        }
        *slot = Some(cmp.value.clone());
    }

    let start = start?;
    let end = end?;
    match canonical_cmp(&start, &end) {
        Ordering::Less => Some(AccessPlan::key_range(start, end)),
        Ordering::Equal | Ordering::Greater => None,
    }
}
/// Selects a secondary-index range access for a list of predicates (the
/// children of an `AND`, going by the function name).
///
/// Every child must be a comparison using `Eq`, `Gt`, `Gte`, `Lt`, `Lte` or
/// `StartsWith` under `Strict` or `TextCasefold` coercion; any other child
/// makes the whole lookup return `None`.
///
/// Each index in `candidate_indexes` that produces a range candidate is
/// scored. A candidate replaces the current best when
/// `access_candidate_score_outranks` says so, or when the scores are equal and
/// its name compares smaller than the current best's name.
pub(in crate::db::query::plan::planner) fn index_range_from_and(
    model: &EntityModel,
    candidate_indexes: &[&'static IndexModel],
    schema: &SchemaInfo,
    children: &[Predicate],
    order: Option<&OrderSpec>,
) -> Option<SemanticIndexRangeSpec> {
    // Validate every child once and cache its schema-compatibility flag so the
    // per-index probing below does not recompute it.
    let mut compares = Vec::with_capacity(children.len());
    for child in children {
        let Predicate::Compare(cmp) = child else {
            return None;
        };

        let op_supported = matches!(
            cmp.op,
            CompareOp::Eq
                | CompareOp::Gt
                | CompareOp::Gte
                | CompareOp::Lt
                | CompareOp::Lte
                | CompareOp::StartsWith
        );
        let coercion_supported = matches!(
            cmp.coercion.id,
            CoercionId::Strict | CoercionId::TextCasefold
        );
        if !(op_supported && coercion_supported) {
            return None;
        }

        let literal_compatible = index_literal_matches_schema(schema, &cmp.field, &cmp.value);
        compares.push(CachedCompare {
            cmp,
            literal_compatible,
        });
    }

    let mut best: Option<(
        AccessCandidateScore,
        &'static IndexModel,
        usize,
        Vec<Value>,
        RangeConstraint,
    )> = None;

    for index in candidate_indexes {
        let Some((range_slot, prefix, range)) =
            index_range_candidate_for_index(index, schema, &compares)
        else {
            continue;
        };

        let prefix_len = prefix.len();
        let satisfies_order = candidate_satisfies_secondary_order(model, order, index, prefix_len);
        let score = AccessCandidateScore::new(prefix_len, false, satisfies_order);

        let should_replace = match &best {
            None => true,
            Some((best_score, best_index, ..)) => {
                access_candidate_score_outranks(score, *best_score, false)
                    || (score == *best_score && index.name() < best_index.name())
            }
        };
        if should_replace {
            best = Some((score, index, range_slot, prefix, range));
        }
    }

    let (_, index, range_slot, prefix, range) = best?;
    // The spec covers every slot up to and including the range slot.
    let field_slots = (0..=range_slot).collect();
    Some(SemanticIndexRangeSpec::new(
        *index,
        field_slots,
        prefix,
        range.lower,
        range.upper,
    ))
}
/// Derives the range candidate for one index by normalising its key layout
/// into a stream of `IndexKeyItem`s.
///
/// Field-only indexes have each field wrapped in `IndexKeyItem::Field`;
/// item-based indexes are passed through unchanged. The actual matching is
/// delegated to `index_range_candidate_for_key_items`.
fn index_range_candidate_for_index(
    index: &'static IndexModel,
    schema: &SchemaInfo,
    compares: &[CachedCompare<'_>],
) -> Option<(usize, Vec<Value>, RangeConstraint)> {
    match index.key_items() {
        IndexKeyItemsRef::Items(items) => {
            let key_items = items.iter().copied();
            index_range_candidate_for_key_items(index, schema, key_items, compares)
        }
        IndexKeyItemsRef::Fields(fields) => {
            let key_items = fields.iter().copied().map(IndexKeyItem::Field);
            index_range_candidate_for_key_items(index, schema, key_items, compares)
        }
    }
}
/// Walks the key items of `index` in order and assembles an equality prefix
/// followed by a single range constraint.
///
/// Returns `(range slot position, equality prefix values, range constraint)`.
/// Returns `None` when a slot's constraint cannot be derived, when a slot is
/// rejected by `consume_index_slot_constraint`, when no slot carried a range,
/// or when the prefix length is not smaller than `index.fields().len()`.
fn index_range_candidate_for_key_items<I>(
    index: &'static IndexModel,
    schema: &SchemaInfo,
    key_items: I,
    compares: &[CachedCompare<'_>],
) -> Option<(usize, Vec<Value>, RangeConstraint)>
where
    I: IntoIterator<Item = IndexKeyItem>,
{
    let mut eq_prefix = Vec::new();
    let mut claimed_range: Option<RangeConstraint> = None;
    let mut range_at = None;

    for (slot, key_item) in key_items.into_iter().enumerate() {
        let slot_constraint =
            key_item_constraint_for_index_slot(index, schema, key_item, compares)?;
        let accepted = consume_index_slot_constraint(
            &mut eq_prefix,
            &mut claimed_range,
            &mut range_at,
            slot,
            slot_constraint,
        );
        if !accepted {
            return None;
        }
    }

    let range_at = range_at?;
    let range = claimed_range?;
    if eq_prefix.len() < index.fields().len() {
        Some((range_at, eq_prefix, range))
    } else {
        None
    }
}
/// Folds one slot's constraint into the running prefix/range state.
///
/// Before a range has been claimed, an equality extends `prefix`, a range is
/// stored in `range` with its `position` recorded, and an unconstrained slot
/// is rejected. After a range has been claimed, only unconstrained slots are
/// accepted.
///
/// Returns `true` when the slot was accepted and `false` when it was rejected.
fn consume_index_slot_constraint(
    prefix: &mut Vec<Value>,
    range: &mut Option<RangeConstraint>,
    range_position: &mut Option<usize>,
    position: usize,
    constraint: IndexFieldConstraint,
) -> bool {
    // Once the range slot is taken, any further constrained slot is rejected.
    if range.is_some() {
        return matches!(constraint, IndexFieldConstraint::None);
    }

    match constraint {
        IndexFieldConstraint::Eq(value) => {
            prefix.push(value);
            true
        }
        IndexFieldConstraint::Range(candidate) => {
            *range = Some(candidate);
            *range_position = Some(position);
            true
        }
        // An unconstrained slot ahead of the range slot is not accepted.
        IndexFieldConstraint::None => false,
    }
}
fn key_item_constraint_for_index_slot(
index: &'static IndexModel,
schema: &SchemaInfo,
key_item: IndexKeyItem,
compares: &[CachedCompare<'_>],
) -> Option<IndexFieldConstraint> {
let mut constraint = IndexFieldConstraint::None;
let field_type = schema.field(key_item.field())?;
for cached in compares {
let cmp = cached.cmp;
if cmp.field.as_str() != key_item.field() {
continue;
}
if matches!(key_item, IndexKeyItem::Field(_))
&& cmp.coercion.id == CoercionId::Strict
&& !field_type.is_orderable()
{
return None;
}
match cmp.op {
CompareOp::Eq => match &constraint {
IndexFieldConstraint::None => {
let Some(candidate) = eq_lookup_value_for_key_item(
key_item,
cmp.field.as_str(),
&cmp.value,
cmp.coercion.id,
cached.literal_compatible,
) else {
continue;
};
constraint = IndexFieldConstraint::Eq(candidate);
}
IndexFieldConstraint::Eq(existing) => {
let Some(candidate) = eq_lookup_value_for_key_item(
key_item,
cmp.field.as_str(),
&cmp.value,
cmp.coercion.id,
cached.literal_compatible,
) else {
continue;
};
if existing != &candidate {
return None;
}
}
IndexFieldConstraint::Range(_) => return None,
},
CompareOp::Gt | CompareOp::Gte | CompareOp::Lt | CompareOp::Lte => {
merge_ordered_compare_constraint_for_key_item(
index,
key_item,
cached,
&mut constraint,
)?;
}
CompareOp::StartsWith => {
let Some(prefix) = starts_with_lookup_value_for_key_item(
key_item,
cmp.field.as_str(),
&cmp.value,
cmp.coercion.id,
cached.literal_compatible,
) else {
continue;
};
let candidate = RangeConstraint {
lower: Bound::Included(Value::Text(prefix.clone())),
upper: match key_item {
IndexKeyItem::Field(_) => strict_text_prefix_upper_bound(&prefix),
IndexKeyItem::Expression(_) => Bound::Unbounded,
},
};
let mut range = match &constraint {
IndexFieldConstraint::None => candidate.clone(),
IndexFieldConstraint::Eq(_) => return None,
IndexFieldConstraint::Range(existing) => existing.clone(),
};
if !merge_range_constraint_bounds(&mut range, &candidate) {
return None;
}
constraint = IndexFieldConstraint::Range(range);
}
_ => return None,
}
}
Some(constraint)
}
/// Merges one ordered comparison (`Gt`/`Gte`/`Lt`/`Lte`) into the slot's
/// constraint.
///
/// Returns `Some(())` when the comparison was merged or deliberately skipped.
/// It is skipped, leaving `constraint` untouched, when a field key item is
/// compared with non-strict coercion or the index reports the field as not
/// indexable for the operator, or when an expression key item is compared
/// with anything other than text-casefold coercion.
///
/// Returns `None` when no lookup value can be derived, when the slot already
/// holds an equality, or when `merge_range_constraint` rejects the bound.
fn merge_ordered_compare_constraint_for_key_item(
    index: &'static IndexModel,
    key_item: IndexKeyItem,
    cached: &CachedCompare<'_>,
    constraint: &mut IndexFieldConstraint,
) -> Option<()> {
    let cmp = cached.cmp;
    let bound_value = eq_lookup_value_for_key_item(
        key_item,
        cmp.field.as_str(),
        &cmp.value,
        cmp.coercion.id,
        cached.literal_compatible,
    )?;

    let usable = match key_item {
        IndexKeyItem::Field(_) => {
            cmp.coercion.id == CoercionId::Strict
                && index.is_field_indexable(key_item.field(), cmp.op)
        }
        IndexKeyItem::Expression(_) => cmp.coercion.id == CoercionId::TextCasefold,
    };
    if !usable {
        // Not usable for this slot: leave the constraint as it is.
        return Some(());
    }

    let mut merged = match &*constraint {
        IndexFieldConstraint::Eq(_) => return None,
        IndexFieldConstraint::None => RangeConstraint::default(),
        IndexFieldConstraint::Range(existing) => existing.clone(),
    };
    if !merge_range_constraint(&mut merged, cmp.op, &bound_value) {
        return None;
    }

    *constraint = IndexFieldConstraint::Range(merged);
    Some(())
}