Skip to main content

icydb_core/db/query/intent/
mod.rs

1#![expect(clippy::used_underscore_binding)]
2
3//! Module: query::intent
4//! Responsibility: query intent construction, coercion, and semantic-plan compilation.
5//! Does not own: executor runtime behavior or index storage details.
6//! Boundary: typed/fluent query inputs lowered into validated logical plans.
7
8#[cfg(test)]
9mod tests;
10
11// Key-only access intent and helpers.
12pub type DeleteSpec = crate::db::query::plan::DeleteSpec;
13pub type LoadSpec = crate::db::query::plan::LoadSpec;
14pub type QueryMode = crate::db::query::plan::QueryMode;
15
16use crate::{
17    db::{
18        access::{AccessPath, AccessPlan, AccessPlanError, canonical_by_keys_path},
19        predicate::{
20            CompareOp, MissingRowPolicy, Predicate, SchemaInfo, ValidateError, normalize,
21            normalize_enum_literals, reject_unsupported_query_features,
22        },
23        query::{
24            explain::ExplainPlan,
25            expr::{FilterExpr, SortExpr, SortLowerError},
26            plan::{
27                AccessPlannedQuery, CursorPagingPolicyError, DeleteLimitSpec,
28                FluentLoadPolicyViolation, GroupAggregateKind, GroupAggregateSpec,
29                GroupHavingClause, GroupHavingSpec, GroupHavingSymbol, GroupSpec,
30                GroupedExecutionConfig, IntentKeyAccessKind as IntentValidationKeyAccessKind,
31                IntentKeyAccessPolicyViolation, IntentTerminalPolicyViolation, LogicalPlan,
32                OrderDirection, OrderSpec, PageSpec, PlanError, PlannerError, PolicyPlanError,
33                ScalarPlan, has_explicit_order, plan_access, resolve_group_field_slot,
34                validate_group_query_semantics, validate_grouped_field_target_extrema_policy,
35                validate_intent_key_access_policy, validate_intent_plan_shape,
36                validate_order_shape, validate_query_semantics,
37            },
38        },
39        response::ResponseError,
40    },
41    error::InternalError,
42    model::entity::EntityModel,
43    traits::{EntityKind, FieldValue, SingletonEntity},
44    value::Value,
45};
46use std::marker::PhantomData;
47use thiserror::Error as ThisError;
48
49///
50/// KeyAccess
51/// Primary-key-only access hints for query planning.
52///
53
54#[derive(Clone, Debug, Eq, PartialEq)]
55pub(crate) enum KeyAccess<K> {
56    Single(K),
57    Many(Vec<K>),
58}
59
60///
61/// KeyAccessKind
62/// Identifies which key-only builder set the access path.
63///
64
65#[derive(Clone, Copy, Debug, Eq, PartialEq)]
66pub(crate) enum KeyAccessKind {
67    Single,
68    Many,
69    Only,
70}
71
72///
73/// KeyAccessState
74/// Tracks key-only access plus its origin for intent validation.
75///
76
77#[derive(Clone, Debug, Eq, PartialEq)]
78pub(crate) struct KeyAccessState<K> {
79    pub kind: KeyAccessKind,
80    pub access: KeyAccess<K>,
81}
82
83// Build a model-level access plan for key-only intents.
84pub(crate) fn access_plan_from_keys_value<K>(access: &KeyAccess<K>) -> AccessPlan<Value>
85where
86    K: FieldValue,
87{
88    // Phase 1: map typed keys into model-level Value access paths.
89    match access {
90        KeyAccess::Single(key) => AccessPlan::path(AccessPath::ByKey(key.to_value())),
91        KeyAccess::Many(keys) => {
92            let values = keys.iter().map(FieldValue::to_value).collect();
93            AccessPlan::path(canonical_by_keys_path(values))
94        }
95    }
96}
97
98// Convert model-level access plans into entity-keyed access plans.
99pub(crate) fn access_plan_to_entity_keys<E: EntityKind>(
100    model: &EntityModel,
101    access: AccessPlan<Value>,
102) -> Result<AccessPlan<E::Key>, PlanError> {
103    access.into_executable::<E>(model)
104}
105
106// Convert model-level key values into typed entity keys.
107pub(crate) fn coerce_entity_key<E: EntityKind>(
108    model: &EntityModel,
109    key: &Value,
110) -> Result<E::Key, PlanError> {
111    E::Key::from_value(key).ok_or_else(|| {
112        PlanError::from(AccessPlanError::PrimaryKeyMismatch {
113            field: model.primary_key.name.to_string(),
114            key: key.clone(),
115        })
116    })
117}
118
119impl AccessPlan<Value> {
120    /// Convert model-level access plans into typed executable access plans.
121    pub(crate) fn into_executable<E: EntityKind>(
122        self,
123        model: &EntityModel,
124    ) -> Result<AccessPlan<E::Key>, PlanError> {
125        match self {
126            Self::Path(path) => Ok(AccessPlan::path(path.into_executable::<E>(model)?)),
127            Self::Union(children) => {
128                let mut out = Vec::with_capacity(children.len());
129                for child in children {
130                    out.push(child.into_executable::<E>(model)?);
131                }
132
133                Ok(AccessPlan::union(out))
134            }
135            Self::Intersection(children) => {
136                let mut out = Vec::with_capacity(children.len());
137                for child in children {
138                    out.push(child.into_executable::<E>(model)?);
139                }
140
141                Ok(AccessPlan::intersection(out))
142            }
143        }
144    }
145}
146
147impl AccessPath<Value> {
148    /// Convert one model-level access path into a typed executable access path.
149    pub(crate) fn into_executable<E: EntityKind>(
150        self,
151        model: &EntityModel,
152    ) -> Result<AccessPath<E::Key>, PlanError> {
153        match self {
154            Self::ByKey(key) => Ok(AccessPath::ByKey(coerce_entity_key::<E>(model, &key)?)),
155            Self::ByKeys(keys) => {
156                let mut out = Vec::with_capacity(keys.len());
157                for key in keys {
158                    out.push(coerce_entity_key::<E>(model, &key)?);
159                }
160
161                Ok(AccessPath::ByKeys(out))
162            }
163            Self::KeyRange { start, end } => Ok(AccessPath::KeyRange {
164                start: coerce_entity_key::<E>(model, &start)?,
165                end: coerce_entity_key::<E>(model, &end)?,
166            }),
167            Self::IndexPrefix { index, values } => Ok(AccessPath::IndexPrefix { index, values }),
168            Self::IndexRange { spec } => Ok(AccessPath::IndexRange { spec }),
169            Self::FullScan => Ok(AccessPath::FullScan),
170        }
171    }
172}
173
174///
175/// QueryModel
176///
177/// Model-level query intent and planning context.
178/// Consumes an `EntityModel` derived from typed entity definitions.
179///
180
181#[derive(Debug)]
182pub(crate) struct QueryModel<'m, K> {
183    model: &'m EntityModel,
184    mode: QueryMode,
185    predicate: Option<Predicate>,
186    key_access: Option<KeyAccessState<K>>,
187    key_access_conflict: bool,
188    group: Option<crate::db::query::plan::GroupSpec>,
189    having: Option<GroupHavingSpec>,
190    order: Option<OrderSpec>,
191    distinct: bool,
192    consistency: MissingRowPolicy,
193}
194
195impl<'m, K: FieldValue> QueryModel<'m, K> {
196    #[must_use]
197    pub(crate) const fn new(model: &'m EntityModel, consistency: MissingRowPolicy) -> Self {
198        Self {
199            model,
200            mode: QueryMode::Load(LoadSpec::new()),
201            predicate: None,
202            key_access: None,
203            key_access_conflict: false,
204            group: None,
205            having: None,
206            order: None,
207            distinct: false,
208            consistency,
209        }
210    }
211
212    /// Return the intent mode (load vs delete).
213    #[must_use]
214    pub(crate) const fn mode(&self) -> QueryMode {
215        self.mode
216    }
217
218    #[must_use]
219    fn has_explicit_order(&self) -> bool {
220        has_explicit_order(self.order.as_ref())
221    }
222
223    #[must_use]
224    const fn has_grouping(&self) -> bool {
225        self.group.is_some()
226    }
227
228    #[must_use]
229    const fn load_spec(&self) -> Option<LoadSpec> {
230        match self.mode {
231            QueryMode::Load(spec) => Some(spec),
232            QueryMode::Delete(_) => None,
233        }
234    }
235
236    /// Add a predicate, implicitly AND-ing with any existing predicate.
237    #[must_use]
238    pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
239        self.predicate = match self.predicate.take() {
240            Some(existing) => Some(Predicate::And(vec![existing, predicate])),
241            None => Some(predicate),
242        };
243        self
244    }
245
246    /// Apply a dynamic filter expression using the model schema.
247    pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
248        let schema = SchemaInfo::from_entity_model(self.model)?;
249        let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
250
251        Ok(self.filter(predicate))
252    }
253
254    /// Apply a dynamic sort expression using the model schema.
255    pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
256        let schema = SchemaInfo::from_entity_model(self.model)?;
257        let order = match expr.lower_with(&schema) {
258            Ok(order) => order,
259            Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
260            Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
261        };
262
263        validate_order_shape(Some(&order))
264            .map_err(IntentError::from)
265            .map_err(QueryError::from)?;
266
267        Ok(self.order_spec(order))
268    }
269
270    /// Append an ascending sort key.
271    #[must_use]
272    pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
273        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
274        self
275    }
276
277    /// Append a descending sort key.
278    #[must_use]
279    pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
280        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
281        self
282    }
283
284    /// Set a fully-specified order spec (validated before reaching this boundary).
285    pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
286        self.order = Some(order);
287        self
288    }
289
290    /// Enable DISTINCT semantics for this query intent.
291    #[must_use]
292    pub(crate) const fn distinct(mut self) -> Self {
293        self.distinct = true;
294        self
295    }
296
297    // Resolve one grouped field into one stable field slot and append it to the
298    // grouped spec in declaration order.
299    fn push_group_field(mut self, field: &str) -> Result<Self, QueryError> {
300        let field_slot = resolve_group_field_slot(self.model, field).map_err(QueryError::from)?;
301        let group = self.group.get_or_insert(GroupSpec {
302            group_fields: Vec::new(),
303            aggregates: Vec::new(),
304            execution: GroupedExecutionConfig::unbounded(),
305        });
306        if !group
307            .group_fields
308            .iter()
309            .any(|existing| existing.index() == field_slot.index())
310        {
311            group.group_fields.push(field_slot);
312        }
313
314        Ok(self)
315    }
316
317    // Append one grouped aggregate terminal to the grouped declarative spec.
318    fn push_group_aggregate(
319        mut self,
320        kind: GroupAggregateKind,
321        target_field: Option<String>,
322        distinct: bool,
323    ) -> Self {
324        let group = self.group.get_or_insert(GroupSpec {
325            group_fields: Vec::new(),
326            aggregates: Vec::new(),
327            execution: GroupedExecutionConfig::unbounded(),
328        });
329        group.aggregates.push(GroupAggregateSpec {
330            kind,
331            target_field,
332            distinct,
333        });
334
335        self
336    }
337
338    // Override grouped hard limits for this grouped query.
339    fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
340        let group = self.group.get_or_insert(GroupSpec {
341            group_fields: Vec::new(),
342            aggregates: Vec::new(),
343            execution: GroupedExecutionConfig::unbounded(),
344        });
345        group.execution = GroupedExecutionConfig::with_hard_limits(max_groups, max_group_bytes);
346
347        self
348    }
349
350    // Append one grouped HAVING compare clause after GROUP BY terminal declaration.
351    fn push_having_clause(mut self, clause: GroupHavingClause) -> Result<Self, QueryError> {
352        if self.group.is_none() {
353            return Err(QueryError::Intent(IntentError::HavingRequiresGroupBy));
354        }
355
356        let having = self.having.get_or_insert(GroupHavingSpec {
357            clauses: Vec::new(),
358        });
359        having.clauses.push(clause);
360
361        Ok(self)
362    }
363
364    // Append one grouped HAVING clause that references one grouped key field.
365    fn push_having_group_clause(
366        self,
367        field: &str,
368        op: CompareOp,
369        value: Value,
370    ) -> Result<Self, QueryError> {
371        let field_slot = resolve_group_field_slot(self.model, field).map_err(QueryError::from)?;
372
373        self.push_having_clause(GroupHavingClause {
374            symbol: GroupHavingSymbol::GroupField(field_slot),
375            op,
376            value,
377        })
378    }
379
380    // Append one grouped HAVING clause that references one grouped aggregate output.
381    fn push_having_aggregate_clause(
382        self,
383        aggregate_index: usize,
384        op: CompareOp,
385        value: Value,
386    ) -> Result<Self, QueryError> {
387        self.push_having_clause(GroupHavingClause {
388            symbol: GroupHavingSymbol::AggregateIndex(aggregate_index),
389            op,
390            value,
391        })
392    }
393
394    /// Track key-only access paths and detect conflicting key intents.
395    fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
396        if let Some(existing) = &self.key_access
397            && existing.kind != kind
398        {
399            self.key_access_conflict = true;
400        }
401
402        self.key_access = Some(KeyAccessState { kind, access });
403
404        self
405    }
406
407    /// Set the access path to a single primary key lookup.
408    pub(crate) fn by_id(self, id: K) -> Self {
409        self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
410    }
411
412    /// Set the access path to a primary key batch lookup.
413    pub(crate) fn by_ids<I>(self, ids: I) -> Self
414    where
415        I: IntoIterator<Item = K>,
416    {
417        self.set_key_access(
418            KeyAccessKind::Many,
419            KeyAccess::Many(ids.into_iter().collect()),
420        )
421    }
422
423    /// Set the access path to the singleton primary key.
424    pub(crate) fn only(self, id: K) -> Self {
425        self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
426    }
427
428    /// Mark this intent as a delete query.
429    #[must_use]
430    pub(crate) const fn delete(mut self) -> Self {
431        if self.mode.is_load() {
432            self.mode = QueryMode::Delete(DeleteSpec::new());
433        }
434        self
435    }
436
437    /// Apply a limit to the current mode.
438    ///
439    /// Load limits bound result size; delete limits bound mutation size.
440    #[must_use]
441    pub(crate) const fn limit(mut self, limit: u32) -> Self {
442        match self.mode {
443            QueryMode::Load(mut spec) => {
444                spec.limit = Some(limit);
445                self.mode = QueryMode::Load(spec);
446            }
447            QueryMode::Delete(mut spec) => {
448                spec.limit = Some(limit);
449                self.mode = QueryMode::Delete(spec);
450            }
451        }
452        self
453    }
454
455    /// Apply an offset to a load intent.
456    #[must_use]
457    pub(crate) const fn offset(mut self, offset: u32) -> Self {
458        if let QueryMode::Load(mut spec) = self.mode {
459            spec.offset = offset;
460            self.mode = QueryMode::Load(spec);
461        }
462        self
463    }
464
465    /// Build a model-level logical plan using Value-based access keys.
466    fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
467        // Phase 1: schema surface and intent validation.
468        let schema_info = SchemaInfo::from_entity_model(self.model)?;
469        self.validate_intent()?;
470
471        // Phase 2: predicate normalization and access planning.
472        let normalized_predicate = self
473            .predicate
474            .as_ref()
475            .map(|predicate| {
476                reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
477                let predicate = normalize_enum_literals(&schema_info, predicate)?;
478                Ok::<Predicate, ValidateError>(normalize(&predicate))
479            })
480            .transpose()?;
481        let access_plan_value = match &self.key_access {
482            Some(state) => access_plan_from_keys_value(&state.access),
483            None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
484        };
485
486        // Phase 3: assemble the executor-ready plan.
487        let scalar = ScalarPlan {
488            mode: self.mode,
489            predicate: normalized_predicate,
490            // Canonicalize ORDER BY to include an explicit primary-key tie-break.
491            // This ensures explain/fingerprint/execution share one deterministic order shape.
492            order: canonicalize_order_spec(self.model, self.order.clone()),
493            distinct: self.distinct,
494            delete_limit: match self.mode {
495                QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
496                QueryMode::Load(_) => None,
497            },
498            page: match self.mode {
499                QueryMode::Load(spec) => {
500                    if spec.limit.is_some() || spec.offset > 0 {
501                        Some(PageSpec {
502                            limit: spec.limit,
503                            offset: spec.offset,
504                        })
505                    } else {
506                        None
507                    }
508                }
509                QueryMode::Delete(_) => None,
510            },
511            consistency: self.consistency,
512        };
513        let mut plan =
514            AccessPlannedQuery::from_parts(LogicalPlan::Scalar(scalar), access_plan_value);
515        if let Some(group) = self.group.clone() {
516            plan = match self.having.clone() {
517                Some(having) => plan.into_grouped_with_having(group, Some(having)),
518                None => plan.into_grouped(group),
519            };
520        }
521
522        if plan.grouped_plan().is_some() {
523            validate_group_query_semantics(&schema_info, self.model, &plan)?;
524        } else {
525            validate_query_semantics(&schema_info, self.model, &plan)?;
526        }
527
528        Ok(plan)
529    }
530
531    // Validate pre-plan policy invariants and key-access rules before planning.
532    fn validate_intent(&self) -> Result<(), IntentError> {
533        validate_intent_plan_shape(self.mode, self.order.as_ref()).map_err(IntentError::from)?;
534
535        let key_access_kind = self.key_access.as_ref().map(|state| match state.kind {
536            KeyAccessKind::Single => IntentValidationKeyAccessKind::Single,
537            KeyAccessKind::Many => IntentValidationKeyAccessKind::Many,
538            KeyAccessKind::Only => IntentValidationKeyAccessKind::Only,
539        });
540        validate_intent_key_access_policy(
541            self.key_access_conflict,
542            key_access_kind,
543            self.predicate.is_some(),
544        )
545        .map_err(IntentError::from)?;
546        if self.having.is_some() && self.group.is_none() {
547            return Err(IntentError::HavingRequiresGroupBy);
548        }
549
550        Ok(())
551    }
552}
553
554///
555/// Query
556///
557/// Typed, declarative query intent for a specific entity type.
558///
559/// This intent is:
560/// - schema-agnostic at construction
561/// - normalized and validated only during planning
562/// - free of access-path decisions
563///
564
565///
566/// PlannedQuery
567///
568/// Neutral query-owned planned contract produced by query planning.
569/// Stores logical + access shape without executor compilation state.
570///
571#[derive(Debug)]
572pub struct PlannedQuery<E: EntityKind> {
573    plan: AccessPlannedQuery<E::Key>,
574    _marker: PhantomData<E>,
575}
576
577impl<E: EntityKind> PlannedQuery<E> {
578    #[must_use]
579    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
580        Self {
581            plan,
582            _marker: PhantomData,
583        }
584    }
585
586    #[must_use]
587    pub fn explain(&self) -> ExplainPlan {
588        self.plan.explain_with_model(E::MODEL)
589    }
590}
591
592///
593/// CompiledQuery
594///
595/// Query-owned compiled handoff produced by `Query::plan()`.
596/// This type intentionally carries only logical/access query semantics.
597/// Executor runtime shape is derived explicitly at the executor boundary.
598///
599#[derive(Clone, Debug)]
600pub struct CompiledQuery<E: EntityKind> {
601    plan: AccessPlannedQuery<E::Key>,
602    _marker: PhantomData<E>,
603}
604
605impl<E: EntityKind> CompiledQuery<E> {
606    #[must_use]
607    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
608        Self {
609            plan,
610            _marker: PhantomData,
611        }
612    }
613
614    #[must_use]
615    pub fn explain(&self) -> ExplainPlan {
616        self.plan.explain_with_model(E::MODEL)
617    }
618
619    #[must_use]
620    pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
621        self.plan
622    }
623}
624
625#[derive(Debug)]
626pub struct Query<E: EntityKind> {
627    intent: QueryModel<'static, E::Key>,
628    _marker: PhantomData<E>,
629}
630
631impl<E: EntityKind> Query<E> {
632    /// Create a new intent with an explicit missing-row policy.
633    /// Ignore favors idempotency and may mask index/data divergence on deletes.
634    /// Use Error to surface missing rows during scan/delete execution.
635    #[must_use]
636    pub const fn new(consistency: MissingRowPolicy) -> Self {
637        Self {
638            intent: QueryModel::new(E::MODEL, consistency),
639            _marker: PhantomData,
640        }
641    }
642
643    /// Return the intent mode (load vs delete).
644    #[must_use]
645    pub const fn mode(&self) -> QueryMode {
646        self.intent.mode()
647    }
648
649    #[must_use]
650    pub(crate) fn has_explicit_order(&self) -> bool {
651        self.intent.has_explicit_order()
652    }
653
654    #[must_use]
655    pub(crate) const fn has_grouping(&self) -> bool {
656        self.intent.has_grouping()
657    }
658
659    #[must_use]
660    pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
661        self.intent.load_spec()
662    }
663
664    /// Add a predicate, implicitly AND-ing with any existing predicate.
665    #[must_use]
666    pub fn filter(mut self, predicate: Predicate) -> Self {
667        self.intent = self.intent.filter(predicate);
668        self
669    }
670
671    /// Apply a dynamic filter expression.
672    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
673        let Self { intent, _marker } = self;
674        let intent = intent.filter_expr(expr)?;
675
676        Ok(Self { intent, _marker })
677    }
678
679    /// Apply a dynamic sort expression.
680    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
681        let Self { intent, _marker } = self;
682        let intent = intent.sort_expr(expr)?;
683
684        Ok(Self { intent, _marker })
685    }
686
687    /// Append an ascending sort key.
688    #[must_use]
689    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
690        self.intent = self.intent.order_by(field);
691        self
692    }
693
694    /// Append a descending sort key.
695    #[must_use]
696    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
697        self.intent = self.intent.order_by_desc(field);
698        self
699    }
700
701    /// Enable DISTINCT semantics for this query.
702    #[must_use]
703    pub fn distinct(mut self) -> Self {
704        self.intent = self.intent.distinct();
705        self
706    }
707
708    /// Add one GROUP BY field.
709    pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
710        let Self { intent, _marker } = self;
711        let intent = intent.push_group_field(field.as_ref())?;
712
713        Ok(Self { intent, _marker })
714    }
715
716    fn push_group_terminal(mut self, kind: GroupAggregateKind) -> Self {
717        self.intent = self.intent.push_group_aggregate(kind, None, false);
718        self
719    }
720
721    fn push_group_terminal_distinct(mut self, kind: GroupAggregateKind) -> Self {
722        self.intent = self.intent.push_group_aggregate(kind, None, true);
723        self
724    }
725
726    /// Add one grouped `count(*)` terminal.
727    #[must_use]
728    pub fn group_count(self) -> Self {
729        self.push_group_terminal(GroupAggregateKind::Count)
730    }
731
732    /// Add one grouped `count(distinct id)` terminal.
733    #[must_use]
734    pub fn group_count_distinct(self) -> Self {
735        self.push_group_terminal_distinct(GroupAggregateKind::Count)
736    }
737
738    /// Add one grouped global `count(distinct field)` terminal.
739    #[must_use]
740    pub fn group_count_distinct_by(mut self, field: impl AsRef<str>) -> Self {
741        self.intent = self.intent.push_group_aggregate(
742            GroupAggregateKind::Count,
743            Some(field.as_ref().to_string()),
744            true,
745        );
746        self
747    }
748
749    /// Add one grouped global `sum(distinct field)` terminal.
750    #[must_use]
751    pub fn group_sum_distinct_by(mut self, field: impl AsRef<str>) -> Self {
752        self.intent = self.intent.push_group_aggregate(
753            GroupAggregateKind::Sum,
754            Some(field.as_ref().to_string()),
755            true,
756        );
757        self
758    }
759
760    /// Add one grouped `exists` terminal.
761    #[must_use]
762    pub fn group_exists(self) -> Self {
763        self.push_group_terminal(GroupAggregateKind::Exists)
764    }
765
766    /// Add one grouped `first` terminal.
767    #[must_use]
768    pub fn group_first(self) -> Self {
769        self.push_group_terminal(GroupAggregateKind::First)
770    }
771
772    /// Add one grouped `last` terminal.
773    #[must_use]
774    pub fn group_last(self) -> Self {
775        self.push_group_terminal(GroupAggregateKind::Last)
776    }
777
778    /// Add one grouped `min` terminal (id extrema).
779    #[must_use]
780    pub fn group_min(self) -> Self {
781        self.push_group_terminal(GroupAggregateKind::Min)
782    }
783
784    /// Add one grouped `min(distinct id)` terminal.
785    #[must_use]
786    pub fn group_min_distinct(self) -> Self {
787        self.push_group_terminal_distinct(GroupAggregateKind::Min)
788    }
789
790    /// Add one grouped `max` terminal (id extrema).
791    #[must_use]
792    pub fn group_max(self) -> Self {
793        self.push_group_terminal(GroupAggregateKind::Max)
794    }
795
796    /// Add one grouped `max(distinct id)` terminal.
797    #[must_use]
798    pub fn group_max_distinct(self) -> Self {
799        self.push_group_terminal_distinct(GroupAggregateKind::Max)
800    }
801
802    /// Add one grouped `min(field)` terminal.
803    ///
804    /// Grouped field-target extrema are deferred in grouped v1.
805    pub fn group_min_by(self, _field: impl AsRef<str>) -> Result<Self, QueryError> {
806        validate_grouped_field_target_extrema_policy()
807            .map_err(IntentError::from)
808            .map_err(QueryError::Intent)?;
809
810        Ok(self)
811    }
812
813    /// Add one grouped `max(field)` terminal.
814    ///
815    /// Grouped field-target extrema are deferred in grouped v1.
816    pub fn group_max_by(self, _field: impl AsRef<str>) -> Result<Self, QueryError> {
817        validate_grouped_field_target_extrema_policy()
818            .map_err(IntentError::from)
819            .map_err(QueryError::Intent)?;
820
821        Ok(self)
822    }
823
824    /// Override grouped hard limits for grouped execution budget enforcement.
825    #[must_use]
826    pub fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
827        self.intent = self.intent.grouped_limits(max_groups, max_group_bytes);
828        self
829    }
830
831    /// Add one grouped HAVING compare clause over one grouped key field.
832    pub fn having_group(
833        self,
834        field: impl AsRef<str>,
835        op: CompareOp,
836        value: Value,
837    ) -> Result<Self, QueryError> {
838        let field = field.as_ref().to_owned();
839        let Self { intent, _marker } = self;
840        let intent = intent.push_having_group_clause(&field, op, value)?;
841
842        Ok(Self { intent, _marker })
843    }
844
845    /// Add one grouped HAVING compare clause over one grouped aggregate output.
846    pub fn having_aggregate(
847        self,
848        aggregate_index: usize,
849        op: CompareOp,
850        value: Value,
851    ) -> Result<Self, QueryError> {
852        let Self { intent, _marker } = self;
853        let intent = intent.push_having_aggregate_clause(aggregate_index, op, value)?;
854
855        Ok(Self { intent, _marker })
856    }
857
858    /// Set the access path to a single primary key lookup.
859    pub(crate) fn by_id(self, id: E::Key) -> Self {
860        let Self { intent, _marker } = self;
861        Self {
862            intent: intent.by_id(id),
863            _marker,
864        }
865    }
866
867    /// Set the access path to a primary key batch lookup.
868    pub(crate) fn by_ids<I>(self, ids: I) -> Self
869    where
870        I: IntoIterator<Item = E::Key>,
871    {
872        let Self { intent, _marker } = self;
873        Self {
874            intent: intent.by_ids(ids),
875            _marker,
876        }
877    }
878
879    /// Mark this intent as a delete query.
880    #[must_use]
881    pub fn delete(mut self) -> Self {
882        self.intent = self.intent.delete();
883        self
884    }
885
886    /// Apply a limit to the current mode.
887    ///
888    /// Load limits bound result size; delete limits bound mutation size.
889    /// For scalar load queries, any use of `limit` or `offset` requires an
890    /// explicit `order_by(...)` so pagination is deterministic.
891    /// GROUP BY queries use canonical grouped-key order by default.
892    #[must_use]
893    pub fn limit(mut self, limit: u32) -> Self {
894        self.intent = self.intent.limit(limit);
895        self
896    }
897
898    /// Apply an offset to a load intent.
899    ///
900    /// Scalar pagination requires an explicit `order_by(...)`.
901    /// GROUP BY queries use canonical grouped-key order by default.
902    #[must_use]
903    pub fn offset(mut self, offset: u32) -> Self {
904        self.intent = self.intent.offset(offset);
905        self
906    }
907
908    /// Explain this intent without executing it.
909    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
910        let plan = self.planned()?;
911
912        Ok(plan.explain())
913    }
914
915    /// Plan this intent into a neutral planned query contract.
916    pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
917        let plan = self.build_plan()?;
918
919        Ok(PlannedQuery::new(plan))
920    }
921
922    /// Compile this intent into query-owned handoff state.
923    ///
924    /// This boundary intentionally does not expose executor runtime shape.
925    pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
926        let plan = self.build_plan()?;
927
928        Ok(CompiledQuery::new(plan))
929    }
930
931    // Build a logical plan for the current intent.
932    fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
933        let plan_value = self.intent.build_plan_model()?;
934        let (logical, access) = plan_value.into_parts();
935        let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
936        let plan = AccessPlannedQuery::from_parts(logical, access);
937
938        Ok(plan)
939    }
940}
941
942impl<E> Query<E>
943where
944    E: EntityKind + SingletonEntity,
945    E::Key: Default,
946{
947    /// Set the access path to the singleton primary key.
948    pub(crate) fn only(self) -> Self {
949        let Self { intent, _marker } = self;
950
951        Self {
952            intent: intent.only(E::Key::default()),
953            _marker,
954        }
955    }
956}
957
958///
959/// QueryError
960///
961
962#[derive(Debug, ThisError)]
963pub enum QueryError {
964    #[error("{0}")]
965    Validate(#[from] ValidateError),
966
967    #[error("{0}")]
968    Plan(Box<PlanError>),
969
970    #[error("{0}")]
971    Intent(#[from] IntentError),
972
973    #[error("{0}")]
974    Response(#[from] ResponseError),
975
976    #[error("{0}")]
977    Execute(#[from] InternalError),
978}
979
980impl From<PlannerError> for QueryError {
981    fn from(err: PlannerError) -> Self {
982        match err {
983            PlannerError::Plan(err) => Self::from(*err),
984            PlannerError::Internal(err) => Self::Execute(*err),
985        }
986    }
987}
988
989impl From<PlanError> for QueryError {
990    fn from(err: PlanError) -> Self {
991        Self::Plan(Box::new(err))
992    }
993}
994
995///
996/// IntentError
997///
998
999#[derive(Clone, Copy, Debug, ThisError)]
1000pub enum IntentError {
1001    #[error("{0}")]
1002    PlanShape(#[from] PolicyPlanError),
1003
1004    #[error("by_ids() cannot be combined with predicates")]
1005    ByIdsWithPredicate,
1006
1007    #[error("only() cannot be combined with predicates")]
1008    OnlyWithPredicate,
1009
1010    #[error("multiple key access methods were used on the same query")]
1011    KeyAccessConflict,
1012
1013    #[error("cursor pagination requires an explicit ordering")]
1014    CursorRequiresOrder,
1015
1016    #[error("cursor pagination requires an explicit limit")]
1017    CursorRequiresLimit,
1018
1019    #[error("cursor tokens can only be used with .page().execute()")]
1020    CursorRequiresPagedExecution,
1021
1022    #[error("grouped queries require execute_grouped(...)")]
1023    GroupedRequiresExecuteGrouped,
1024
1025    #[error("grouped field-target extrema are not supported in grouped v1")]
1026    GroupedFieldTargetExtremaUnsupported,
1027
1028    #[error("HAVING requires GROUP BY")]
1029    HavingRequiresGroupBy,
1030}
1031
1032impl From<CursorPagingPolicyError> for IntentError {
1033    fn from(err: CursorPagingPolicyError) -> Self {
1034        match err {
1035            CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
1036            CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
1037        }
1038    }
1039}
1040
1041impl From<IntentKeyAccessPolicyViolation> for IntentError {
1042    fn from(err: IntentKeyAccessPolicyViolation) -> Self {
1043        match err {
1044            IntentKeyAccessPolicyViolation::KeyAccessConflict => Self::KeyAccessConflict,
1045            IntentKeyAccessPolicyViolation::ByIdsWithPredicate => Self::ByIdsWithPredicate,
1046            IntentKeyAccessPolicyViolation::OnlyWithPredicate => Self::OnlyWithPredicate,
1047        }
1048    }
1049}
1050
1051impl From<IntentTerminalPolicyViolation> for IntentError {
1052    fn from(err: IntentTerminalPolicyViolation) -> Self {
1053        match err {
1054            IntentTerminalPolicyViolation::GroupedFieldTargetExtremaUnsupported => {
1055                Self::GroupedFieldTargetExtremaUnsupported
1056            }
1057        }
1058    }
1059}
1060
1061impl From<FluentLoadPolicyViolation> for IntentError {
1062    fn from(err: FluentLoadPolicyViolation) -> Self {
1063        match err {
1064            FluentLoadPolicyViolation::CursorRequiresPagedExecution => {
1065                Self::CursorRequiresPagedExecution
1066            }
1067            FluentLoadPolicyViolation::GroupedRequiresExecuteGrouped => {
1068                Self::GroupedRequiresExecuteGrouped
1069            }
1070            FluentLoadPolicyViolation::CursorRequiresOrder => Self::CursorRequiresOrder,
1071            FluentLoadPolicyViolation::CursorRequiresLimit => Self::CursorRequiresLimit,
1072        }
1073    }
1074}
1075
1076/// Helper to append an ordering field while preserving existing order spec.
1077fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
1078    match order {
1079        Some(mut spec) => {
1080            spec.fields.push((field.to_string(), direction));
1081            spec
1082        }
1083        None => OrderSpec {
1084            fields: vec![(field.to_string(), direction)],
1085        },
1086    }
1087}
1088
1089// Normalize ORDER BY into a canonical, deterministic shape:
1090// - preserve user field order
1091// - remove explicit primary-key references from the user segment
1092// - append exactly one primary-key field as the terminal tie-break
1093fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
1094    let mut order = order?;
1095    if order.fields.is_empty() {
1096        return Some(order);
1097    }
1098
1099    let pk_field = model.primary_key.name;
1100    let mut pk_direction = None;
1101    order.fields.retain(|(field, direction)| {
1102        if field == pk_field {
1103            if pk_direction.is_none() {
1104                pk_direction = Some(*direction);
1105            }
1106            false
1107        } else {
1108            true
1109        }
1110    });
1111
1112    let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
1113    order.fields.push((pk_field.to_string(), pk_direction));
1114
1115    Some(order)
1116}