Skip to main content

icydb_core/db/query/intent/
mod.rs

1#![expect(clippy::used_underscore_binding)]
2
3//! Module: query::intent
4//! Responsibility: query intent construction, coercion, and semantic-plan compilation.
5//! Does not own: executor runtime behavior or index storage details.
6//! Boundary: typed/fluent query inputs lowered into validated logical plans.
7
8#[cfg(test)]
9mod tests;
10
// Aliases that expose the plan-layer `DeleteSpec`, `LoadSpec`, and `QueryMode`
// types under this module's path. Each alias names exactly the plan type.
pub type DeleteSpec = crate::db::query::plan::DeleteSpec;
pub type LoadSpec = crate::db::query::plan::LoadSpec;
pub type QueryMode = crate::db::query::plan::QueryMode;
15
16use crate::{
17    db::{
18        access::{AccessPath, AccessPlan, AccessPlanError, normalize_access_plan_value},
19        cursor::CursorPlanError,
20        predicate::{
21            CompareOp, MissingRowPolicy, Predicate, SchemaInfo, ValidateError, normalize,
22            normalize_enum_literals, reject_unsupported_query_features,
23        },
24        query::{
25            builder::aggregate::AggregateExpr,
26            explain::ExplainPlan,
27            expr::{FilterExpr, SortExpr, SortLowerError},
28            plan::{
29                AccessPlannedQuery, CursorPagingPolicyError, DeleteLimitSpec,
30                FluentLoadPolicyViolation, GroupAggregateKind, GroupAggregateSpec,
31                GroupHavingClause, GroupHavingSpec, GroupHavingSymbol, GroupSpec,
32                GroupedExecutionConfig, IntentKeyAccessKind as IntentValidationKeyAccessKind,
33                IntentKeyAccessPolicyViolation, LogicalPlan, OrderDirection, OrderSpec, PageSpec,
34                PlanError, PlannerError, PolicyPlanError, ScalarPlan, has_explicit_order,
35                plan_access, resolve_group_field_slot, validate_group_query_semantics,
36                validate_intent_key_access_policy, validate_intent_plan_shape,
37                validate_order_shape, validate_query_semantics,
38            },
39        },
40        response::ResponseError,
41    },
42    error::InternalError,
43    model::entity::EntityModel,
44    traits::{EntityKind, FieldValue, SingletonEntity},
45    value::Value,
46};
47use std::marker::PhantomData;
48use thiserror::Error as ThisError;
49
///
/// KeyAccess
///
/// Primary-key-only access hints for query planning.
/// Generic over the typed key `K`; lowered to `AccessPath::ByKey` /
/// `AccessPath::ByKeys` by `access_plan_from_keys_value`.
///

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum KeyAccess<K> {
    /// Look up exactly one primary key.
    Single(K),
    /// Look up a batch of primary keys.
    Many(Vec<K>),
}
60
///
/// KeyAccessKind
///
/// Identifies which key-only builder set the access path.
/// Compared by `QueryModel::set_key_access` to detect conflicting key intents.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum KeyAccessKind {
    /// Set through `by_id`.
    Single,
    /// Set through `by_ids`.
    Many,
    /// Set through `only` (singleton entity access).
    Only,
}
72
///
/// KeyAccessState
///
/// Tracks key-only access plus its origin for intent validation.
///

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct KeyAccessState<K> {
    /// Which builder method produced `access`.
    pub kind: KeyAccessKind,
    /// The key (or keys) requested by that builder method.
    pub access: KeyAccess<K>,
}
83
84// Build a model-level access plan for key-only intents.
85pub(crate) fn access_plan_from_keys_value<K>(access: &KeyAccess<K>) -> AccessPlan<Value>
86where
87    K: FieldValue,
88{
89    // Phase 1: map typed keys into model-level Value access paths.
90    let plan = match access {
91        KeyAccess::Single(key) => AccessPlan::path(AccessPath::ByKey(key.to_value())),
92        KeyAccess::Many(keys) => {
93            let values = keys.iter().map(FieldValue::to_value).collect();
94            AccessPlan::path(AccessPath::ByKeys(values))
95        }
96    };
97
98    // Phase 2: canonicalize the access shape via the shared access boundary.
99    normalize_access_plan_value(plan)
100}
101
102// Convert model-level access plans into entity-keyed access plans.
103pub(crate) fn access_plan_to_entity_keys<E: EntityKind>(
104    model: &EntityModel,
105    access: AccessPlan<Value>,
106) -> Result<AccessPlan<E::Key>, PlanError> {
107    access.into_executable::<E>(model)
108}
109
110// Convert model-level key values into typed entity keys.
111pub(crate) fn coerce_entity_key<E: EntityKind>(
112    model: &EntityModel,
113    key: &Value,
114) -> Result<E::Key, PlanError> {
115    E::Key::from_value(key).ok_or_else(|| {
116        PlanError::from(AccessPlanError::PrimaryKeyMismatch {
117            field: model.primary_key.name.to_string(),
118            key: key.clone(),
119        })
120    })
121}
122
123impl AccessPlan<Value> {
124    /// Convert model-level access plans into typed executable access plans.
125    pub(crate) fn into_executable<E: EntityKind>(
126        self,
127        model: &EntityModel,
128    ) -> Result<AccessPlan<E::Key>, PlanError> {
129        match self {
130            Self::Path(path) => Ok(AccessPlan::path(path.into_executable::<E>(model)?)),
131            Self::Union(children) => {
132                let mut out = Vec::with_capacity(children.len());
133                for child in children {
134                    out.push(child.into_executable::<E>(model)?);
135                }
136
137                Ok(AccessPlan::union(out))
138            }
139            Self::Intersection(children) => {
140                let mut out = Vec::with_capacity(children.len());
141                for child in children {
142                    out.push(child.into_executable::<E>(model)?);
143                }
144
145                Ok(AccessPlan::intersection(out))
146            }
147        }
148    }
149}
150
151impl AccessPath<Value> {
152    /// Convert one model-level access path into a typed executable access path.
153    pub(crate) fn into_executable<E: EntityKind>(
154        self,
155        model: &EntityModel,
156    ) -> Result<AccessPath<E::Key>, PlanError> {
157        match self {
158            Self::ByKey(key) => Ok(AccessPath::ByKey(coerce_entity_key::<E>(model, &key)?)),
159            Self::ByKeys(keys) => {
160                let mut out = Vec::with_capacity(keys.len());
161                for key in keys {
162                    out.push(coerce_entity_key::<E>(model, &key)?);
163                }
164
165                Ok(AccessPath::ByKeys(out))
166            }
167            Self::KeyRange { start, end } => Ok(AccessPath::KeyRange {
168                start: coerce_entity_key::<E>(model, &start)?,
169                end: coerce_entity_key::<E>(model, &end)?,
170            }),
171            Self::IndexPrefix { index, values } => Ok(AccessPath::IndexPrefix { index, values }),
172            Self::IndexRange { spec } => Ok(AccessPath::IndexRange { spec }),
173            Self::FullScan => Ok(AccessPath::FullScan),
174        }
175    }
176}
177
///
/// QueryModel
///
/// Model-level query intent and planning context.
/// Consumes an `EntityModel` derived from typed entity definitions.
/// Generic over the typed key `K` used by key-only access builders.
///

