Skip to main content

icydb_core/db/query/intent/
mod.rs

1#![expect(clippy::used_underscore_binding)]
2
3//! Module: query::intent
4//! Responsibility: query intent construction, coercion, and semantic-plan compilation.
5//! Does not own: executor runtime behavior or index storage details.
6//! Boundary: typed/fluent query inputs lowered into validated logical plans.
7
8#[cfg(test)]
9mod tests;
10
11// Key-only access intent and helpers.
12pub type DeleteSpec = crate::db::query::plan::DeleteSpec;
13pub type LoadSpec = crate::db::query::plan::LoadSpec;
14pub type QueryMode = crate::db::query::plan::QueryMode;
15
16use crate::{
17    db::{
18        access::{AccessPath, AccessPlan, AccessPlanError, canonical_by_keys_path},
19        predicate::{
20            CompareOp, MissingRowPolicy, Predicate, SchemaInfo, ValidateError, normalize,
21            normalize_enum_literals, reject_unsupported_query_features,
22        },
23        query::{
24            builder::aggregate::AggregateExpr,
25            explain::ExplainPlan,
26            expr::{FilterExpr, SortExpr, SortLowerError},
27            plan::{
28                AccessPlannedQuery, CursorPagingPolicyError, DeleteLimitSpec,
29                FluentLoadPolicyViolation, GroupAggregateKind, GroupAggregateSpec,
30                GroupHavingClause, GroupHavingSpec, GroupHavingSymbol, GroupSpec,
31                GroupedExecutionConfig, IntentKeyAccessKind as IntentValidationKeyAccessKind,
32                IntentKeyAccessPolicyViolation, LogicalPlan, OrderDirection, OrderSpec, PageSpec,
33                PlanError, PlannerError, PolicyPlanError, ScalarPlan, has_explicit_order,
34                plan_access, resolve_group_field_slot, validate_group_query_semantics,
35                validate_intent_key_access_policy, validate_intent_plan_shape,
36                validate_order_shape, validate_query_semantics,
37            },
38        },
39        response::ResponseError,
40    },
41    error::InternalError,
42    model::entity::EntityModel,
43    traits::{EntityKind, FieldValue, SingletonEntity},
44    value::Value,
45};
46use std::marker::PhantomData;
47use thiserror::Error as ThisError;
48
///
/// KeyAccess
/// Primary-key-only access hints for query planning.
///
/// Generic over the typed key `K`; `access_plan_from_keys_value` lowers either
/// variant into a `Value`-keyed access plan.
///

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum KeyAccess<K> {
    /// Lookup of exactly one primary key.
    Single(K),
    /// Lookup of a batch of primary keys.
    Many(Vec<K>),
}
59
///
/// KeyAccessKind
/// Identifies which key-only builder set the access path.
///
/// Recorded alongside the keys so intent validation can tell the builders
/// apart even when they carry the same `KeyAccess` shape.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum KeyAccessKind {
    /// Set by `by_id(...)`.
    Single,
    /// Set by `by_ids(...)`.
    Many,
    /// Set by `only(...)` (singleton primary key).
    Only,
}
71
///
/// KeyAccessState
/// Tracks key-only access plus its origin for intent validation.
///

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct KeyAccessState<K> {
    /// Which builder produced this access (`by_id`, `by_ids`, or `only`).
    pub kind: KeyAccessKind,
    /// The key or keys that builder supplied.
    pub access: KeyAccess<K>,
}
82
83// Build a model-level access plan for key-only intents.
84pub(crate) fn access_plan_from_keys_value<K>(access: &KeyAccess<K>) -> AccessPlan<Value>
85where
86    K: FieldValue,
87{
88    // Phase 1: map typed keys into model-level Value access paths.
89    match access {
90        KeyAccess::Single(key) => AccessPlan::path(AccessPath::ByKey(key.to_value())),
91        KeyAccess::Many(keys) => {
92            let values = keys.iter().map(FieldValue::to_value).collect();
93            AccessPlan::path(canonical_by_keys_path(values))
94        }
95    }
96}
97
98// Convert model-level access plans into entity-keyed access plans.
99pub(crate) fn access_plan_to_entity_keys<E: EntityKind>(
100    model: &EntityModel,
101    access: AccessPlan<Value>,
102) -> Result<AccessPlan<E::Key>, PlanError> {
103    access.into_executable::<E>(model)
104}
105
106// Convert model-level key values into typed entity keys.
107pub(crate) fn coerce_entity_key<E: EntityKind>(
108    model: &EntityModel,
109    key: &Value,
110) -> Result<E::Key, PlanError> {
111    E::Key::from_value(key).ok_or_else(|| {
112        PlanError::from(AccessPlanError::PrimaryKeyMismatch {
113            field: model.primary_key.name.to_string(),
114            key: key.clone(),
115        })
116    })
117}
118
119impl AccessPlan<Value> {
120    /// Convert model-level access plans into typed executable access plans.
121    pub(crate) fn into_executable<E: EntityKind>(
122        self,
123        model: &EntityModel,
124    ) -> Result<AccessPlan<E::Key>, PlanError> {
125        match self {
126            Self::Path(path) => Ok(AccessPlan::path(path.into_executable::<E>(model)?)),
127            Self::Union(children) => {
128                let mut out = Vec::with_capacity(children.len());
129                for child in children {
130                    out.push(child.into_executable::<E>(model)?);
131                }
132
133                Ok(AccessPlan::union(out))
134            }
135            Self::Intersection(children) => {
136                let mut out = Vec::with_capacity(children.len());
137                for child in children {
138                    out.push(child.into_executable::<E>(model)?);
139                }
140
141                Ok(AccessPlan::intersection(out))
142            }
143        }
144    }
145}
146
147impl AccessPath<Value> {
148    /// Convert one model-level access path into a typed executable access path.
149    pub(crate) fn into_executable<E: EntityKind>(
150        self,
151        model: &EntityModel,
152    ) -> Result<AccessPath<E::Key>, PlanError> {
153        match self {
154            Self::ByKey(key) => Ok(AccessPath::ByKey(coerce_entity_key::<E>(model, &key)?)),
155            Self::ByKeys(keys) => {
156                let mut out = Vec::with_capacity(keys.len());
157                for key in keys {
158                    out.push(coerce_entity_key::<E>(model, &key)?);
159                }
160
161                Ok(AccessPath::ByKeys(out))
162            }
163            Self::KeyRange { start, end } => Ok(AccessPath::KeyRange {
164                start: coerce_entity_key::<E>(model, &start)?,
165                end: coerce_entity_key::<E>(model, &end)?,
166            }),
167            Self::IndexPrefix { index, values } => Ok(AccessPath::IndexPrefix { index, values }),
168            Self::IndexRange { spec } => Ok(AccessPath::IndexRange { spec }),
169            Self::FullScan => Ok(AccessPath::FullScan),
170        }
171    }
172}
173
174///
175/// QueryModel
176///
177/// Model-level query intent and planning context.
178/// Consumes an `EntityModel` derived from typed entity definitions.
179///
180
181#[derive(Debug)]
182pub(crate) struct QueryModel<'m, K> {
183    model: &'m EntityModel,
184    mode: QueryMode,
185    predicate: Option<Predicate>,
186    key_access: Option<KeyAccessState<K>>,
187    key_access_conflict: bool,
188    group: Option<crate::db::query::plan::GroupSpec>,
189    having: Option<GroupHavingSpec>,
190    order: Option<OrderSpec>,
191    distinct: bool,
192    consistency: MissingRowPolicy,
193}
194
195impl<'m, K: FieldValue> QueryModel<'m, K> {
196    #[must_use]
197    pub(crate) const fn new(model: &'m EntityModel, consistency: MissingRowPolicy) -> Self {
198        Self {
199            model,
200            mode: QueryMode::Load(LoadSpec::new()),
201            predicate: None,
202            key_access: None,
203            key_access_conflict: false,
204            group: None,
205            having: None,
206            order: None,
207            distinct: false,
208            consistency,
209        }
210    }
211
212    /// Return the intent mode (load vs delete).
213    #[must_use]
214    pub(crate) const fn mode(&self) -> QueryMode {
215        self.mode
216    }
217
218    #[must_use]
219    fn has_explicit_order(&self) -> bool {
220        has_explicit_order(self.order.as_ref())
221    }
222
223    #[must_use]
224    const fn has_grouping(&self) -> bool {
225        self.group.is_some()
226    }
227
228    #[must_use]
229    const fn load_spec(&self) -> Option<LoadSpec> {
230        match self.mode {
231            QueryMode::Load(spec) => Some(spec),
232            QueryMode::Delete(_) => None,
233        }
234    }
235
236    /// Add a predicate, implicitly AND-ing with any existing predicate.
237    #[must_use]
238    pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
239        self.predicate = match self.predicate.take() {
240            Some(existing) => Some(Predicate::And(vec![existing, predicate])),
241            None => Some(predicate),
242        };
243        self
244    }
245
246    /// Apply a dynamic filter expression using the model schema.
247    pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
248        let schema = SchemaInfo::from_entity_model(self.model)?;
249        let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
250
251        Ok(self.filter(predicate))
252    }
253
254    /// Apply a dynamic sort expression using the model schema.
255    pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
256        let schema = SchemaInfo::from_entity_model(self.model)?;
257        let order = match expr.lower_with(&schema) {
258            Ok(order) => order,
259            Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
260            Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
261        };
262
263        validate_order_shape(Some(&order))
264            .map_err(IntentError::from)
265            .map_err(QueryError::from)?;
266
267        Ok(self.order_spec(order))
268    }
269
270    /// Append an ascending sort key.
271    #[must_use]
272    pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
273        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
274        self
275    }
276
277    /// Append a descending sort key.
278    #[must_use]
279    pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
280        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
281        self
282    }
283
284    /// Set a fully-specified order spec (validated before reaching this boundary).
285    pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
286        self.order = Some(order);
287        self
288    }
289
290    /// Enable DISTINCT semantics for this query intent.
291    #[must_use]
292    pub(crate) const fn distinct(mut self) -> Self {
293        self.distinct = true;
294        self
295    }
296
297    // Resolve one grouped field into one stable field slot and append it to the
298    // grouped spec in declaration order.
299    fn push_group_field(mut self, field: &str) -> Result<Self, QueryError> {
300        let field_slot = resolve_group_field_slot(self.model, field).map_err(QueryError::from)?;
301        let group = self.group.get_or_insert(GroupSpec {
302            group_fields: Vec::new(),
303            aggregates: Vec::new(),
304            execution: GroupedExecutionConfig::unbounded(),
305        });
306        if !group
307            .group_fields
308            .iter()
309            .any(|existing| existing.index() == field_slot.index())
310        {
311            group.group_fields.push(field_slot);
312        }
313
314        Ok(self)
315    }
316
317    // Append one grouped aggregate terminal to the grouped declarative spec.
318    fn push_group_aggregate(
319        mut self,
320        kind: GroupAggregateKind,
321        target_field: Option<String>,
322        distinct: bool,
323    ) -> Self {
324        let group = self.group.get_or_insert(GroupSpec {
325            group_fields: Vec::new(),
326            aggregates: Vec::new(),
327            execution: GroupedExecutionConfig::unbounded(),
328        });
329        group.aggregates.push(GroupAggregateSpec {
330            kind,
331            target_field,
332            distinct,
333        });
334
335        self
336    }
337
338    // Override grouped hard limits for this grouped query.
339    fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
340        let group = self.group.get_or_insert(GroupSpec {
341            group_fields: Vec::new(),
342            aggregates: Vec::new(),
343            execution: GroupedExecutionConfig::unbounded(),
344        });
345        group.execution = GroupedExecutionConfig::with_hard_limits(max_groups, max_group_bytes);
346
347        self
348    }
349
350    // Append one grouped HAVING compare clause after GROUP BY terminal declaration.
351    fn push_having_clause(mut self, clause: GroupHavingClause) -> Result<Self, QueryError> {
352        if self.group.is_none() {
353            return Err(QueryError::Intent(IntentError::HavingRequiresGroupBy));
354        }
355
356        let having = self.having.get_or_insert(GroupHavingSpec {
357            clauses: Vec::new(),
358        });
359        having.clauses.push(clause);
360
361        Ok(self)
362    }
363
364    // Append one grouped HAVING clause that references one grouped key field.
365    fn push_having_group_clause(
366        self,
367        field: &str,
368        op: CompareOp,
369        value: Value,
370    ) -> Result<Self, QueryError> {
371        let field_slot = resolve_group_field_slot(self.model, field).map_err(QueryError::from)?;
372
373        self.push_having_clause(GroupHavingClause {
374            symbol: GroupHavingSymbol::GroupField(field_slot),
375            op,
376            value,
377        })
378    }
379
380    // Append one grouped HAVING clause that references one grouped aggregate output.
381    fn push_having_aggregate_clause(
382        self,
383        aggregate_index: usize,
384        op: CompareOp,
385        value: Value,
386    ) -> Result<Self, QueryError> {
387        self.push_having_clause(GroupHavingClause {
388            symbol: GroupHavingSymbol::AggregateIndex(aggregate_index),
389            op,
390            value,
391        })
392    }
393
394    /// Track key-only access paths and detect conflicting key intents.
395    fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
396        if let Some(existing) = &self.key_access
397            && existing.kind != kind
398        {
399            self.key_access_conflict = true;
400        }
401
402        self.key_access = Some(KeyAccessState { kind, access });
403
404        self
405    }
406
407    /// Set the access path to a single primary key lookup.
408    pub(crate) fn by_id(self, id: K) -> Self {
409        self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
410    }
411
412    /// Set the access path to a primary key batch lookup.
413    pub(crate) fn by_ids<I>(self, ids: I) -> Self
414    where
415        I: IntoIterator<Item = K>,
416    {
417        self.set_key_access(
418            KeyAccessKind::Many,
419            KeyAccess::Many(ids.into_iter().collect()),
420        )
421    }
422
423    /// Set the access path to the singleton primary key.
424    pub(crate) fn only(self, id: K) -> Self {
425        self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
426    }
427
428    /// Mark this intent as a delete query.
429    #[must_use]
430    pub(crate) const fn delete(mut self) -> Self {
431        if self.mode.is_load() {
432            self.mode = QueryMode::Delete(DeleteSpec::new());
433        }
434        self
435    }
436
437    /// Apply a limit to the current mode.
438    ///
439    /// Load limits bound result size; delete limits bound mutation size.
440    #[must_use]
441    pub(crate) const fn limit(mut self, limit: u32) -> Self {
442        match self.mode {
443            QueryMode::Load(mut spec) => {
444                spec.limit = Some(limit);
445                self.mode = QueryMode::Load(spec);
446            }
447            QueryMode::Delete(mut spec) => {
448                spec.limit = Some(limit);
449                self.mode = QueryMode::Delete(spec);
450            }
451        }
452        self
453    }
454
455    /// Apply an offset to a load intent.
456    #[must_use]
457    pub(crate) const fn offset(mut self, offset: u32) -> Self {
458        if let QueryMode::Load(mut spec) = self.mode {
459            spec.offset = offset;
460            self.mode = QueryMode::Load(spec);
461        }
462        self
463    }
464
465    /// Build a model-level logical plan using Value-based access keys.
466    fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
467        // Phase 1: schema surface and intent validation.
468        let schema_info = SchemaInfo::from_entity_model(self.model)?;
469        self.validate_intent()?;
470
471        // Phase 2: predicate normalization and access planning.
472        let normalized_predicate = self
473            .predicate
474            .as_ref()
475            .map(|predicate| {
476                reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
477                let predicate = normalize_enum_literals(&schema_info, predicate)?;
478                Ok::<Predicate, ValidateError>(normalize(&predicate))
479            })
480            .transpose()?;
481        let access_plan_value = match &self.key_access {
482            Some(state) => access_plan_from_keys_value(&state.access),
483            None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
484        };
485
486        // Phase 3: assemble the executor-ready plan.
487        let scalar = ScalarPlan {
488            mode: self.mode,
489            predicate: normalized_predicate,
490            // Canonicalize ORDER BY to include an explicit primary-key tie-break.
491            // This ensures explain/fingerprint/execution share one deterministic order shape.
492            order: canonicalize_order_spec(self.model, self.order.clone()),
493            distinct: self.distinct,
494            delete_limit: match self.mode {
495                QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
496                QueryMode::Load(_) => None,
497            },
498            page: match self.mode {
499                QueryMode::Load(spec) => {
500                    if spec.limit.is_some() || spec.offset > 0 {
501                        Some(PageSpec {
502                            limit: spec.limit,
503                            offset: spec.offset,
504                        })
505                    } else {
506                        None
507                    }
508                }
509                QueryMode::Delete(_) => None,
510            },
511            consistency: self.consistency,
512        };
513        let mut plan =
514            AccessPlannedQuery::from_parts(LogicalPlan::Scalar(scalar), access_plan_value);
515        if let Some(group) = self.group.clone() {
516            plan = match self.having.clone() {
517                Some(having) => plan.into_grouped_with_having(group, Some(having)),
518                None => plan.into_grouped(group),
519            };
520        }
521
522        if plan.grouped_plan().is_some() {
523            validate_group_query_semantics(&schema_info, self.model, &plan)?;
524        } else {
525            validate_query_semantics(&schema_info, self.model, &plan)?;
526        }
527
528        Ok(plan)
529    }
530
531    // Validate pre-plan policy invariants and key-access rules before planning.
532    fn validate_intent(&self) -> Result<(), IntentError> {
533        validate_intent_plan_shape(self.mode, self.order.as_ref()).map_err(IntentError::from)?;
534
535        let key_access_kind = self.key_access.as_ref().map(|state| match state.kind {
536            KeyAccessKind::Single => IntentValidationKeyAccessKind::Single,
537            KeyAccessKind::Many => IntentValidationKeyAccessKind::Many,
538            KeyAccessKind::Only => IntentValidationKeyAccessKind::Only,
539        });
540        validate_intent_key_access_policy(
541            self.key_access_conflict,
542            key_access_kind,
543            self.predicate.is_some(),
544        )
545        .map_err(IntentError::from)?;
546        if self.having.is_some() && self.group.is_none() {
547            return Err(IntentError::HavingRequiresGroupBy);
548        }
549
550        Ok(())
551    }
552}
553
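// Query
//
// Typed, declarative query intent for a specific entity type.
//
// This intent is:
// - schema-agnostic at construction
// - normalized and validated only during planning
// - free of access-path decisions
//
// NOTE: this describes the `Query` struct declared further down in this file.
// It is a plain comment (not `///`) so rustdoc does not prepend it to the
// documentation of the item that follows.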
564
565///
566/// PlannedQuery
567///
568/// Neutral query-owned planned contract produced by query planning.
569/// Stores logical + access shape without executor compilation state.
570///
571#[derive(Debug)]
572pub struct PlannedQuery<E: EntityKind> {
573    plan: AccessPlannedQuery<E::Key>,
574    _marker: PhantomData<E>,
575}
576
577impl<E: EntityKind> PlannedQuery<E> {
578    #[must_use]
579    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
580        Self {
581            plan,
582            _marker: PhantomData,
583        }
584    }
585
586    #[must_use]
587    pub fn explain(&self) -> ExplainPlan {
588        self.plan.explain_with_model(E::MODEL)
589    }
590}
591
592///
593/// CompiledQuery
594///
595/// Query-owned compiled handoff produced by `Query::plan()`.
596/// This type intentionally carries only logical/access query semantics.
597/// Executor runtime shape is derived explicitly at the executor boundary.
598///
599#[derive(Clone, Debug)]
600pub struct CompiledQuery<E: EntityKind> {
601    plan: AccessPlannedQuery<E::Key>,
602    _marker: PhantomData<E>,
603}
604
605impl<E: EntityKind> CompiledQuery<E> {
606    #[must_use]
607    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
608        Self {
609            plan,
610            _marker: PhantomData,
611        }
612    }
613
614    #[must_use]
615    pub fn explain(&self) -> ExplainPlan {
616        self.plan.explain_with_model(E::MODEL)
617    }
618
619    #[must_use]
620    pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
621        self.plan
622    }
623}
624
625#[derive(Debug)]
626pub struct Query<E: EntityKind> {
627    intent: QueryModel<'static, E::Key>,
628    _marker: PhantomData<E>,
629}
630
631impl<E: EntityKind> Query<E> {
632    /// Create a new intent with an explicit missing-row policy.
633    /// Ignore favors idempotency and may mask index/data divergence on deletes.
634    /// Use Error to surface missing rows during scan/delete execution.
635    #[must_use]
636    pub const fn new(consistency: MissingRowPolicy) -> Self {
637        Self {
638            intent: QueryModel::new(E::MODEL, consistency),
639            _marker: PhantomData,
640        }
641    }
642
643    /// Return the intent mode (load vs delete).
644    #[must_use]
645    pub const fn mode(&self) -> QueryMode {
646        self.intent.mode()
647    }
648
649    #[must_use]
650    pub(crate) fn has_explicit_order(&self) -> bool {
651        self.intent.has_explicit_order()
652    }
653
654    #[must_use]
655    pub(crate) const fn has_grouping(&self) -> bool {
656        self.intent.has_grouping()
657    }
658
659    #[must_use]
660    pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
661        self.intent.load_spec()
662    }
663
664    /// Add a predicate, implicitly AND-ing with any existing predicate.
665    #[must_use]
666    pub fn filter(mut self, predicate: Predicate) -> Self {
667        self.intent = self.intent.filter(predicate);
668        self
669    }
670
671    /// Apply a dynamic filter expression.
672    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
673        let Self { intent, _marker } = self;
674        let intent = intent.filter_expr(expr)?;
675
676        Ok(Self { intent, _marker })
677    }
678
679    /// Apply a dynamic sort expression.
680    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
681        let Self { intent, _marker } = self;
682        let intent = intent.sort_expr(expr)?;
683
684        Ok(Self { intent, _marker })
685    }
686
687    /// Append an ascending sort key.
688    #[must_use]
689    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
690        self.intent = self.intent.order_by(field);
691        self
692    }
693
694    /// Append a descending sort key.
695    #[must_use]
696    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
697        self.intent = self.intent.order_by_desc(field);
698        self
699    }
700
701    /// Enable DISTINCT semantics for this query.
702    #[must_use]
703    pub fn distinct(mut self) -> Self {
704        self.intent = self.intent.distinct();
705        self
706    }
707
708    /// Add one GROUP BY field.
709    pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
710        let Self { intent, _marker } = self;
711        let intent = intent.push_group_field(field.as_ref())?;
712
713        Ok(Self { intent, _marker })
714    }
715
716    /// Add one aggregate terminal via composable aggregate expression.
717    #[must_use]
718    pub fn aggregate(mut self, aggregate: AggregateExpr) -> Self {
719        self.intent = self.intent.push_group_aggregate(
720            aggregate.kind(),
721            aggregate.target_field().map(str::to_string),
722            aggregate.is_distinct(),
723        );
724        self
725    }
726
727    /// Override grouped hard limits for grouped execution budget enforcement.
728    #[must_use]
729    pub fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
730        self.intent = self.intent.grouped_limits(max_groups, max_group_bytes);
731        self
732    }
733
734    /// Add one grouped HAVING compare clause over one grouped key field.
735    pub fn having_group(
736        self,
737        field: impl AsRef<str>,
738        op: CompareOp,
739        value: Value,
740    ) -> Result<Self, QueryError> {
741        let field = field.as_ref().to_owned();
742        let Self { intent, _marker } = self;
743        let intent = intent.push_having_group_clause(&field, op, value)?;
744
745        Ok(Self { intent, _marker })
746    }
747
748    /// Add one grouped HAVING compare clause over one grouped aggregate output.
749    pub fn having_aggregate(
750        self,
751        aggregate_index: usize,
752        op: CompareOp,
753        value: Value,
754    ) -> Result<Self, QueryError> {
755        let Self { intent, _marker } = self;
756        let intent = intent.push_having_aggregate_clause(aggregate_index, op, value)?;
757
758        Ok(Self { intent, _marker })
759    }
760
761    /// Set the access path to a single primary key lookup.
762    pub(crate) fn by_id(self, id: E::Key) -> Self {
763        let Self { intent, _marker } = self;
764        Self {
765            intent: intent.by_id(id),
766            _marker,
767        }
768    }
769
770    /// Set the access path to a primary key batch lookup.
771    pub(crate) fn by_ids<I>(self, ids: I) -> Self
772    where
773        I: IntoIterator<Item = E::Key>,
774    {
775        let Self { intent, _marker } = self;
776        Self {
777            intent: intent.by_ids(ids),
778            _marker,
779        }
780    }
781
782    /// Mark this intent as a delete query.
783    #[must_use]
784    pub fn delete(mut self) -> Self {
785        self.intent = self.intent.delete();
786        self
787    }
788
789    /// Apply a limit to the current mode.
790    ///
791    /// Load limits bound result size; delete limits bound mutation size.
792    /// For scalar load queries, any use of `limit` or `offset` requires an
793    /// explicit `order_by(...)` so pagination is deterministic.
794    /// GROUP BY queries use canonical grouped-key order by default.
795    #[must_use]
796    pub fn limit(mut self, limit: u32) -> Self {
797        self.intent = self.intent.limit(limit);
798        self
799    }
800
801    /// Apply an offset to a load intent.
802    ///
803    /// Scalar pagination requires an explicit `order_by(...)`.
804    /// GROUP BY queries use canonical grouped-key order by default.
805    #[must_use]
806    pub fn offset(mut self, offset: u32) -> Self {
807        self.intent = self.intent.offset(offset);
808        self
809    }
810
811    /// Explain this intent without executing it.
812    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
813        let plan = self.planned()?;
814
815        Ok(plan.explain())
816    }
817
818    /// Plan this intent into a neutral planned query contract.
819    pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
820        let plan = self.build_plan()?;
821
822        Ok(PlannedQuery::new(plan))
823    }
824
825    /// Compile this intent into query-owned handoff state.
826    ///
827    /// This boundary intentionally does not expose executor runtime shape.
828    pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
829        let plan = self.build_plan()?;
830
831        Ok(CompiledQuery::new(plan))
832    }
833
834    // Build a logical plan for the current intent.
835    fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
836        let plan_value = self.intent.build_plan_model()?;
837        let (logical, access) = plan_value.into_parts();
838        let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
839        let plan = AccessPlannedQuery::from_parts(logical, access);
840
841        Ok(plan)
842    }
843}
844
845impl<E> Query<E>
846where
847    E: EntityKind + SingletonEntity,
848    E::Key: Default,
849{
850    /// Set the access path to the singleton primary key.
851    pub(crate) fn only(self) -> Self {
852        let Self { intent, _marker } = self;
853
854        Self {
855            intent: intent.only(E::Key::default()),
856            _marker,
857        }
858    }
859}
860
861///
862/// QueryError
863///
864
865#[derive(Debug, ThisError)]
866pub enum QueryError {
867    #[error("{0}")]
868    Validate(#[from] ValidateError),
869
870    #[error("{0}")]
871    Plan(Box<PlanError>),
872
873    #[error("{0}")]
874    Intent(#[from] IntentError),
875
876    #[error("{0}")]
877    Response(#[from] ResponseError),
878
879    #[error("{0}")]
880    Execute(#[from] InternalError),
881}
882
883impl From<PlannerError> for QueryError {
884    fn from(err: PlannerError) -> Self {
885        match err {
886            PlannerError::Plan(err) => Self::from(*err),
887            PlannerError::Internal(err) => Self::Execute(*err),
888        }
889    }
890}
891
892impl From<PlanError> for QueryError {
893    fn from(err: PlanError) -> Self {
894        Self::Plan(Box::new(err))
895    }
896}
897
898///
899/// IntentError
900///
901
902#[derive(Clone, Copy, Debug, ThisError)]
903pub enum IntentError {
904    #[error("{0}")]
905    PlanShape(#[from] PolicyPlanError),
906
907    #[error("by_ids() cannot be combined with predicates")]
908    ByIdsWithPredicate,
909
910    #[error("only() cannot be combined with predicates")]
911    OnlyWithPredicate,
912
913    #[error("multiple key access methods were used on the same query")]
914    KeyAccessConflict,
915
916    #[error("cursor pagination requires an explicit ordering")]
917    CursorRequiresOrder,
918
919    #[error("cursor pagination requires an explicit limit")]
920    CursorRequiresLimit,
921
922    #[error("cursor tokens can only be used with .page().execute()")]
923    CursorRequiresPagedExecution,
924
925    #[error("grouped queries require execute_grouped(...)")]
926    GroupedRequiresExecuteGrouped,
927
928    #[error("HAVING requires GROUP BY")]
929    HavingRequiresGroupBy,
930}
931
932impl From<CursorPagingPolicyError> for IntentError {
933    fn from(err: CursorPagingPolicyError) -> Self {
934        match err {
935            CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
936            CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
937        }
938    }
939}
940
941impl From<IntentKeyAccessPolicyViolation> for IntentError {
942    fn from(err: IntentKeyAccessPolicyViolation) -> Self {
943        match err {
944            IntentKeyAccessPolicyViolation::KeyAccessConflict => Self::KeyAccessConflict,
945            IntentKeyAccessPolicyViolation::ByIdsWithPredicate => Self::ByIdsWithPredicate,
946            IntentKeyAccessPolicyViolation::OnlyWithPredicate => Self::OnlyWithPredicate,
947        }
948    }
949}
950
951impl From<FluentLoadPolicyViolation> for IntentError {
952    fn from(err: FluentLoadPolicyViolation) -> Self {
953        match err {
954            FluentLoadPolicyViolation::CursorRequiresPagedExecution => {
955                Self::CursorRequiresPagedExecution
956            }
957            FluentLoadPolicyViolation::GroupedRequiresExecuteGrouped => {
958                Self::GroupedRequiresExecuteGrouped
959            }
960            FluentLoadPolicyViolation::CursorRequiresOrder => Self::CursorRequiresOrder,
961            FluentLoadPolicyViolation::CursorRequiresLimit => Self::CursorRequiresLimit,
962        }
963    }
964}
965
966/// Helper to append an ordering field while preserving existing order spec.
967fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
968    match order {
969        Some(mut spec) => {
970            spec.fields.push((field.to_string(), direction));
971            spec
972        }
973        None => OrderSpec {
974            fields: vec![(field.to_string(), direction)],
975        },
976    }
977}
978
979// Normalize ORDER BY into a canonical, deterministic shape:
980// - preserve user field order
981// - remove explicit primary-key references from the user segment
982// - append exactly one primary-key field as the terminal tie-break
983fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
984    let mut order = order?;
985    if order.fields.is_empty() {
986        return Some(order);
987    }
988
989    let pk_field = model.primary_key.name;
990    let mut pk_direction = None;
991    order.fields.retain(|(field, direction)| {
992        if field == pk_field {
993            if pk_direction.is_none() {
994                pk_direction = Some(*direction);
995            }
996            false
997        } else {
998            true
999        }
1000    });
1001
1002    let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
1003    order.fields.push((pk_field.to_string(), pk_direction));
1004
1005    Some(order)
1006}