Skip to main content

icydb_core/db/query/intent/
mod.rs

1#![expect(clippy::used_underscore_binding)]
2
3//! Module: query::intent
4//! Responsibility: query intent construction, coercion, and semantic-plan compilation.
5//! Does not own: executor runtime behavior or index storage details.
6//! Boundary: typed/fluent query inputs lowered into validated logical plans.
7
8#[cfg(test)]
9mod tests;
10
// Aliases for plan-owned mode/spec types, exposed under the intent module.
12pub type DeleteSpec = crate::db::query::plan::DeleteSpec;
13pub type LoadSpec = crate::db::query::plan::LoadSpec;
14pub type QueryMode = crate::db::query::plan::QueryMode;
15
16use crate::{
17    db::{
18        access::{AccessPath, AccessPlan, AccessPlanError, normalize_access_plan_value},
19        cursor::CursorPlanError,
20        predicate::{
21            CompareOp, MissingRowPolicy, Predicate, SchemaInfo, ValidateError, normalize,
22            normalize_enum_literals, reject_unsupported_query_features,
23        },
24        query::{
25            builder::aggregate::AggregateExpr,
26            explain::ExplainPlan,
27            expr::{FilterExpr, SortExpr, SortLowerError},
28            plan::{
29                AccessPlannedQuery, CursorPagingPolicyError, DeleteLimitSpec,
30                FluentLoadPolicyViolation, GroupAggregateSpec, GroupHavingClause, GroupHavingSpec,
31                GroupHavingSymbol, GroupSpec, GroupedExecutionConfig,
32                IntentKeyAccessKind as IntentValidationKeyAccessKind,
33                IntentKeyAccessPolicyViolation, LogicalPlan, OrderDirection, OrderSpec, PageSpec,
34                PlanError, PlannerError, PolicyPlanError, ScalarPlan, has_explicit_order,
35                plan_access, resolve_group_field_slot, validate_group_query_semantics,
36                validate_intent_key_access_policy, validate_intent_plan_shape,
37                validate_order_shape, validate_query_semantics,
38            },
39        },
40        response::ResponseError,
41    },
42    error::{ErrorClass, InternalError},
43    model::entity::EntityModel,
44    traits::{EntityKind, FieldValue, SingletonEntity},
45    value::Value,
46};
47use std::marker::PhantomData;
48use thiserror::Error as ThisError;
49
///
/// KeyAccess
///
/// Primary-key-only access request attached to a query intent:
/// either one typed key or a list of typed keys.
///