#[derive(Debug)]
pub(crate) struct QueryModel<'m, K> {
    /// Entity model the intent is validated and planned against.
    model: &'m EntityModel,
    /// Load vs delete mode, carrying the mode-specific limit/offset spec.
    mode: QueryMode,
    /// Accumulated filter; successive `filter` calls are AND-ed together.
    predicate: Option<Predicate>,
    /// Most recently requested key-only access, if any.
    key_access: Option<KeyAccessState<K>>,
    /// Set when key access was requested again with a different kind.
    key_access_conflict: bool,
    /// Grouped spec (fields, aggregates, execution limits), if grouping was requested.
    group: Option<crate::db::query::plan::GroupSpec>,
    /// HAVING clauses; only accepted once a grouped spec exists.
    having: Option<GroupHavingSpec>,
    /// User-declared ordering, before primary-key canonicalization.
    order: Option<OrderSpec>,
    /// Whether DISTINCT semantics were requested.
    distinct: bool,
    /// Missing-row policy copied into the scalar plan.
    consistency: MissingRowPolicy,
}
198
199impl<'m, K: FieldValue> QueryModel<'m, K> {
200    #[must_use]
201    pub(crate) const fn new(model: &'m EntityModel, consistency: MissingRowPolicy) -> Self {
202        Self {
203            model,
204            mode: QueryMode::Load(LoadSpec::new()),
205            predicate: None,
206            key_access: None,
207            key_access_conflict: false,
208            group: None,
209            having: None,
210            order: None,
211            distinct: false,
212            consistency,
213        }
214    }
215
216    /// Return the intent mode (load vs delete).
217    #[must_use]
218    pub(crate) const fn mode(&self) -> QueryMode {
219        self.mode
220    }
221
222    #[must_use]
223    fn has_explicit_order(&self) -> bool {
224        has_explicit_order(self.order.as_ref())
225    }
226
227    #[must_use]
228    const fn has_grouping(&self) -> bool {
229        self.group.is_some()
230    }
231
232    #[must_use]
233    const fn load_spec(&self) -> Option<LoadSpec> {
234        match self.mode {
235            QueryMode::Load(spec) => Some(spec),
236            QueryMode::Delete(_) => None,
237        }
238    }
239
240    /// Add a predicate, implicitly AND-ing with any existing predicate.
241    #[must_use]
242    pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
243        self.predicate = match self.predicate.take() {
244            Some(existing) => Some(Predicate::And(vec![existing, predicate])),
245            None => Some(predicate),
246        };
247        self
248    }
249
250    /// Apply a dynamic filter expression using the model schema.
251    pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
252        let schema = SchemaInfo::from_entity_model(self.model)?;
253        let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
254
255        Ok(self.filter(predicate))
256    }
257
258    /// Apply a dynamic sort expression using the model schema.
259    pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
260        let schema = SchemaInfo::from_entity_model(self.model)?;
261        let order = match expr.lower_with(&schema) {
262            Ok(order) => order,
263            Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
264            Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
265        };
266
267        validate_order_shape(Some(&order))
268            .map_err(IntentError::from)
269            .map_err(QueryError::from)?;
270
271        Ok(self.order_spec(order))
272    }
273
274    /// Append an ascending sort key.
275    #[must_use]
276    pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
277        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
278        self
279    }
280
281    /// Append a descending sort key.
282    #[must_use]
283    pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
284        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
285        self
286    }
287
288    /// Set a fully-specified order spec (validated before reaching this boundary).
289    pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
290        self.order = Some(order);
291        self
292    }
293
294    /// Enable DISTINCT semantics for this query intent.
295    #[must_use]
296    pub(crate) const fn distinct(mut self) -> Self {
297        self.distinct = true;
298        self
299    }
300
301    // Resolve one grouped field into one stable field slot and append it to the
302    // grouped spec in declaration order.
303    fn push_group_field(mut self, field: &str) -> Result<Self, QueryError> {
304        let field_slot = resolve_group_field_slot(self.model, field).map_err(QueryError::from)?;
305        let group = self.group.get_or_insert(GroupSpec {
306            group_fields: Vec::new(),
307            aggregates: Vec::new(),
308            execution: GroupedExecutionConfig::unbounded(),
309        });
310        if !group
311            .group_fields
312            .iter()
313            .any(|existing| existing.index() == field_slot.index())
314        {
315            group.group_fields.push(field_slot);
316        }
317
318        Ok(self)
319    }
320
321    // Append one grouped aggregate terminal to the grouped declarative spec.
322    fn push_group_aggregate(
323        mut self,
324        kind: GroupAggregateKind,
325        target_field: Option<String>,
326        distinct: bool,
327    ) -> Self {
328        let group = self.group.get_or_insert(GroupSpec {
329            group_fields: Vec::new(),
330            aggregates: Vec::new(),
331            execution: GroupedExecutionConfig::unbounded(),
332        });
333        group.aggregates.push(GroupAggregateSpec {
334            kind,
335            target_field,
336            distinct,
337        });
338
339        self
340    }
341
342    // Override grouped hard limits for this grouped query.
343    fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
344        let group = self.group.get_or_insert(GroupSpec {
345            group_fields: Vec::new(),
346            aggregates: Vec::new(),
347            execution: GroupedExecutionConfig::unbounded(),
348        });
349        group.execution = GroupedExecutionConfig::with_hard_limits(max_groups, max_group_bytes);
350
351        self
352    }
353
354    // Append one grouped HAVING compare clause after GROUP BY terminal declaration.
355    fn push_having_clause(mut self, clause: GroupHavingClause) -> Result<Self, QueryError> {
356        if self.group.is_none() {
357            return Err(QueryError::Intent(IntentError::HavingRequiresGroupBy));
358        }
359
360        let having = self.having.get_or_insert(GroupHavingSpec {
361            clauses: Vec::new(),
362        });
363        having.clauses.push(clause);
364
365        Ok(self)
366    }
367
368    // Append one grouped HAVING clause that references one grouped key field.
369    fn push_having_group_clause(
370        self,
371        field: &str,
372        op: CompareOp,
373        value: Value,
374    ) -> Result<Self, QueryError> {
375        let field_slot = resolve_group_field_slot(self.model, field).map_err(QueryError::from)?;
376
377        self.push_having_clause(GroupHavingClause {
378            symbol: GroupHavingSymbol::GroupField(field_slot),
379            op,
380            value,
381        })
382    }
383
384    // Append one grouped HAVING clause that references one grouped aggregate output.
385    fn push_having_aggregate_clause(
386        self,
387        aggregate_index: usize,
388        op: CompareOp,
389        value: Value,
390    ) -> Result<Self, QueryError> {
391        self.push_having_clause(GroupHavingClause {
392            symbol: GroupHavingSymbol::AggregateIndex(aggregate_index),
393            op,
394            value,
395        })
396    }
397
398    /// Track key-only access paths and detect conflicting key intents.
399    fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
400        if let Some(existing) = &self.key_access
401            && existing.kind != kind
402        {
403            self.key_access_conflict = true;
404        }
405
406        self.key_access = Some(KeyAccessState { kind, access });
407
408        self
409    }
410
411    /// Set the access path to a single primary key lookup.
412    pub(crate) fn by_id(self, id: K) -> Self {
413        self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
414    }
415
416    /// Set the access path to a primary key batch lookup.
417    pub(crate) fn by_ids<I>(self, ids: I) -> Self
418    where
419        I: IntoIterator<Item = K>,
420    {
421        self.set_key_access(
422            KeyAccessKind::Many,
423            KeyAccess::Many(ids.into_iter().collect()),
424        )
425    }
426
427    /// Set the access path to the singleton primary key.
428    pub(crate) fn only(self, id: K) -> Self {
429        self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
430    }
431
432    /// Mark this intent as a delete query.
433    #[must_use]
434    pub(crate) const fn delete(mut self) -> Self {
435        if self.mode.is_load() {
436            self.mode = QueryMode::Delete(DeleteSpec::new());
437        }
438        self
439    }
440
441    /// Apply a limit to the current mode.
442    ///
443    /// Load limits bound result size; delete limits bound mutation size.
444    #[must_use]
445    pub(crate) const fn limit(mut self, limit: u32) -> Self {
446        match self.mode {
447            QueryMode::Load(mut spec) => {
448                spec.limit = Some(limit);
449                self.mode = QueryMode::Load(spec);
450            }
451            QueryMode::Delete(mut spec) => {
452                spec.limit = Some(limit);
453                self.mode = QueryMode::Delete(spec);
454            }
455        }
456        self
457    }
458
459    /// Apply an offset to a load intent.
460    #[must_use]
461    pub(crate) const fn offset(mut self, offset: u32) -> Self {
462        if let QueryMode::Load(mut spec) = self.mode {
463            spec.offset = offset;
464            self.mode = QueryMode::Load(spec);
465        }
466        self
467    }
468
469    /// Build a model-level logical plan using Value-based access keys.
470    fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
471        // Phase 1: schema surface and intent validation.
472        let schema_info = SchemaInfo::from_entity_model(self.model)?;
473        self.validate_intent()?;
474
475        // Phase 2: predicate normalization and access planning.
476        let normalized_predicate = self
477            .predicate
478            .as_ref()
479            .map(|predicate| {
480                reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
481                let predicate = normalize_enum_literals(&schema_info, predicate)?;
482                Ok::<Predicate, ValidateError>(normalize(&predicate))
483            })
484            .transpose()?;
485        let access_plan_value = match &self.key_access {
486            Some(state) => access_plan_from_keys_value(&state.access),
487            None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
488        };
489
490        // Phase 3: assemble the executor-ready plan.
491        let scalar = ScalarPlan {
492            mode: self.mode,
493            predicate: normalized_predicate,
494            // Canonicalize ORDER BY to include an explicit primary-key tie-break.
495            // This ensures explain/fingerprint/execution share one deterministic order shape.
496            order: canonicalize_order_spec(self.model, self.order.clone()),
497            distinct: self.distinct,
498            delete_limit: match self.mode {
499                QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
500                QueryMode::Load(_) => None,
501            },
502            page: match self.mode {
503                QueryMode::Load(spec) => {
504                    if spec.limit.is_some() || spec.offset > 0 {
505                        Some(PageSpec {
506                            limit: spec.limit,
507                            offset: spec.offset,
508                        })
509                    } else {
510                        None
511                    }
512                }
513                QueryMode::Delete(_) => None,
514            },
515            consistency: self.consistency,
516        };
517        let mut plan =
518            AccessPlannedQuery::from_parts(LogicalPlan::Scalar(scalar), access_plan_value);
519        if let Some(group) = self.group.clone() {
520            plan = match self.having.clone() {
521                Some(having) => plan.into_grouped_with_having(group, Some(having)),
522                None => plan.into_grouped(group),
523            };
524        }
525
526        if plan.grouped_plan().is_some() {
527            validate_group_query_semantics(&schema_info, self.model, &plan)?;
528        } else {
529            validate_query_semantics(&schema_info, self.model, &plan)?;
530        }
531
532        Ok(plan)
533    }
534
535    // Validate pre-plan policy invariants and key-access rules before planning.
536    fn validate_intent(&self) -> Result<(), IntentError> {
537        validate_intent_plan_shape(self.mode, self.order.as_ref()).map_err(IntentError::from)?;
538
539        let key_access_kind = self.key_access.as_ref().map(|state| match state.kind {
540            KeyAccessKind::Single => IntentValidationKeyAccessKind::Single,
541            KeyAccessKind::Many => IntentValidationKeyAccessKind::Many,
542            KeyAccessKind::Only => IntentValidationKeyAccessKind::Only,
543        });
544        validate_intent_key_access_policy(
545            self.key_access_conflict,
546            key_access_kind,
547            self.predicate.is_some(),
548        )
549        .map_err(IntentError::from)?;
550        if self.having.is_some() && self.group.is_none() {
551            return Err(IntentError::HavingRequiresGroupBy);
552        }
553
554        Ok(())
555    }
556}
557
// NOTE: the description below is for the `Query` type declared further down in
// this file. It is kept as a plain comment so rustdoc does not attach it to
// `PlannedQuery`, which is the next item.
//
// Query
//
// Typed, declarative query intent for a specific entity type.
//
// This intent is:
// - schema-agnostic at construction
// - normalized and validated only during planning
// - free of access-path decisions
//

