use crate::db::query::intent::{
StructuralQueryCacheKey,
state::{GroupedIntent, ScalarIntent},
};
#[cfg(feature = "sql")]
use crate::db::query::plan::expr::FieldId;
use crate::{
db::{
access::{AccessPlan, canonical::canonicalize_value_set},
predicate::{CompareOp, MissingRowPolicy, Predicate},
query::{
builder::aggregate::AggregateExpr,
expr::{FilterExpr, SortExpr},
intent::{IntentError, QueryError, QueryIntent},
plan::{
AccessPlannedQuery, AccessPlanningInputs, GroupAggregateSpec, GroupHavingClause,
GroupHavingSymbol, LogicalPlan, OrderSpec, QueryMode, VisibleIndexes,
build_logical_plan, canonicalize_grouped_having_numeric_literal_for_field_kind,
expr::{Expr, ProjectionSelection},
fold_constant_predicate, is_limit_zero_load_window,
logical_query_from_logical_inputs, normalize_query_predicate, plan_query_access,
predicate_is_constant_false, resolve_group_field_slot,
validate_group_query_semantics, validate_order_shape, validate_query_semantics,
},
},
schema::SchemaInfo,
},
model::entity::EntityModel,
traits::FieldValue,
value::Value,
};
#[derive(Clone, Debug)]
pub(crate) struct QueryModel<'m, K> {
model: &'m EntityModel,
intent: QueryIntent<K>,
consistency: MissingRowPolicy,
}
pub(in crate::db) struct PreparedScalarPlanningState<'a> {
schema_info: &'static SchemaInfo,
access_inputs: AccessPlanningInputs<'a>,
normalized_predicate: Option<Predicate>,
}
impl<'a> PreparedScalarPlanningState<'a> {
const fn new(
schema_info: &'static SchemaInfo,
access_inputs: AccessPlanningInputs<'a>,
normalized_predicate: Option<Predicate>,
) -> Self {
Self {
schema_info,
access_inputs,
normalized_predicate,
}
}
#[must_use]
pub(in crate::db) const fn normalized_predicate(&self) -> Option<&Predicate> {
self.normalized_predicate.as_ref()
}
}
impl<'m, K: FieldValue> QueryModel<'m, K> {
#[must_use]
pub(crate) const fn new(model: &'m EntityModel, consistency: MissingRowPolicy) -> Self {
Self {
model,
intent: QueryIntent::new(),
consistency,
}
}
pub(in crate::db) fn structural_cache_key_with_normalized_predicate_fingerprint(
&self,
predicate_fingerprint: Option<[u8; 32]>,
) -> StructuralQueryCacheKey {
StructuralQueryCacheKey::from_query_model_with_normalized_predicate_fingerprint(
self,
predicate_fingerprint,
)
}
#[must_use]
pub(crate) const fn mode(&self) -> QueryMode {
self.intent.mode()
}
#[must_use]
pub(in crate::db::query::intent) const fn model(&self) -> &'m EntityModel {
self.model
}
#[must_use]
pub(in crate::db::query::intent) fn has_explicit_order(&self) -> bool {
self.intent.has_explicit_order()
}
#[must_use]
pub(in crate::db::query::intent) const fn has_grouping(&self) -> bool {
self.intent.is_grouped()
}
#[must_use]
pub(in crate::db::query::intent) const fn scalar_intent_for_cache_key(
&self,
) -> &ScalarIntent<K> {
self.intent.scalar()
}
#[must_use]
pub(in crate::db::query::intent) const fn grouped_intent_for_cache_key(
&self,
) -> Option<&GroupedIntent<K>> {
self.intent.grouped()
}
#[must_use]
pub(in crate::db::query::intent) const fn consistency_for_cache_key(&self) -> MissingRowPolicy {
self.consistency
}
#[must_use]
pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
self.intent.append_predicate(predicate);
self
}
pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
let schema = SchemaInfo::cached_for_entity_model(self.model);
let predicate = expr.lower_with(schema).map_err(QueryError::validate)?;
Ok(self.filter(predicate))
}
pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
let schema = SchemaInfo::cached_for_entity_model(self.model);
let order = expr.lower_with(schema).map_err(QueryError::from)?;
validate_order_shape(Some(&order))
.map_err(IntentError::from)
.map_err(QueryError::from)?;
Ok(self.order_spec(order))
}
#[must_use]
pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
self.intent.push_order_ascending(field.as_ref());
self
}
#[must_use]
pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
self.intent.push_order_descending(field.as_ref());
self
}
pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
self.intent.set_order_spec(order);
self
}
#[must_use]
pub(crate) const fn distinct(mut self) -> Self {
self.intent.set_distinct();
self
}
#[cfg(feature = "sql")]
#[must_use]
pub(crate) fn select_fields<I, S>(mut self, fields: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
let fields = fields
.into_iter()
.map(|field| FieldId::new(field.into()))
.collect::<Vec<_>>();
self.intent
.set_projection_selection(ProjectionSelection::Fields(fields));
self
}
#[cfg(feature = "sql")]
#[must_use]
pub(in crate::db::query::intent) fn projection_selection(
mut self,
selection: ProjectionSelection,
) -> Self {
self.intent.set_projection_selection(selection);
self
}
pub(in crate::db::query::intent) fn push_group_field(
mut self,
field: &str,
) -> Result<Self, QueryError> {
let field_slot = resolve_group_field_slot(self.model, field).map_err(QueryError::from)?;
self.intent.push_group_field_slot(field_slot);
Ok(self)
}
pub(in crate::db::query::intent) fn push_group_aggregate(
mut self,
aggregate: AggregateExpr,
) -> Self {
self.intent
.push_group_aggregate(GroupAggregateSpec::from_aggregate_expr(&aggregate));
self
}
pub(in crate::db::query::intent) fn grouped_limits(
mut self,
max_groups: u64,
max_group_bytes: u64,
) -> Self {
self.intent.set_grouped_limits(max_groups, max_group_bytes);
self
}
fn push_having_clause(mut self, clause: GroupHavingClause) -> Result<Self, QueryError> {
self.intent
.push_having_clause(clause)
.map_err(QueryError::intent)?;
Ok(self)
}
pub(in crate::db::query::intent) fn push_having_group_clause(
self,
field: &str,
op: CompareOp,
value: Value,
) -> Result<Self, QueryError> {
let field_slot = resolve_group_field_slot(self.model, field).map_err(QueryError::from)?;
let value =
canonicalize_grouped_having_numeric_literal_for_field_kind(field_slot.kind(), &value)
.unwrap_or(value);
self.push_having_clause(GroupHavingClause {
symbol: GroupHavingSymbol::GroupField(field_slot),
op,
value,
})
}
pub(in crate::db::query::intent) fn push_having_aggregate_clause(
self,
aggregate_index: usize,
op: CompareOp,
value: Value,
) -> Result<Self, QueryError> {
self.push_having_clause(GroupHavingClause {
symbol: GroupHavingSymbol::AggregateIndex(aggregate_index),
op,
value,
})
}
pub(in crate::db::query::intent) fn push_having_expr(
mut self,
expr: Expr,
) -> Result<Self, QueryError> {
self.intent
.push_having_expr(expr)
.map_err(QueryError::intent)?;
Ok(self)
}
pub(crate) fn by_id(mut self, id: K) -> Self {
self.intent.set_by_id(id);
self
}
pub(crate) fn by_ids<I>(mut self, ids: I) -> Self
where
I: IntoIterator<Item = K>,
{
self.intent.set_by_ids(ids);
self
}
pub(crate) fn only(mut self, id: K) -> Self {
self.intent.set_only(id);
self
}
#[must_use]
pub(crate) fn delete(mut self) -> Self {
self.intent = self.intent.set_delete_mode();
self
}
#[must_use]
pub(crate) fn limit(mut self, limit: u32) -> Self {
self.intent = self.intent.apply_limit(limit);
self
}
#[must_use]
pub(crate) fn offset(mut self, offset: u32) -> Self {
self.intent = self.intent.apply_offset(offset);
self
}
#[inline(never)]
pub(in crate::db::query::intent) fn build_plan_model(
&self,
) -> Result<AccessPlannedQuery, QueryError> {
self.build_plan_model_with_indexes(&VisibleIndexes::schema_owned(self.model.indexes()))
}
#[inline(never)]
pub(in crate::db::query::intent) fn build_plan_model_with_indexes(
&self,
visible_indexes: &VisibleIndexes<'_>,
) -> Result<AccessPlannedQuery, QueryError> {
let planning_state = self.prepare_scalar_planning_state()?;
self.build_plan_model_with_indexes_from_scalar_planning_state(
visible_indexes,
planning_state,
)
}
pub(in crate::db::query::intent) fn build_plan_model_with_indexes_from_scalar_planning_state(
&self,
visible_indexes: &VisibleIndexes<'_>,
planning_state: PreparedScalarPlanningState<'_>,
) -> Result<AccessPlannedQuery, QueryError> {
let PreparedScalarPlanningState {
schema_info,
access_inputs,
normalized_predicate,
} = planning_state;
let access_order = access_inputs.order();
let key_access_override = access_inputs.into_key_access_override();
let access_plan_value = self.plan_access_from_normalized_predicate(
visible_indexes,
schema_info,
normalized_predicate.as_ref(),
access_order,
key_access_override,
)?;
let normalized_predicate = strip_redundant_primary_key_predicate_for_exact_access(
self.model,
&access_plan_value,
normalized_predicate,
);
let logical_inputs = self.intent.planning_logical_inputs();
let logical_query = logical_query_from_logical_inputs(
logical_inputs,
normalized_predicate,
self.consistency,
);
let logical = build_logical_plan(self.model, logical_query);
let mut plan = AccessPlannedQuery::from_parts_with_projection(
logical,
access_plan_value,
self.intent.scalar().projection_selection.clone(),
);
simplify_limit_one_page_for_by_key_access(&mut plan);
plan.finalize_planner_route_profile_for_model(self.model);
self.validate_plan_semantics(schema_info, &plan)?;
plan.finalize_static_planning_shape_for_model(self.model)
.map_err(QueryError::execute)?;
Ok(plan)
}
pub(in crate::db::query::intent) fn prepare_scalar_planning_state(
&self,
) -> Result<PreparedScalarPlanningState<'_>, QueryError> {
let schema_info = SchemaInfo::cached_for_entity_model(self.model);
self.intent.validate_policy_shape()?;
let access_inputs = self.intent.planning_access_inputs();
let normalized_predicate = fold_constant_predicate(normalize_query_predicate(
schema_info,
access_inputs.predicate(),
)?);
Ok(PreparedScalarPlanningState::new(
schema_info,
access_inputs,
normalized_predicate,
))
}
fn plan_access_from_normalized_predicate(
&self,
visible_indexes: &VisibleIndexes<'_>,
schema_info: &SchemaInfo,
normalized_predicate: Option<&Predicate>,
order: Option<&OrderSpec>,
key_access_override: Option<AccessPlan<Value>>,
) -> Result<AccessPlan<Value>, QueryError> {
let limit_zero_window = is_limit_zero_load_window(self.intent.mode());
let constant_false_predicate = predicate_is_constant_false(normalized_predicate);
if limit_zero_window || constant_false_predicate {
return Ok(AccessPlan::by_keys(Vec::new()));
}
plan_query_access(
self.model,
visible_indexes.as_slice(),
schema_info,
normalized_predicate,
order,
key_access_override,
)
.map_err(QueryError::from)
}
fn validate_plan_semantics(
&self,
schema_info: &SchemaInfo,
plan: &AccessPlannedQuery,
) -> Result<(), QueryError> {
if plan.grouped_plan().is_some() {
validate_group_query_semantics(schema_info, self.model, plan)?;
} else {
validate_query_semantics(schema_info, self.model, plan)?;
}
Ok(())
}
}
fn strip_redundant_primary_key_predicate_for_exact_access(
model: &EntityModel,
access: &AccessPlan<Value>,
normalized_predicate: Option<Predicate>,
) -> Option<Predicate> {
let predicate = normalized_predicate?;
if ExactPrimaryKeyAccess::from_access(access)
.is_some_and(|access| access.matches_predicate(&predicate, model.primary_key.name))
{
return None;
}
Some(predicate)
}
enum ExactPrimaryKeyAccess<'a> {
ByKey(&'a Value),
ByKeys(&'a [Value]),
HalfOpenRange { start: &'a Value, end: &'a Value },
}
impl<'a> ExactPrimaryKeyAccess<'a> {
fn from_access(access: &'a AccessPlan<Value>) -> Option<Self> {
if let Some(access_keys) = access.as_path().and_then(|path| path.as_by_keys())
&& !access_keys.is_empty()
{
return Some(Self::ByKeys(access_keys));
}
if let Some(access_key) = access.as_path().and_then(|path| path.as_by_key()) {
return Some(Self::ByKey(access_key));
}
access
.as_primary_key_range_path()
.map(|(start, end)| Self::HalfOpenRange { start, end })
}
fn matches_predicate(self, predicate: &Predicate, primary_key_name: &str) -> bool {
match self {
Self::ByKey(access_key) => {
matches_primary_key_eq_predicate(predicate, primary_key_name, access_key)
}
Self::ByKeys(access_keys) => {
matches_primary_key_in_predicate(predicate, primary_key_name, access_keys)
}
Self::HalfOpenRange { start, end } => {
matches_primary_key_half_open_range(predicate, primary_key_name, start, end)
}
}
}
}
fn matches_primary_key_eq_predicate(
predicate: &Predicate,
primary_key_name: &str,
access_key: &Value,
) -> bool {
let Predicate::Compare(cmp) = predicate else {
return false;
};
cmp.field == primary_key_name && cmp.op == CompareOp::Eq && cmp.value == *access_key
}
fn matches_primary_key_in_predicate(
predicate: &Predicate,
primary_key_name: &str,
access_keys: &[Value],
) -> bool {
let Predicate::Compare(cmp) = predicate else {
return false;
};
if cmp.field != primary_key_name || cmp.op != CompareOp::In {
return false;
}
let Value::List(predicate_keys) = &cmp.value else {
return false;
};
let mut canonical_predicate_keys = predicate_keys.clone();
canonicalize_value_set(&mut canonical_predicate_keys);
canonical_predicate_keys == access_keys
}
fn matches_primary_key_half_open_range(
predicate: &Predicate,
primary_key_name: &str,
start: &Value,
end: &Value,
) -> bool {
let Predicate::And(children) = predicate else {
return false;
};
if children.len() != 2 {
return false;
}
let mut lower_matches = false;
let mut upper_matches = false;
for child in children {
let Predicate::Compare(cmp) = child else {
return false;
};
if cmp.field != primary_key_name {
return false;
}
match cmp.op {
CompareOp::Gte if cmp.value == *start => lower_matches = true,
CompareOp::Lt if cmp.value == *end => upper_matches = true,
_ => return false,
}
}
lower_matches && upper_matches
}
fn simplify_limit_one_page_for_by_key_access(plan: &mut AccessPlannedQuery) {
if !plan
.access
.as_path()
.is_some_and(|path: &crate::db::access::AccessPath<Value>| path.is_by_key())
{
return;
}
let scalar = match &mut plan.logical {
LogicalPlan::Scalar(scalar) => scalar,
LogicalPlan::Grouped(grouped) => &mut grouped.scalar,
};
let Some(page) = scalar.page.as_ref() else {
return;
};
if page.offset != 0 || page.limit != Some(1) {
return;
}
scalar.page = None;
}