#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum KeyAccess<K> {
    Single(K),
    Many(Vec<K>),
}
60
///
/// KeyAccessKind
///
/// Records which key-only builder (`by_id`, `by_ids`, or `only`) installed
/// the current key access, independent of the key payload itself.
///

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum KeyAccessKind {
    Single,
    Many,
    Only,
}
72
73///
74/// KeyAccessState
75/// Tracks key-only access plus its origin for intent validation.
76///
77
78#[derive(Clone, Debug, Eq, PartialEq)]
79pub(crate) struct KeyAccessState<K> {
80    pub kind: KeyAccessKind,
81    pub access: KeyAccess<K>,
82}
83
84// Build a model-level access plan for key-only intents.
85pub(crate) fn access_plan_from_keys_value<K>(access: &KeyAccess<K>) -> AccessPlan<Value>
86where
87    K: FieldValue,
88{
89    // Phase 1: map typed keys into model-level Value access paths.
90    let plan = match access {
91        KeyAccess::Single(key) => AccessPlan::path(AccessPath::ByKey(key.to_value())),
92        KeyAccess::Many(keys) => {
93            let values = keys.iter().map(FieldValue::to_value).collect();
94            AccessPlan::path(AccessPath::ByKeys(values))
95        }
96    };
97
98    // Phase 2: canonicalize the access shape via the shared access boundary.
99    normalize_access_plan_value(plan)
100}
101
102// Convert model-level access plans into entity-keyed access plans.
103pub(crate) fn access_plan_to_entity_keys<E: EntityKind>(
104    model: &EntityModel,
105    access: AccessPlan<Value>,
106) -> Result<AccessPlan<E::Key>, PlanError> {
107    access.into_executable::<E>(model)
108}
109
110// Convert model-level key values into typed entity keys.
111pub(crate) fn coerce_entity_key<E: EntityKind>(
112    model: &EntityModel,
113    key: &Value,
114) -> Result<E::Key, PlanError> {
115    E::Key::from_value(key).ok_or_else(|| {
116        PlanError::from(AccessPlanError::PrimaryKeyMismatch {
117            field: model.primary_key.name.to_string(),
118            key: key.clone(),
119        })
120    })
121}
122
123impl AccessPlan<Value> {
124    /// Convert model-level access plans into typed executable access plans.
125    pub(crate) fn into_executable<E: EntityKind>(
126        self,
127        model: &EntityModel,
128    ) -> Result<AccessPlan<E::Key>, PlanError> {
129        match self {
130            Self::Path(path) => Ok(AccessPlan::path(path.into_executable::<E>(model)?)),
131            Self::Union(children) => {
132                let mut out = Vec::with_capacity(children.len());
133                for child in children {
134                    out.push(child.into_executable::<E>(model)?);
135                }
136
137                Ok(AccessPlan::union(out))
138            }
139            Self::Intersection(children) => {
140                let mut out = Vec::with_capacity(children.len());
141                for child in children {
142                    out.push(child.into_executable::<E>(model)?);
143                }
144
145                Ok(AccessPlan::intersection(out))
146            }
147        }
148    }
149}
150
151impl AccessPath<Value> {
152    /// Convert one model-level access path into a typed executable access path.
153    pub(crate) fn into_executable<E: EntityKind>(
154        self,
155        model: &EntityModel,
156    ) -> Result<AccessPath<E::Key>, PlanError> {
157        match self {
158            Self::ByKey(key) => Ok(AccessPath::ByKey(coerce_entity_key::<E>(model, &key)?)),
159            Self::ByKeys(keys) => {
160                let mut out = Vec::with_capacity(keys.len());
161                for key in keys {
162                    out.push(coerce_entity_key::<E>(model, &key)?);
163                }
164
165                Ok(AccessPath::ByKeys(out))
166            }
167            Self::KeyRange { start, end } => Ok(AccessPath::KeyRange {
168                start: coerce_entity_key::<E>(model, &start)?,
169                end: coerce_entity_key::<E>(model, &end)?,
170            }),
171            Self::IndexPrefix { index, values } => Ok(AccessPath::IndexPrefix { index, values }),
172            Self::IndexRange { spec } => Ok(AccessPath::IndexRange { spec }),
173            Self::FullScan => Ok(AccessPath::FullScan),
174        }
175    }
176}
177
178///
179/// QueryModel
180///
181/// Model-level query intent and planning context.
182/// Consumes an `EntityModel` derived from typed entity definitions.
183///
184
185#[derive(Debug)]
186pub(crate) struct QueryModel<'m, K> {
187    model: &'m EntityModel,
188    mode: QueryMode,
189    predicate: Option<Predicate>,
190    key_access: Option<KeyAccessState<K>>,
191    key_access_conflict: bool,
192    group: Option<crate::db::query::plan::GroupSpec>,
193    having: Option<GroupHavingSpec>,
194    order: Option<OrderSpec>,
195    distinct: bool,
196    consistency: MissingRowPolicy,
197}
198
199impl<'m, K: FieldValue> QueryModel<'m, K> {
200    #[must_use]
201    pub(crate) const fn new(model: &'m EntityModel, consistency: MissingRowPolicy) -> Self {
202        Self {
203            model,
204            mode: QueryMode::Load(LoadSpec::new()),
205            predicate: None,
206            key_access: None,
207            key_access_conflict: false,
208            group: None,
209            having: None,
210            order: None,
211            distinct: false,
212            consistency,
213        }
214    }
215
216    /// Return the intent mode (load vs delete).
217    #[must_use]
218    pub(crate) const fn mode(&self) -> QueryMode {
219        self.mode
220    }
221
222    #[must_use]
223    fn has_explicit_order(&self) -> bool {
224        has_explicit_order(self.order.as_ref())
225    }
226
227    #[must_use]
228    const fn has_grouping(&self) -> bool {
229        self.group.is_some()
230    }
231
232    #[must_use]
233    const fn load_spec(&self) -> Option<LoadSpec> {
234        match self.mode {
235            QueryMode::Load(spec) => Some(spec),
236            QueryMode::Delete(_) => None,
237        }
238    }
239
240    /// Add a predicate, implicitly AND-ing with any existing predicate.
241    #[must_use]
242    pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
243        self.predicate = match self.predicate.take() {
244            Some(existing) => Some(Predicate::And(vec![existing, predicate])),
245            None => Some(predicate),
246        };
247        self
248    }
249
250    /// Apply a dynamic filter expression using the model schema.
251    pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
252        let schema = SchemaInfo::from_entity_model(self.model)?;
253        let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
254
255        Ok(self.filter(predicate))
256    }
257
258    /// Apply a dynamic sort expression using the model schema.
259    pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
260        let schema = SchemaInfo::from_entity_model(self.model)?;
261        let order = match expr.lower_with(&schema) {
262            Ok(order) => order,
263            Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
264            Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
265        };
266
267        validate_order_shape(Some(&order))
268            .map_err(IntentError::from)
269            .map_err(QueryError::from)?;
270
271        Ok(self.order_spec(order))
272    }
273
274    /// Append an ascending sort key.
275    #[must_use]
276    pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
277        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
278        self
279    }
280
281    /// Append a descending sort key.
282    #[must_use]
283    pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
284        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
285        self
286    }
287
288    /// Set a fully-specified order spec (validated before reaching this boundary).
289    pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
290        self.order = Some(order);
291        self
292    }
293
294    /// Enable DISTINCT semantics for this query intent.
295    #[must_use]
296    pub(crate) const fn distinct(mut self) -> Self {
297        self.distinct = true;
298        self
299    }
300
301    // Resolve one grouped field into one stable field slot and append it to the
302    // grouped spec in declaration order.
303    fn push_group_field(mut self, field: &str) -> Result<Self, QueryError> {
304        let field_slot = resolve_group_field_slot(self.model, field).map_err(QueryError::from)?;
305        let group = self.group.get_or_insert(GroupSpec {
306            group_fields: Vec::new(),
307            aggregates: Vec::new(),
308            execution: GroupedExecutionConfig::unbounded(),
309        });
310        if !group
311            .group_fields
312            .iter()
313            .any(|existing| existing.index() == field_slot.index())
314        {
315            group.group_fields.push(field_slot);
316        }
317
318        Ok(self)
319    }
320
321    // Append one grouped aggregate terminal to the grouped declarative spec.
322    fn push_group_aggregate(mut self, aggregate: AggregateExpr) -> Self {
323        let group = self.group.get_or_insert(GroupSpec {
324            group_fields: Vec::new(),
325            aggregates: Vec::new(),
326            execution: GroupedExecutionConfig::unbounded(),
327        });
328        group
329            .aggregates
330            .push(GroupAggregateSpec::from_aggregate_expr(&aggregate));
331
332        self
333    }
334
335    // Override grouped hard limits for this grouped query.
336    fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
337        let group = self.group.get_or_insert(GroupSpec {
338            group_fields: Vec::new(),
339            aggregates: Vec::new(),
340            execution: GroupedExecutionConfig::unbounded(),
341        });
342        group.execution = GroupedExecutionConfig::with_hard_limits(max_groups, max_group_bytes);
343
344        self
345    }
346
347    // Append one grouped HAVING compare clause after GROUP BY terminal declaration.
348    fn push_having_clause(mut self, clause: GroupHavingClause) -> Result<Self, QueryError> {
349        if self.group.is_none() {
350            return Err(QueryError::Intent(IntentError::HavingRequiresGroupBy));
351        }
352
353        let having = self.having.get_or_insert(GroupHavingSpec {
354            clauses: Vec::new(),
355        });
356        having.clauses.push(clause);
357
358        Ok(self)
359    }
360
361    // Append one grouped HAVING clause that references one grouped key field.
362    fn push_having_group_clause(
363        self,
364        field: &str,
365        op: CompareOp,
366        value: Value,
367    ) -> Result<Self, QueryError> {
368        let field_slot = resolve_group_field_slot(self.model, field).map_err(QueryError::from)?;
369
370        self.push_having_clause(GroupHavingClause {
371            symbol: GroupHavingSymbol::GroupField(field_slot),
372            op,
373            value,
374        })
375    }
376
377    // Append one grouped HAVING clause that references one grouped aggregate output.
378    fn push_having_aggregate_clause(
379        self,
380        aggregate_index: usize,
381        op: CompareOp,
382        value: Value,
383    ) -> Result<Self, QueryError> {
384        self.push_having_clause(GroupHavingClause {
385            symbol: GroupHavingSymbol::AggregateIndex(aggregate_index),
386            op,
387            value,
388        })
389    }
390
391    /// Track key-only access paths and detect conflicting key intents.
392    fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
393        if let Some(existing) = &self.key_access
394            && existing.kind != kind
395        {
396            self.key_access_conflict = true;
397        }
398
399        self.key_access = Some(KeyAccessState { kind, access });
400
401        self
402    }
403
404    /// Set the access path to a single primary key lookup.
405    pub(crate) fn by_id(self, id: K) -> Self {
406        self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
407    }
408
409    /// Set the access path to a primary key batch lookup.
410    pub(crate) fn by_ids<I>(self, ids: I) -> Self
411    where
412        I: IntoIterator<Item = K>,
413    {
414        self.set_key_access(
415            KeyAccessKind::Many,
416            KeyAccess::Many(ids.into_iter().collect()),
417        )
418    }
419
420    /// Set the access path to the singleton primary key.
421    pub(crate) fn only(self, id: K) -> Self {
422        self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
423    }
424
425    /// Mark this intent as a delete query.
426    #[must_use]
427    pub(crate) const fn delete(mut self) -> Self {
428        if self.mode.is_load() {
429            self.mode = QueryMode::Delete(DeleteSpec::new());
430        }
431        self
432    }
433
434    /// Apply a limit to the current mode.
435    ///
436    /// Load limits bound result size; delete limits bound mutation size.
437    #[must_use]
438    pub(crate) const fn limit(mut self, limit: u32) -> Self {
439        match self.mode {
440            QueryMode::Load(mut spec) => {
441                spec.limit = Some(limit);
442                self.mode = QueryMode::Load(spec);
443            }
444            QueryMode::Delete(mut spec) => {
445                spec.limit = Some(limit);
446                self.mode = QueryMode::Delete(spec);
447            }
448        }
449        self
450    }
451
452    /// Apply an offset to a load intent.
453    #[must_use]
454    pub(crate) const fn offset(mut self, offset: u32) -> Self {
455        if let QueryMode::Load(mut spec) = self.mode {
456            spec.offset = offset;
457            self.mode = QueryMode::Load(spec);
458        }
459        self
460    }
461
462    /// Build a model-level logical plan using Value-based access keys.
463    fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
464        // Phase 1: schema surface and intent validation.
465        let schema_info = SchemaInfo::from_entity_model(self.model)?;
466        self.validate_intent()?;
467
468        // Phase 2: predicate normalization and access planning.
469        let normalized_predicate = self
470            .predicate
471            .as_ref()
472            .map(|predicate| {
473                reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
474                let predicate = normalize_enum_literals(&schema_info, predicate)?;
475                Ok::<Predicate, ValidateError>(normalize(&predicate))
476            })
477            .transpose()?;
478        let access_plan_value = match &self.key_access {
479            Some(state) => access_plan_from_keys_value(&state.access),
480            None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
481        };
482
483        // Phase 3: assemble the executor-ready plan.
484        let scalar = ScalarPlan {
485            mode: self.mode,
486            predicate: normalized_predicate,
487            // Canonicalize ORDER BY to include an explicit primary-key tie-break.
488            // This ensures explain/fingerprint/execution share one deterministic order shape.
489            order: canonicalize_order_spec(self.model, self.order.clone()),
490            distinct: self.distinct,
491            delete_limit: match self.mode {
492                QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
493                QueryMode::Load(_) => None,
494            },
495            page: match self.mode {
496                QueryMode::Load(spec) => {
497                    if spec.limit.is_some() || spec.offset > 0 {
498                        Some(PageSpec {
499                            limit: spec.limit,
500                            offset: spec.offset,
501                        })
502                    } else {
503                        None
504                    }
505                }
506                QueryMode::Delete(_) => None,
507            },
508            consistency: self.consistency,
509        };
510        let mut plan =
511            AccessPlannedQuery::from_parts(LogicalPlan::Scalar(scalar), access_plan_value);
512        if let Some(group) = self.group.clone() {
513            plan = match self.having.clone() {
514                Some(having) => plan.into_grouped_with_having(group, Some(having)),
515                None => plan.into_grouped(group),
516            };
517        }
518
519        if plan.grouped_plan().is_some() {
520            validate_group_query_semantics(&schema_info, self.model, &plan)?;
521        } else {
522            validate_query_semantics(&schema_info, self.model, &plan)?;
523        }
524
525        Ok(plan)
526    }
527
528    // Validate pre-plan policy invariants and key-access rules before planning.
529    fn validate_intent(&self) -> Result<(), IntentError> {
530        validate_intent_plan_shape(self.mode, self.order.as_ref()).map_err(IntentError::from)?;
531
532        let key_access_kind = self.key_access.as_ref().map(|state| match state.kind {
533            KeyAccessKind::Single => IntentValidationKeyAccessKind::Single,
534            KeyAccessKind::Many => IntentValidationKeyAccessKind::Many,
535            KeyAccessKind::Only => IntentValidationKeyAccessKind::Only,
536        });
537        validate_intent_key_access_policy(
538            self.key_access_conflict,
539            key_access_kind,
540            self.predicate.is_some(),
541        )
542        .map_err(IntentError::from)?;
543        if self.having.is_some() && self.group.is_none() {
544            return Err(IntentError::HavingRequiresGroupBy);
545        }
546
547        Ok(())
548    }
549}
550
551///
552/// Query
553///
554/// Typed, declarative query intent for a specific entity type.
555///
556/// This intent is:
557/// - schema-agnostic at construction
558/// - normalized and validated only during planning
559/// - free of access-path decisions
560///
561
562///
563/// PlannedQuery
564///
565/// Neutral query-owned planned contract produced by query planning.
566/// Stores logical + access shape without executor compilation state.
567///
568#[derive(Debug)]
569pub struct PlannedQuery<E: EntityKind> {
570    plan: AccessPlannedQuery<E::Key>,
571    _marker: PhantomData<E>,
572}
573
574impl<E: EntityKind> PlannedQuery<E> {
575    #[must_use]
576    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
577        Self {
578            plan,
579            _marker: PhantomData,
580        }
581    }
582
583    #[must_use]
584    pub fn explain(&self) -> ExplainPlan {
585        self.plan.explain_with_model(E::MODEL)
586    }
587}
588
589///
590/// CompiledQuery
591///
592/// Query-owned compiled handoff produced by `Query::plan()`.
593/// This type intentionally carries only logical/access query semantics.
594/// Executor runtime shape is derived explicitly at the executor boundary.
595///
596#[derive(Clone, Debug)]
597pub struct CompiledQuery<E: EntityKind> {
598    plan: AccessPlannedQuery<E::Key>,
599    _marker: PhantomData<E>,
600}
601
602impl<E: EntityKind> CompiledQuery<E> {
603    #[must_use]
604    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
605        Self {
606            plan,
607            _marker: PhantomData,
608        }
609    }
610
611    #[must_use]
612    pub fn explain(&self) -> ExplainPlan {
613        self.plan.explain_with_model(E::MODEL)
614    }
615
616    /// Borrow planner-lowered projection semantics for this compiled query.
617    #[must_use]
618    #[cfg(test)]
619    pub(crate) fn projection_spec(&self) -> crate::db::query::plan::expr::ProjectionSpec {
620        self.plan.projection_spec(E::MODEL)
621    }
622
623    #[must_use]
624    pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
625        self.plan
626    }
627}
628
629#[derive(Debug)]
630pub struct Query<E: EntityKind> {
631    intent: QueryModel<'static, E::Key>,
632    _marker: PhantomData<E>,
633}
634
635impl<E: EntityKind> Query<E> {
636    /// Create a new intent with an explicit missing-row policy.
637    /// Ignore favors idempotency and may mask index/data divergence on deletes.
638    /// Use Error to surface missing rows during scan/delete execution.
639    #[must_use]
640    pub const fn new(consistency: MissingRowPolicy) -> Self {
641        Self {
642            intent: QueryModel::new(E::MODEL, consistency),
643            _marker: PhantomData,
644        }
645    }
646
647    /// Return the intent mode (load vs delete).
648    #[must_use]
649    pub const fn mode(&self) -> QueryMode {
650        self.intent.mode()
651    }
652
653    #[must_use]
654    pub(crate) fn has_explicit_order(&self) -> bool {
655        self.intent.has_explicit_order()
656    }
657
658    #[must_use]
659    pub(crate) const fn has_grouping(&self) -> bool {
660        self.intent.has_grouping()
661    }
662
663    #[must_use]
664    pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
665        self.intent.load_spec()
666    }
667
668    /// Add a predicate, implicitly AND-ing with any existing predicate.
669    #[must_use]
670    pub fn filter(mut self, predicate: Predicate) -> Self {
671        self.intent = self.intent.filter(predicate);
672        self
673    }
674
675    /// Apply a dynamic filter expression.
676    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
677        let Self { intent, _marker } = self;
678        let intent = intent.filter_expr(expr)?;
679
680        Ok(Self { intent, _marker })
681    }
682
683    /// Apply a dynamic sort expression.
684    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
685        let Self { intent, _marker } = self;
686        let intent = intent.sort_expr(expr)?;
687
688        Ok(Self { intent, _marker })
689    }
690
691    /// Append an ascending sort key.
692    #[must_use]
693    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
694        self.intent = self.intent.order_by(field);
695        self
696    }
697
698    /// Append a descending sort key.
699    #[must_use]
700    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
701        self.intent = self.intent.order_by_desc(field);
702        self
703    }
704
705    /// Enable DISTINCT semantics for this query.
706    #[must_use]
707    pub fn distinct(mut self) -> Self {
708        self.intent = self.intent.distinct();
709        self
710    }
711
712    /// Add one GROUP BY field.
713    pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
714        let Self { intent, _marker } = self;
715        let intent = intent.push_group_field(field.as_ref())?;
716
717        Ok(Self { intent, _marker })
718    }
719
720    /// Add one aggregate terminal via composable aggregate expression.
721    #[must_use]
722    pub fn aggregate(mut self, aggregate: AggregateExpr) -> Self {
723        self.intent = self.intent.push_group_aggregate(aggregate);
724        self
725    }
726
727    /// Override grouped hard limits for grouped execution budget enforcement.
728    #[must_use]
729    pub fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
730        self.intent = self.intent.grouped_limits(max_groups, max_group_bytes);
731        self
732    }
733
734    /// Add one grouped HAVING compare clause over one grouped key field.
735    pub fn having_group(
736        self,
737        field: impl AsRef<str>,
738        op: CompareOp,
739        value: Value,
740    ) -> Result<Self, QueryError> {
741        let field = field.as_ref().to_owned();
742        let Self { intent, _marker } = self;
743        let intent = intent.push_having_group_clause(&field, op, value)?;
744
745        Ok(Self { intent, _marker })
746    }
747
748    /// Add one grouped HAVING compare clause over one grouped aggregate output.
749    pub fn having_aggregate(
750        self,
751        aggregate_index: usize,
752        op: CompareOp,
753        value: Value,
754    ) -> Result<Self, QueryError> {
755        let Self { intent, _marker } = self;
756        let intent = intent.push_having_aggregate_clause(aggregate_index, op, value)?;
757
758        Ok(Self { intent, _marker })
759    }
760
761    /// Set the access path to a single primary key lookup.
762    pub(crate) fn by_id(self, id: E::Key) -> Self {
763        let Self { intent, _marker } = self;
764        Self {
765            intent: intent.by_id(id),
766            _marker,
767        }
768    }
769
770    /// Set the access path to a primary key batch lookup.
771    pub(crate) fn by_ids<I>(self, ids: I) -> Self
772    where
773        I: IntoIterator<Item = E::Key>,
774    {
775        let Self { intent, _marker } = self;
776        Self {
777            intent: intent.by_ids(ids),
778            _marker,
779        }
780    }
781
782    /// Mark this intent as a delete query.
783    #[must_use]
784    pub fn delete(mut self) -> Self {
785        self.intent = self.intent.delete();
786        self
787    }
788
789    /// Apply a limit to the current mode.
790    ///
791    /// Load limits bound result size; delete limits bound mutation size.
792    /// For scalar load queries, any use of `limit` or `offset` requires an
793    /// explicit `order_by(...)` so pagination is deterministic.
794    /// GROUP BY queries use canonical grouped-key order by default.
795    #[must_use]
796    pub fn limit(mut self, limit: u32) -> Self {
797        self.intent = self.intent.limit(limit);
798        self
799    }
800
801    /// Apply an offset to a load intent.
802    ///
803    /// Scalar pagination requires an explicit `order_by(...)`.
804    /// GROUP BY queries use canonical grouped-key order by default.
805    #[must_use]
806    pub fn offset(mut self, offset: u32) -> Self {
807        self.intent = self.intent.offset(offset);
808        self
809    }
810
811    /// Explain this intent without executing it.
812    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
813        let plan = self.planned()?;
814
815        Ok(plan.explain())
816    }
817
818    /// Plan this intent into a neutral planned query contract.
819    pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
820        let plan = self.build_plan()?;
821        let _projection = plan.projection_spec(E::MODEL);
822
823        Ok(PlannedQuery::new(plan))
824    }
825
826    /// Compile this intent into query-owned handoff state.
827    ///
828    /// This boundary intentionally does not expose executor runtime shape.
829    pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
830        let plan = self.build_plan()?;
831        let _projection = plan.projection_spec(E::MODEL);
832
833        Ok(CompiledQuery::new(plan))
834    }
835
836    // Build a logical plan for the current intent.
837    fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
838        let plan_value = self.intent.build_plan_model()?;
839        let (logical, access) = plan_value.into_parts();
840        let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
841        let plan = AccessPlannedQuery::from_parts(logical, access);
842
843        Ok(plan)
844    }
845}
846
847impl<E> Query<E>
848where
849    E: EntityKind + SingletonEntity,
850    E::Key: Default,
851{
852    /// Set the access path to the singleton primary key.
853    pub(crate) fn only(self) -> Self {
854        let Self { intent, _marker } = self;
855
856        Self {
857            intent: intent.only(E::Key::default()),
858            _marker,
859        }
860    }
861}
862
863///
864/// QueryError
865///
866
867#[derive(Debug, ThisError)]
868pub enum QueryError {
869    #[error("{0}")]
870    Validate(#[from] ValidateError),
871
872    #[error("{0}")]
873    Plan(Box<PlanError>),
874
875    #[error("{0}")]
876    Intent(#[from] IntentError),
877
878    #[error("{0}")]
879    Response(#[from] ResponseError),
880
881    #[error("{0}")]
882    Execute(#[from] QueryExecuteError),
883}
884
885impl QueryError {
886    /// Construct an execution-domain query error from one classified runtime error.
887    pub(crate) fn execute(err: InternalError) -> Self {
888        Self::Execute(QueryExecuteError::from(err))
889    }
890}
891
892///
893/// QueryExecuteError
894///
895
896#[derive(Debug, ThisError)]
897pub enum QueryExecuteError {
898    #[error("{0}")]
899    Corruption(InternalError),
900
901    #[error("{0}")]
902    InvariantViolation(InternalError),
903
904    #[error("{0}")]
905    Conflict(InternalError),
906
907    #[error("{0}")]
908    NotFound(InternalError),
909
910    #[error("{0}")]
911    Unsupported(InternalError),
912
913    #[error("{0}")]
914    Internal(InternalError),
915}
916
917impl QueryExecuteError {
918    #[must_use]
919    /// Borrow the wrapped classified runtime error.
920    pub const fn as_internal(&self) -> &InternalError {
921        match self {
922            Self::Corruption(err)
923            | Self::InvariantViolation(err)
924            | Self::Conflict(err)
925            | Self::NotFound(err)
926            | Self::Unsupported(err)
927            | Self::Internal(err) => err,
928        }
929    }
930}
931
932impl From<InternalError> for QueryExecuteError {
933    fn from(err: InternalError) -> Self {
934        match err.class {
935            ErrorClass::Corruption => Self::Corruption(err),
936            ErrorClass::InvariantViolation => Self::InvariantViolation(err),
937            ErrorClass::Conflict => Self::Conflict(err),
938            ErrorClass::NotFound => Self::NotFound(err),
939            ErrorClass::Unsupported => Self::Unsupported(err),
940            ErrorClass::Internal => Self::Internal(err),
941        }
942    }
943}
944
945impl From<PlannerError> for QueryError {
946    fn from(err: PlannerError) -> Self {
947        match err {
948            PlannerError::Plan(err) => Self::from(*err),
949            PlannerError::Internal(err) => Self::execute(*err),
950        }
951    }
952}
953
954impl From<PlanError> for QueryError {
955    fn from(err: PlanError) -> Self {
956        Self::Plan(Box::new(err))
957    }
958}
959
///
/// IntentError
///
/// Intent-level policy and shape violations. Unit variants carry a fixed
/// message; `PlanShape` forwards the message of the wrapped `PolicyPlanError`.
///

#[derive(Clone, Copy, Debug, ThisError)]
pub enum IntentError {
    /// Wrapped `PolicyPlanError`; `#[from]` generates the `From` conversion.
    #[error("{0}")]
    PlanShape(#[from] PolicyPlanError),

    /// `by_ids()` was combined with a predicate.
    #[error("by_ids() cannot be combined with predicates")]
    ByIdsWithPredicate,

    /// `only()` was combined with a predicate.
    #[error("only() cannot be combined with predicates")]
    OnlyWithPredicate,

    /// More than one key access method was used on the same query.
    #[error("multiple key access methods were used on the same query")]
    KeyAccessConflict,

    /// Message text comes from `CursorPlanError::cursor_requires_order_message()`.
    #[error(
        "{message}",
        message = CursorPlanError::cursor_requires_order_message()
    )]
    CursorRequiresOrder,

    /// Message text comes from `CursorPlanError::cursor_requires_limit_message()`.
    #[error(
        "{message}",
        message = CursorPlanError::cursor_requires_limit_message()
    )]
    CursorRequiresLimit,

    /// A cursor token was used outside `.page().execute()`.
    #[error("cursor tokens can only be used with .page().execute()")]
    CursorRequiresPagedExecution,

    /// A grouped query was not run through `execute_grouped(...)`.
    #[error("grouped queries require execute_grouped(...)")]
    GroupedRequiresExecuteGrouped,

    /// HAVING was used without GROUP BY.
    #[error("HAVING requires GROUP BY")]
    HavingRequiresGroupBy,
}
999
1000impl From<CursorPagingPolicyError> for IntentError {
1001    fn from(err: CursorPagingPolicyError) -> Self {
1002        match err {
1003            CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
1004            CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
1005        }
1006    }
1007}
1008
1009impl From<IntentKeyAccessPolicyViolation> for IntentError {
1010    fn from(err: IntentKeyAccessPolicyViolation) -> Self {
1011        match err {
1012            IntentKeyAccessPolicyViolation::KeyAccessConflict => Self::KeyAccessConflict,
1013            IntentKeyAccessPolicyViolation::ByIdsWithPredicate => Self::ByIdsWithPredicate,
1014            IntentKeyAccessPolicyViolation::OnlyWithPredicate => Self::OnlyWithPredicate,
1015        }
1016    }
1017}
1018
1019impl From<FluentLoadPolicyViolation> for IntentError {
1020    fn from(err: FluentLoadPolicyViolation) -> Self {
1021        match err {
1022            FluentLoadPolicyViolation::CursorRequiresPagedExecution => {
1023                Self::CursorRequiresPagedExecution
1024            }
1025            FluentLoadPolicyViolation::GroupedRequiresExecuteGrouped => {
1026                Self::GroupedRequiresExecuteGrouped
1027            }
1028            FluentLoadPolicyViolation::CursorRequiresOrder => Self::CursorRequiresOrder,
1029            FluentLoadPolicyViolation::CursorRequiresLimit => Self::CursorRequiresLimit,
1030        }
1031    }
1032}
1033
1034/// Helper to append an ordering field while preserving existing order spec.
1035fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
1036    match order {
1037        Some(mut spec) => {
1038            spec.fields.push((field.to_string(), direction));
1039            spec
1040        }
1041        None => OrderSpec {
1042            fields: vec![(field.to_string(), direction)],
1043        },
1044    }
1045}
1046
1047// Normalize ORDER BY into a canonical, deterministic shape:
1048// - preserve user field order
1049// - remove explicit primary-key references from the user segment
1050// - append exactly one primary-key field as the terminal tie-break
1051fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
1052    let mut order = order?;
1053    if order.fields.is_empty() {
1054        return Some(order);
1055    }
1056
1057    let pk_field = model.primary_key.name;
1058    let mut pk_direction = None;
1059    order.fields.retain(|(field, direction)| {
1060        if field == pk_field {
1061            if pk_direction.is_none() {
1062                pk_direction = Some(*direction);
1063            }
1064            false
1065        } else {
1066            true
1067        }
1068    });
1069
1070    let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
1071    order.fields.push((pk_field.to_string(), pk_direction));
1072
1073    Some(order)
1074}