///
/// PlannedQuery
///
/// Neutral query-owned planned contract produced by query planning.
/// Stores logical + access shape without executor compilation state.
///
#[derive(Debug)]
pub struct PlannedQuery<E: EntityKind> {
    /// Logical plan plus access plan, keyed by the entity's typed key.
    plan: AccessPlannedQuery<E::Key>,
    /// Ties the plan to entity `E` without storing a value of it.
    _marker: PhantomData<E>,
}
580
581impl<E: EntityKind> PlannedQuery<E> {
582    #[must_use]
583    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
584        Self {
585            plan,
586            _marker: PhantomData,
587        }
588    }
589
590    #[must_use]
591    pub fn explain(&self) -> ExplainPlan {
592        self.plan.explain_with_model(E::MODEL)
593    }
594}
595
///
/// CompiledQuery
///
/// Query-owned compiled handoff produced by `Query::plan()`.
/// This type intentionally carries only logical/access query semantics.
/// Executor runtime shape is derived explicitly at the executor boundary.
///
#[derive(Clone, Debug)]
pub struct CompiledQuery<E: EntityKind> {
    /// Logical plan plus access plan, keyed by the entity's typed key.
    plan: AccessPlannedQuery<E::Key>,
    /// Ties the plan to entity `E` without storing a value of it.
    _marker: PhantomData<E>,
}
608
609impl<E: EntityKind> CompiledQuery<E> {
610    #[must_use]
611    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
612        Self {
613            plan,
614            _marker: PhantomData,
615        }
616    }
617
618    #[must_use]
619    pub fn explain(&self) -> ExplainPlan {
620        self.plan.explain_with_model(E::MODEL)
621    }
622
623    #[must_use]
624    pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
625        self.plan
626    }
627}
628
///
/// Query
///
/// Typed, declarative query intent for entity `E`.
/// Wraps a `QueryModel` bound to `E::MODEL`; schema validation and access
/// planning run only when the intent is planned.
///
#[derive(Debug)]
pub struct Query<E: EntityKind> {
    /// Model-level intent keyed by the entity's typed key.
    intent: QueryModel<'static, E::Key>,
    /// Ties the intent to entity `E` without storing a value of it.
    _marker: PhantomData<E>,
}
634
635impl<E: EntityKind> Query<E> {
636    /// Create a new intent with an explicit missing-row policy.
637    /// Ignore favors idempotency and may mask index/data divergence on deletes.
638    /// Use Error to surface missing rows during scan/delete execution.
639    #[must_use]
640    pub const fn new(consistency: MissingRowPolicy) -> Self {
641        Self {
642            intent: QueryModel::new(E::MODEL, consistency),
643            _marker: PhantomData,
644        }
645    }
646
647    /// Return the intent mode (load vs delete).
648    #[must_use]
649    pub const fn mode(&self) -> QueryMode {
650        self.intent.mode()
651    }
652
653    #[must_use]
654    pub(crate) fn has_explicit_order(&self) -> bool {
655        self.intent.has_explicit_order()
656    }
657
658    #[must_use]
659    pub(crate) const fn has_grouping(&self) -> bool {
660        self.intent.has_grouping()
661    }
662
663    #[must_use]
664    pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
665        self.intent.load_spec()
666    }
667
668    /// Add a predicate, implicitly AND-ing with any existing predicate.
669    #[must_use]
670    pub fn filter(mut self, predicate: Predicate) -> Self {
671        self.intent = self.intent.filter(predicate);
672        self
673    }
674
675    /// Apply a dynamic filter expression.
676    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
677        let Self { intent, _marker } = self;
678        let intent = intent.filter_expr(expr)?;
679
680        Ok(Self { intent, _marker })
681    }
682
683    /// Apply a dynamic sort expression.
684    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
685        let Self { intent, _marker } = self;
686        let intent = intent.sort_expr(expr)?;
687
688        Ok(Self { intent, _marker })
689    }
690
691    /// Append an ascending sort key.
692    #[must_use]
693    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
694        self.intent = self.intent.order_by(field);
695        self
696    }
697
698    /// Append a descending sort key.
699    #[must_use]
700    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
701        self.intent = self.intent.order_by_desc(field);
702        self
703    }
704
705    /// Enable DISTINCT semantics for this query.
706    #[must_use]
707    pub fn distinct(mut self) -> Self {
708        self.intent = self.intent.distinct();
709        self
710    }
711
712    /// Add one GROUP BY field.
713    pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
714        let Self { intent, _marker } = self;
715        let intent = intent.push_group_field(field.as_ref())?;
716
717        Ok(Self { intent, _marker })
718    }
719
720    /// Add one aggregate terminal via composable aggregate expression.
721    #[must_use]
722    pub fn aggregate(mut self, aggregate: AggregateExpr) -> Self {
723        self.intent = self.intent.push_group_aggregate(
724            aggregate.kind(),
725            aggregate.target_field().map(str::to_string),
726            aggregate.is_distinct(),
727        );
728        self
729    }
730
731    /// Override grouped hard limits for grouped execution budget enforcement.
732    #[must_use]
733    pub fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
734        self.intent = self.intent.grouped_limits(max_groups, max_group_bytes);
735        self
736    }
737
738    /// Add one grouped HAVING compare clause over one grouped key field.
739    pub fn having_group(
740        self,
741        field: impl AsRef<str>,
742        op: CompareOp,
743        value: Value,
744    ) -> Result<Self, QueryError> {
745        let field = field.as_ref().to_owned();
746        let Self { intent, _marker } = self;
747        let intent = intent.push_having_group_clause(&field, op, value)?;
748
749        Ok(Self { intent, _marker })
750    }
751
752    /// Add one grouped HAVING compare clause over one grouped aggregate output.
753    pub fn having_aggregate(
754        self,
755        aggregate_index: usize,
756        op: CompareOp,
757        value: Value,
758    ) -> Result<Self, QueryError> {
759        let Self { intent, _marker } = self;
760        let intent = intent.push_having_aggregate_clause(aggregate_index, op, value)?;
761
762        Ok(Self { intent, _marker })
763    }
764
765    /// Set the access path to a single primary key lookup.
766    pub(crate) fn by_id(self, id: E::Key) -> Self {
767        let Self { intent, _marker } = self;
768        Self {
769            intent: intent.by_id(id),
770            _marker,
771        }
772    }
773
774    /// Set the access path to a primary key batch lookup.
775    pub(crate) fn by_ids<I>(self, ids: I) -> Self
776    where
777        I: IntoIterator<Item = E::Key>,
778    {
779        let Self { intent, _marker } = self;
780        Self {
781            intent: intent.by_ids(ids),
782            _marker,
783        }
784    }
785
786    /// Mark this intent as a delete query.
787    #[must_use]
788    pub fn delete(mut self) -> Self {
789        self.intent = self.intent.delete();
790        self
791    }
792
793    /// Apply a limit to the current mode.
794    ///
795    /// Load limits bound result size; delete limits bound mutation size.
796    /// For scalar load queries, any use of `limit` or `offset` requires an
797    /// explicit `order_by(...)` so pagination is deterministic.
798    /// GROUP BY queries use canonical grouped-key order by default.
799    #[must_use]
800    pub fn limit(mut self, limit: u32) -> Self {
801        self.intent = self.intent.limit(limit);
802        self
803    }
804
805    /// Apply an offset to a load intent.
806    ///
807    /// Scalar pagination requires an explicit `order_by(...)`.
808    /// GROUP BY queries use canonical grouped-key order by default.
809    #[must_use]
810    pub fn offset(mut self, offset: u32) -> Self {
811        self.intent = self.intent.offset(offset);
812        self
813    }
814
815    /// Explain this intent without executing it.
816    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
817        let plan = self.planned()?;
818
819        Ok(plan.explain())
820    }
821
822    /// Plan this intent into a neutral planned query contract.
823    pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
824        let plan = self.build_plan()?;
825
826        Ok(PlannedQuery::new(plan))
827    }
828
829    /// Compile this intent into query-owned handoff state.
830    ///
831    /// This boundary intentionally does not expose executor runtime shape.
832    pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
833        let plan = self.build_plan()?;
834
835        Ok(CompiledQuery::new(plan))
836    }
837
838    // Build a logical plan for the current intent.
839    fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
840        let plan_value = self.intent.build_plan_model()?;
841        let (logical, access) = plan_value.into_parts();
842        let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
843        let plan = AccessPlannedQuery::from_parts(logical, access);
844
845        Ok(plan)
846    }
847}
848
849impl<E> Query<E>
850where
851    E: EntityKind + SingletonEntity,
852    E::Key: Default,
853{
854    /// Set the access path to the singleton primary key.
855    pub(crate) fn only(self) -> Self {
856        let Self { intent, _marker } = self;
857
858        Self {
859            intent: intent.only(E::Key::default()),
860            _marker,
861        }
862    }
863}
864
///
/// QueryError
///
/// Error surface for building, planning, and executing a query intent.
/// Every variant displays the message of the error it wraps.
///

