use crate::{
db::predicate::{CoercionId, CompareOp, ExecutableComparePredicate, ExecutablePredicate},
model::{entity::EntityModel, field::LeafCodec},
value::Value,
};
/// Outcome of the scalar half of predicate capability classification.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum ScalarPredicateCapability {
/// Every node of the predicate passed the scalar-safety checks in this module.
ScalarSafe,
/// At least one node failed the scalar-safety checks, or the classification
/// context carried no entity model to check against.
RequiresGenericEvaluation,
}
/// Outcome of the index half of predicate capability classification.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum IndexPredicateCapability {
/// The whole predicate tree was classified as indexable.
FullyIndexable,
/// Only produced for `And` trees: at least one child is fully or partially
/// indexable, but not every child is fully indexable.
PartiallyIndexable,
/// Nothing in the predicate was retained as indexable, or the classification
/// context carried no index slots.
RequiresFullScan,
}
/// Inputs available to `classify_predicate_capabilities`.
///
/// Both inputs are optional; a missing input makes the corresponding half of
/// the classification fall back to its most conservative result.
#[derive(Clone, Copy, Debug, Default)]
pub(in crate::db) struct PredicateCapabilityContext<'a> {
// Entity model consulted by the scalar-safety checks; `None` skips them.
model: Option<&'static EntityModel>,
// Field slots treated as indexed by the index checks; `None` skips them.
index_slots: Option<&'a [usize]>,
}
impl<'a> PredicateCapabilityContext<'a> {
/// Builds a context that carries only an entity model.
///
/// No index slots are set, so `classify_predicate_capabilities` reports
/// `IndexPredicateCapability::RequiresFullScan` for the index half.
#[must_use]
pub(in crate::db) const fn runtime(model: &'static EntityModel) -> Self {
Self {
model: Some(model),
index_slots: None,
}
}
/// Builds a context that carries only index slots.
///
/// No entity model is set, so `classify_predicate_capabilities` reports
/// `ScalarPredicateCapability::RequiresGenericEvaluation` for the scalar half.
#[must_use]
pub(in crate::db) const fn index_compile(index_slots: &'a [usize]) -> Self {
Self {
model: None,
index_slots: Some(index_slots),
}
}
}
/// Combined scalar and index classification of one predicate, as produced by
/// `classify_predicate_capabilities`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct PredicateCapabilityProfile {
// Result of the scalar-safety classification.
scalar: ScalarPredicateCapability,
// Result of the index classification.
index: IndexPredicateCapability,
}
impl PredicateCapabilityProfile {
/// Returns the scalar classification stored in this profile.
#[must_use]
pub(in crate::db) const fn scalar(self) -> ScalarPredicateCapability {
self.scalar
}
/// Returns the index classification stored in this profile.
#[must_use]
pub(in crate::db) const fn index(self) -> IndexPredicateCapability {
self.index
}
}
/// Classifies `predicate` on both the scalar and the index axis.
///
/// Each axis is only evaluated when `context` carries the input it needs:
/// without a model the scalar result is `RequiresGenericEvaluation`, and
/// without index slots the index result is `RequiresFullScan`.
#[must_use]
pub(in crate::db) fn classify_predicate_capabilities(
    predicate: &ExecutablePredicate,
    context: PredicateCapabilityContext<'_>,
) -> PredicateCapabilityProfile {
    let scalar = match context.model {
        Some(model) => classify_scalar_capability(model, predicate),
        None => ScalarPredicateCapability::RequiresGenericEvaluation,
    };

    let index = match context.index_slots {
        Some(index_slots) => classify_index_capability(predicate, index_slots),
        None => IndexPredicateCapability::RequiresFullScan,
    };

    PredicateCapabilityProfile { scalar, index }
}
/// Returns the position inside `index_slots` of the field compared by `cmp`.
///
/// Yields `None` when the comparison is not indexable against `index_slots`
/// (see `compare_is_indexable`) or when it has no field slot.
#[must_use]
pub(in crate::db) fn classify_index_compare_component(
    cmp: &ExecutableComparePredicate,
    index_slots: &[usize],
) -> Option<usize> {
    compare_is_indexable(cmp, index_slots)
        .then_some(cmp.field_slot)
        .flatten()
        .and_then(|wanted| index_slots.iter().position(|candidate| *candidate == wanted))
}
/// Converts the boolean answer of `predicate_is_scalar_safe` into a
/// `ScalarPredicateCapability`.
fn classify_scalar_capability(
    model: &'static EntityModel,
    predicate: &ExecutablePredicate,
) -> ScalarPredicateCapability {
    if !predicate_is_scalar_safe(model, predicate) {
        return ScalarPredicateCapability::RequiresGenericEvaluation;
    }

    ScalarPredicateCapability::ScalarSafe
}
/// Classifies how much of `predicate` is indexable against `index_slots`.
///
/// - `True` / `False` are fully indexable.
/// - `And` merges its children through `merge_and_index_capability`, which is
///   the only place a partial result can arise.
/// - `Or` and `Not` are all-or-nothing: fully indexable only when every
///   child (or the inner predicate) is fully indexable, otherwise full scan.
/// - `Compare` defers to `compare_is_indexable`.
/// - Null, missing, emptiness and text-contains checks always need a full scan.
fn classify_index_capability(
    predicate: &ExecutablePredicate,
    index_slots: &[usize],
) -> IndexPredicateCapability {
    // Collapses a yes/no answer onto the two non-partial outcomes.
    const fn full_or_scan(fully_indexable: bool) -> IndexPredicateCapability {
        if fully_indexable {
            IndexPredicateCapability::FullyIndexable
        } else {
            IndexPredicateCapability::RequiresFullScan
        }
    }

    // True when `node` classifies as fully indexable.
    fn is_fully_indexable(node: &ExecutablePredicate, index_slots: &[usize]) -> bool {
        classify_index_capability(node, index_slots) == IndexPredicateCapability::FullyIndexable
    }

    match predicate {
        ExecutablePredicate::True | ExecutablePredicate::False => {
            IndexPredicateCapability::FullyIndexable
        }
        ExecutablePredicate::And(children) => merge_and_index_capability(
            children
                .iter()
                .map(|child| classify_index_capability(child, index_slots)),
        ),
        ExecutablePredicate::Or(children) => full_or_scan(
            children
                .iter()
                .all(|child| is_fully_indexable(child, index_slots)),
        ),
        ExecutablePredicate::Not(inner) => full_or_scan(is_fully_indexable(inner, index_slots)),
        ExecutablePredicate::Compare(cmp) => full_or_scan(compare_is_indexable(cmp, index_slots)),
        ExecutablePredicate::IsNull { .. }
        | ExecutablePredicate::IsNotNull { .. }
        | ExecutablePredicate::IsMissing { .. }
        | ExecutablePredicate::IsEmpty { .. }
        | ExecutablePredicate::IsNotEmpty { .. }
        | ExecutablePredicate::TextContains { .. }
        | ExecutablePredicate::TextContainsCi { .. } => IndexPredicateCapability::RequiresFullScan,
    }
}
/// Combines the index capabilities of the children of an `And` node.
///
/// - All children fully indexable (including the empty case) → `FullyIndexable`.
/// - Otherwise, at least one child fully or partially indexable →
///   `PartiallyIndexable`.
/// - Otherwise → `RequiresFullScan`.
fn merge_and_index_capability(
    children: impl Iterator<Item = IndexPredicateCapability>,
) -> IndexPredicateCapability {
    // Accumulator: (every child so far is fully indexable, some child is retainable).
    let summary = children.fold((true, false), |(every_full, some_retainable), child| {
        match child {
            IndexPredicateCapability::FullyIndexable => (every_full, true),
            IndexPredicateCapability::PartiallyIndexable => (false, true),
            IndexPredicateCapability::RequiresFullScan => (false, some_retainable),
        }
    });

    match summary {
        (true, _) => IndexPredicateCapability::FullyIndexable,
        (false, true) => IndexPredicateCapability::PartiallyIndexable,
        (false, false) => IndexPredicateCapability::RequiresFullScan,
    }
}
/// Reports whether every node of `predicate` passes the scalar-safety checks.
///
/// - `True`, `False` and `IsMissing` are always safe.
/// - `And` / `Or` are safe when all children are safe; `Not` follows its inner
///   predicate.
/// - `Compare` defers to `compare_is_scalar_safe`.
/// - Null and emptiness checks are safe when the field slot is scalar-backed
///   (see `scalar_field_slot_supported`).
/// - Text-contains checks additionally require a `Value::Text` operand.
fn predicate_is_scalar_safe(model: &'static EntityModel, predicate: &ExecutablePredicate) -> bool {
    match predicate {
        ExecutablePredicate::Compare(cmp) => compare_is_scalar_safe(model, cmp),
        ExecutablePredicate::Not(inner) => predicate_is_scalar_safe(model, inner),
        ExecutablePredicate::And(children) | ExecutablePredicate::Or(children) => {
            // Stop at the first child that is not scalar-safe.
            for child in children.iter() {
                if !predicate_is_scalar_safe(model, child) {
                    return false;
                }
            }
            true
        }
        ExecutablePredicate::IsNull { field_slot }
        | ExecutablePredicate::IsNotNull { field_slot }
        | ExecutablePredicate::IsEmpty { field_slot }
        | ExecutablePredicate::IsNotEmpty { field_slot } => {
            scalar_field_slot_supported(model, *field_slot)
        }
        ExecutablePredicate::TextContains { field_slot, value }
        | ExecutablePredicate::TextContainsCi { field_slot, value } => {
            let slot_is_scalar = scalar_field_slot_supported(model, *field_slot);
            slot_is_scalar && matches!(value, Value::Text(_))
        }
        ExecutablePredicate::True
        | ExecutablePredicate::False
        | ExecutablePredicate::IsMissing { .. } => true,
    }
}
/// Reports whether a comparison passes all four scalar-safety checks:
/// field slot, operator, coercion, and literal value.
fn compare_is_scalar_safe(model: &'static EntityModel, cmp: &ExecutableComparePredicate) -> bool {
    // Checks run in the same order as a short-circuiting `&&` chain.
    if !scalar_field_slot_supported(model, cmp.field_slot) {
        return false;
    }
    if !scalar_compare_op_supported(cmp.op) {
        return false;
    }
    if !scalar_compare_coercion_supported(cmp.coercion.id) {
        return false;
    }

    scalar_compare_literal_supported(cmp.op, &cmp.value)
}
/// Reports whether a comparison is indexable against `index_slots`.
///
/// Requires strict coercion and a field slot that appears in `index_slots`.
/// Beyond that the answer depends on the operator:
/// - equality and ordering operators need a single index literal;
/// - `In` / `NotIn` need a non-empty list of index literals;
/// - `StartsWith` needs a non-empty text prefix;
/// - `Contains` and `EndsWith` are never indexable.
fn compare_is_indexable(cmp: &ExecutableComparePredicate, index_slots: &[usize]) -> bool {
    let strict = cmp.coercion.id == CoercionId::Strict;
    let slot_is_indexed = matches!(cmp.field_slot, Some(slot) if index_slots.contains(&slot));
    if !(strict && slot_is_indexed) {
        return false;
    }

    match cmp.op {
        CompareOp::Contains | CompareOp::EndsWith => false,
        CompareOp::StartsWith => match &cmp.value {
            Value::Text(prefix) => !prefix.is_empty(),
            _ => false,
        },
        CompareOp::In | CompareOp::NotIn => list_value_is_non_empty_index_literal(&cmp.value),
        CompareOp::Eq
        | CompareOp::Ne
        | CompareOp::Lt
        | CompareOp::Lte
        | CompareOp::Gt
        | CompareOp::Gte => value_is_index_literal(&cmp.value),
    }
}
/// Reports whether `op` is accepted by the scalar-safety checks.
///
/// Every comparison operator except `Contains` is accepted.
const fn scalar_compare_op_supported(op: CompareOp) -> bool {
    // Exhaustive on purpose: a new operator must be classified explicitly.
    match op {
        CompareOp::Eq
        | CompareOp::Ne
        | CompareOp::Lt
        | CompareOp::Lte
        | CompareOp::Gt
        | CompareOp::Gte
        | CompareOp::In
        | CompareOp::NotIn
        | CompareOp::StartsWith
        | CompareOp::EndsWith => true,
        CompareOp::Contains => false,
    }
}
/// Reports whether `coercion` is accepted by the scalar-safety checks.
///
/// Only `CoercionId::NumericWiden` is rejected; every other coercion passes.
const fn scalar_compare_coercion_supported(coercion: CoercionId) -> bool {
    match coercion {
        CoercionId::NumericWiden => false,
        _ => true,
    }
}
/// Reports whether `field_slot` names a field of `model` whose leaf codec is
/// `LeafCodec::Scalar`.
///
/// A missing slot, or a slot with no matching entry in `model.fields()`, is
/// not supported.
fn scalar_field_slot_supported(model: &'static EntityModel, field_slot: Option<usize>) -> bool {
    match field_slot.and_then(|slot| model.fields().get(slot)) {
        Some(field_model) => matches!(field_model.leaf_codec(), LeafCodec::Scalar(_)),
        None => false,
    }
}
/// Reports whether `value` is an acceptable scalar literal for `op`.
///
/// `In` / `NotIn` require a `Value::List` whose items are all supported scalar
/// literals (an empty list passes). Every other operator checks `value`
/// itself.
fn scalar_compare_literal_supported(op: CompareOp, value: &Value) -> bool {
    let is_membership_op = matches!(op, CompareOp::In | CompareOp::NotIn);
    if !is_membership_op {
        return value_is_scalar_literal_supported(value);
    }

    if let Value::List(items) = value {
        items.iter().all(value_is_scalar_literal_supported)
    } else {
        false
    }
}
/// Reports whether `value` is a literal accepted by the scalar-safety checks.
///
/// Accepts `Null` and the listed single-value variants; any other variant
/// (for example `List`) is rejected.
const fn value_is_scalar_literal_supported(value: &Value) -> bool {
    match value {
        Value::Null
        | Value::Blob(_)
        | Value::Bool(_)
        | Value::Date(_)
        | Value::Duration(_)
        | Value::Float32(_)
        | Value::Float64(_)
        | Value::Int(_)
        | Value::Principal(_)
        | Value::Subaccount(_)
        | Value::Text(_)
        | Value::Timestamp(_)
        | Value::Uint(_)
        | Value::Ulid(_)
        | Value::Unit => true,
        _ => false,
    }
}
/// Reports whether `value` is a literal accepted by the index checks.
///
/// Accepts the listed single-value variants. `Null` is not accepted, and
/// neither is any other unlisted variant (for example `List`).
const fn value_is_index_literal(value: &Value) -> bool {
    match value {
        Value::Blob(_)
        | Value::Bool(_)
        | Value::Date(_)
        | Value::Duration(_)
        | Value::Float32(_)
        | Value::Float64(_)
        | Value::Int(_)
        | Value::Principal(_)
        | Value::Subaccount(_)
        | Value::Text(_)
        | Value::Timestamp(_)
        | Value::Uint(_)
        | Value::Ulid(_)
        | Value::Unit => true,
        _ => false,
    }
}
/// Reports whether `value` is a non-empty `Value::List` whose items are all
/// index literals (see `value_is_index_literal`).
fn list_value_is_non_empty_index_literal(value: &Value) -> bool {
    match value {
        Value::List(items) => !items.is_empty() && items.iter().all(value_is_index_literal),
        _ => false,
    }
}
// Unit tests for predicate capability classification.
#[cfg(test)]
mod tests {
use crate::{
db::predicate::{
CoercionId, CoercionSpec, CompareOp, ExecutableComparePredicate, ExecutablePredicate,
IndexPredicateCapability, PredicateCapabilityContext, ScalarPredicateCapability,
classify_index_compare_component, classify_predicate_capabilities,
},
model::{
entity::EntityModel,
field::{FieldKind, FieldModel},
},
value::Value,
};
// Fixture fields: slot 0 is an Int, slot 1 is Text, slot 2 is a list of Text.
static CAPABILITY_FIELDS: [FieldModel; 3] = [
FieldModel::new("score", FieldKind::Int),
FieldModel::new("name", FieldKind::Text),
FieldModel::new("tags", FieldKind::List(&FieldKind::Text)),
];
// Fixture entity model built over `CAPABILITY_FIELDS`.
static CAPABILITY_MODEL: EntityModel = EntityModel::new(
"PredicateCapabilityEntity",
"PredicateCapabilityEntity",
&CAPABILITY_FIELDS[0],
&CAPABILITY_FIELDS,
&[],
);
// A strict `Eq` on slot 0 with an Int literal is scalar-safe, and fully
// indexable when slot 0 is among the index slots.
#[test]
fn strict_scalar_compare_is_scalar_safe_and_indexable_when_indexed() {
let predicate = ExecutablePredicate::Compare(ExecutableComparePredicate {
field_slot: Some(0),
op: CompareOp::Eq,
value: Value::Int(7),
coercion: CoercionSpec::new(CoercionId::Strict),
});
// Built with a struct literal because neither constructor sets both inputs.
let profile = classify_predicate_capabilities(
&predicate,
PredicateCapabilityContext {
model: Some(&CAPABILITY_MODEL),
index_slots: Some(&[0]),
},
);
assert_eq!(profile.scalar(), ScalarPredicateCapability::ScalarSafe);
assert_eq!(profile.index(), IndexPredicateCapability::FullyIndexable);
}
// A case-insensitive text-contains check is scalar-safe but needs a full
// scan even when its field slot is among the index slots.
#[test]
fn scalar_text_contains_requires_full_scan() {
let predicate = ExecutablePredicate::TextContainsCi {
field_slot: Some(1),
value: Value::Text("alp".to_string()),
};
let profile = classify_predicate_capabilities(
&predicate,
PredicateCapabilityContext {
model: Some(&CAPABILITY_MODEL),
index_slots: Some(&[1]),
},
);
assert_eq!(profile.scalar(), ScalarPredicateCapability::ScalarSafe);
assert_eq!(profile.index(), IndexPredicateCapability::RequiresFullScan);
}
// An `And` of one indexable compare and one full-scan child is partially
// indexable.
#[test]
fn mixed_and_tree_is_partially_indexable_but_not_fully_indexable() {
let predicate = ExecutablePredicate::And(vec![
ExecutablePredicate::Compare(ExecutableComparePredicate {
field_slot: Some(0),
op: CompareOp::Eq,
value: Value::Int(7),
coercion: CoercionSpec::new(CoercionId::Strict),
}),
ExecutablePredicate::TextContainsCi {
field_slot: Some(1),
value: Value::Text("alp".to_string()),
},
]);
let profile = classify_predicate_capabilities(
&predicate,
PredicateCapabilityContext::index_compile(&[0]),
);
assert_eq!(
profile.index(),
IndexPredicateCapability::PartiallyIndexable
);
}
// The component lookup succeeds only for a strict comparison whose field
// slot is among the index slots.
#[test]
fn index_compare_component_requires_strict_supported_projection() {
let strict = ExecutableComparePredicate {
field_slot: Some(0),
op: CompareOp::In,
value: Value::List(vec![Value::Int(1), Value::Int(2)]),
coercion: CoercionSpec::new(CoercionId::Strict),
};
let non_strict = ExecutableComparePredicate {
field_slot: Some(0),
op: CompareOp::Eq,
value: Value::Int(7),
coercion: CoercionSpec::new(CoercionId::NumericWiden),
};
// Slot 0 sits at position 0 of the index slots.
assert_eq!(classify_index_compare_component(&strict, &[0]), Some(0));
// Slot 0 is not among the index slots.
assert_eq!(classify_index_compare_component(&strict, &[1]), None);
// Non-strict coercion is never indexable.
assert_eq!(classify_index_compare_component(&non_strict, &[0]), None);
}
}