#[derive(Debug, ThisError)]
pub enum QueryError {
    /// Predicate or schema validation failed.
    #[error("{0}")]
    Validate(#[from] ValidateError),

    /// Planning failed; boxed, so conversion goes through the explicit
    /// `From<PlanError>` impl rather than `#[from]`.
    #[error("{0}")]
    Plan(Box<PlanError>),

    /// The intent violated a pre-plan policy.
    #[error("{0}")]
    Intent(#[from] IntentError),

    /// Wraps a `ResponseError`.
    #[error("{0}")]
    Response(#[from] ResponseError),

    /// Wraps an `InternalError`, including planner-internal failures.
    #[error("{0}")]
    Execute(#[from] InternalError),
}
886
887impl From<PlannerError> for QueryError {
888    fn from(err: PlannerError) -> Self {
889        match err {
890            PlannerError::Plan(err) => Self::from(*err),
891            PlannerError::Internal(err) => Self::Execute(*err),
892        }
893    }
894}
895
896impl From<PlanError> for QueryError {
897    fn from(err: PlanError) -> Self {
898        Self::Plan(Box::new(err))
899    }
900}
901
///
/// IntentError
///
/// Policy violations detected on a query intent before or during planning.
///

#[derive(Clone, Copy, Debug, ThisError)]
pub enum IntentError {
    /// Wraps a `PolicyPlanError` and displays its message.
    #[error("{0}")]
    PlanShape(#[from] PolicyPlanError),

    /// `by_ids()` was combined with a predicate.
    #[error("by_ids() cannot be combined with predicates")]
    ByIdsWithPredicate,

    /// `only()` was combined with a predicate.
    #[error("only() cannot be combined with predicates")]
    OnlyWithPredicate,

    /// More than one kind of key access was requested on the same query.
    #[error("multiple key access methods were used on the same query")]
    KeyAccessConflict,

    /// Message text is supplied by `CursorPlanError::cursor_requires_order_message()`.
    #[error(
        "{message}",
        message = CursorPlanError::cursor_requires_order_message()
    )]
    CursorRequiresOrder,

    /// Message text is supplied by `CursorPlanError::cursor_requires_limit_message()`.
    #[error(
        "{message}",
        message = CursorPlanError::cursor_requires_limit_message()
    )]
    CursorRequiresLimit,

    /// A cursor token was used outside `.page().execute()`.
    #[error("cursor tokens can only be used with .page().execute()")]
    CursorRequiresPagedExecution,

    /// A grouped query was run through something other than `execute_grouped(...)`.
    #[error("grouped queries require execute_grouped(...)")]
    GroupedRequiresExecuteGrouped,

    /// A HAVING clause was declared without any GROUP BY spec.
    #[error("HAVING requires GROUP BY")]
    HavingRequiresGroupBy,
}
941
942impl From<CursorPagingPolicyError> for IntentError {
943    fn from(err: CursorPagingPolicyError) -> Self {
944        match err {
945            CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
946            CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
947        }
948    }
949}
950
951impl From<IntentKeyAccessPolicyViolation> for IntentError {
952    fn from(err: IntentKeyAccessPolicyViolation) -> Self {
953        match err {
954            IntentKeyAccessPolicyViolation::KeyAccessConflict => Self::KeyAccessConflict,
955            IntentKeyAccessPolicyViolation::ByIdsWithPredicate => Self::ByIdsWithPredicate,
956            IntentKeyAccessPolicyViolation::OnlyWithPredicate => Self::OnlyWithPredicate,
957        }
958    }
959}
960
961impl From<FluentLoadPolicyViolation> for IntentError {
962    fn from(err: FluentLoadPolicyViolation) -> Self {
963        match err {
964            FluentLoadPolicyViolation::CursorRequiresPagedExecution => {
965                Self::CursorRequiresPagedExecution
966            }
967            FluentLoadPolicyViolation::GroupedRequiresExecuteGrouped => {
968                Self::GroupedRequiresExecuteGrouped
969            }
970            FluentLoadPolicyViolation::CursorRequiresOrder => Self::CursorRequiresOrder,
971            FluentLoadPolicyViolation::CursorRequiresLimit => Self::CursorRequiresLimit,
972        }
973    }
974}
975
976/// Helper to append an ordering field while preserving existing order spec.
977fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
978    match order {
979        Some(mut spec) => {
980            spec.fields.push((field.to_string(), direction));
981            spec
982        }
983        None => OrderSpec {
984            fields: vec![(field.to_string(), direction)],
985        },
986    }
987}
988
989// Normalize ORDER BY into a canonical, deterministic shape:
990// - preserve user field order
991// - remove explicit primary-key references from the user segment
992// - append exactly one primary-key field as the terminal tie-break
993fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
994    let mut order = order?;
995    if order.fields.is_empty() {
996        return Some(order);
997    }
998
999    let pk_field = model.primary_key.name;
1000    let mut pk_direction = None;
1001    order.fields.retain(|(field, direction)| {
1002        if field == pk_field {
1003            if pk_direction.is_none() {
1004                pk_direction = Some(*direction);
1005            }
1006            false
1007        } else {
1008            true
1009        }
1010    });
1011
1012    let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
1013    order.fields.push((pk_field.to_string(), pk_direction));
1014
1015    Some(order)
1016}