Skip to main content

icydb_core/db/query/intent/
mod.rs

1#![expect(clippy::used_underscore_binding)]
2
3//! Module: query::intent
4//! Responsibility: query intent construction, coercion, and semantic-plan compilation.
5//! Does not own: executor runtime behavior or index storage details.
6//! Boundary: typed/fluent query inputs lowered into validated logical plans.
7
8#[cfg(test)]
9mod tests;
10
11// Key-only access intent and helpers.
12pub type DeleteSpec = crate::db::query::plan::DeleteSpec;
13pub type LoadSpec = crate::db::query::plan::LoadSpec;
14pub type QueryMode = crate::db::query::plan::QueryMode;
15
16use crate::{
17    db::{
18        access::{AccessPath, AccessPlan, AccessPlanError, canonical_by_keys_path},
19        predicate::{
20            CompareOp, MissingRowPolicy, Predicate, SchemaInfo, ValidateError, normalize,
21            normalize_enum_literals, reject_unsupported_query_features,
22        },
23        query::{
24            explain::ExplainPlan,
25            expr::{FilterExpr, SortExpr, SortLowerError},
26            plan::{
27                AccessPlannedQuery, CursorPagingPolicyError, DeleteLimitSpec,
28                FluentLoadPolicyViolation, GroupAggregateKind, GroupAggregateSpec,
29                GroupHavingClause, GroupHavingSpec, GroupHavingSymbol, GroupSpec,
30                GroupedExecutionConfig, IntentKeyAccessKind as IntentValidationKeyAccessKind,
31                IntentKeyAccessPolicyViolation, IntentTerminalPolicyViolation, LogicalPlan,
32                OrderDirection, OrderSpec, PageSpec, PlanError, PlannerError, PolicyPlanError,
33                ScalarPlan, has_explicit_order, plan_access, resolve_group_field_slot,
34                validate_group_query_semantics, validate_grouped_field_target_extrema_policy,
35                validate_intent_key_access_policy, validate_intent_plan_shape,
36                validate_order_shape, validate_query_semantics,
37            },
38        },
39        response::ResponseError,
40    },
41    error::InternalError,
42    model::entity::EntityModel,
43    traits::{EntityKind, FieldValue, SingletonEntity},
44    value::Value,
45};
46use std::marker::PhantomData;
47use thiserror::Error as ThisError;
48
49///
50/// KeyAccess
51/// Primary-key-only access hints for query planning.
52///
53
54#[derive(Clone, Debug, Eq, PartialEq)]
55pub(crate) enum KeyAccess<K> {
56    Single(K),
57    Many(Vec<K>),
58}
59
60///
61/// KeyAccessKind
62/// Identifies which key-only builder set the access path.
63///
64
65#[derive(Clone, Copy, Debug, Eq, PartialEq)]
66pub(crate) enum KeyAccessKind {
67    Single,
68    Many,
69    Only,
70}
71
72///
73/// KeyAccessState
74/// Tracks key-only access plus its origin for intent validation.
75///
76
77#[derive(Clone, Debug, Eq, PartialEq)]
78pub(crate) struct KeyAccessState<K> {
79    pub kind: KeyAccessKind,
80    pub access: KeyAccess<K>,
81}
82
83// Build a model-level access plan for key-only intents.
84pub(crate) fn access_plan_from_keys_value<K>(access: &KeyAccess<K>) -> AccessPlan<Value>
85where
86    K: FieldValue,
87{
88    // Phase 1: map typed keys into model-level Value access paths.
89    match access {
90        KeyAccess::Single(key) => AccessPlan::path(AccessPath::ByKey(key.to_value())),
91        KeyAccess::Many(keys) => {
92            let values = keys.iter().map(FieldValue::to_value).collect();
93            AccessPlan::path(canonical_by_keys_path(values))
94        }
95    }
96}
97
98// Convert model-level access plans into entity-keyed access plans.
99pub(crate) fn access_plan_to_entity_keys<E: EntityKind>(
100    model: &EntityModel,
101    access: AccessPlan<Value>,
102) -> Result<AccessPlan<E::Key>, PlanError> {
103    access.into_executable::<E>(model)
104}
105
106// Convert model-level key values into typed entity keys.
107pub(crate) fn coerce_entity_key<E: EntityKind>(
108    model: &EntityModel,
109    key: &Value,
110) -> Result<E::Key, PlanError> {
111    E::Key::from_value(key).ok_or_else(|| {
112        PlanError::from(AccessPlanError::PrimaryKeyMismatch {
113            field: model.primary_key.name.to_string(),
114            key: key.clone(),
115        })
116    })
117}
118
119impl AccessPlan<Value> {
120    /// Convert model-level access plans into typed executable access plans.
121    pub(crate) fn into_executable<E: EntityKind>(
122        self,
123        model: &EntityModel,
124    ) -> Result<AccessPlan<E::Key>, PlanError> {
125        match self {
126            Self::Path(path) => Ok(AccessPlan::path(path.into_executable::<E>(model)?)),
127            Self::Union(children) => {
128                let mut out = Vec::with_capacity(children.len());
129                for child in children {
130                    out.push(child.into_executable::<E>(model)?);
131                }
132
133                Ok(AccessPlan::union(out))
134            }
135            Self::Intersection(children) => {
136                let mut out = Vec::with_capacity(children.len());
137                for child in children {
138                    out.push(child.into_executable::<E>(model)?);
139                }
140
141                Ok(AccessPlan::intersection(out))
142            }
143        }
144    }
145}
146
147impl AccessPath<Value> {
148    /// Convert one model-level access path into a typed executable access path.
149    pub(crate) fn into_executable<E: EntityKind>(
150        self,
151        model: &EntityModel,
152    ) -> Result<AccessPath<E::Key>, PlanError> {
153        match self {
154            Self::ByKey(key) => Ok(AccessPath::ByKey(coerce_entity_key::<E>(model, &key)?)),
155            Self::ByKeys(keys) => {
156                let mut out = Vec::with_capacity(keys.len());
157                for key in keys {
158                    out.push(coerce_entity_key::<E>(model, &key)?);
159                }
160
161                Ok(AccessPath::ByKeys(out))
162            }
163            Self::KeyRange { start, end } => Ok(AccessPath::KeyRange {
164                start: coerce_entity_key::<E>(model, &start)?,
165                end: coerce_entity_key::<E>(model, &end)?,
166            }),
167            Self::IndexPrefix { index, values } => Ok(AccessPath::IndexPrefix { index, values }),
168            Self::IndexRange { spec } => Ok(AccessPath::IndexRange { spec }),
169            Self::FullScan => Ok(AccessPath::FullScan),
170        }
171    }
172}
173
174///
175/// QueryModel
176///
177/// Model-level query intent and planning context.
178/// Consumes an `EntityModel` derived from typed entity definitions.
179///
180
181#[derive(Debug)]
182pub(crate) struct QueryModel<'m, K> {
183    model: &'m EntityModel,
184    mode: QueryMode,
185    predicate: Option<Predicate>,
186    key_access: Option<KeyAccessState<K>>,
187    key_access_conflict: bool,
188    group: Option<crate::db::query::plan::GroupSpec>,
189    having: Option<GroupHavingSpec>,
190    order: Option<OrderSpec>,
191    distinct: bool,
192    consistency: MissingRowPolicy,
193}
194
195impl<'m, K: FieldValue> QueryModel<'m, K> {
196    #[must_use]
197    pub(crate) const fn new(model: &'m EntityModel, consistency: MissingRowPolicy) -> Self {
198        Self {
199            model,
200            mode: QueryMode::Load(LoadSpec::new()),
201            predicate: None,
202            key_access: None,
203            key_access_conflict: false,
204            group: None,
205            having: None,
206            order: None,
207            distinct: false,
208            consistency,
209        }
210    }
211
212    /// Return the intent mode (load vs delete).
213    #[must_use]
214    pub(crate) const fn mode(&self) -> QueryMode {
215        self.mode
216    }
217
218    #[must_use]
219    fn has_explicit_order(&self) -> bool {
220        has_explicit_order(self.order.as_ref())
221    }
222
223    #[must_use]
224    const fn has_grouping(&self) -> bool {
225        self.group.is_some()
226    }
227
228    #[must_use]
229    const fn load_spec(&self) -> Option<LoadSpec> {
230        match self.mode {
231            QueryMode::Load(spec) => Some(spec),
232            QueryMode::Delete(_) => None,
233        }
234    }
235
236    /// Add a predicate, implicitly AND-ing with any existing predicate.
237    #[must_use]
238    pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
239        self.predicate = match self.predicate.take() {
240            Some(existing) => Some(Predicate::And(vec![existing, predicate])),
241            None => Some(predicate),
242        };
243        self
244    }
245
246    /// Apply a dynamic filter expression using the model schema.
247    pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
248        let schema = SchemaInfo::from_entity_model(self.model)?;
249        let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
250
251        Ok(self.filter(predicate))
252    }
253
254    /// Apply a dynamic sort expression using the model schema.
255    pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
256        let schema = SchemaInfo::from_entity_model(self.model)?;
257        let order = match expr.lower_with(&schema) {
258            Ok(order) => order,
259            Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
260            Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
261        };
262
263        validate_order_shape(Some(&order))
264            .map_err(IntentError::from)
265            .map_err(QueryError::from)?;
266
267        Ok(self.order_spec(order))
268    }
269
270    /// Append an ascending sort key.
271    #[must_use]
272    pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
273        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
274        self
275    }
276
277    /// Append a descending sort key.
278    #[must_use]
279    pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
280        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
281        self
282    }
283
284    /// Set a fully-specified order spec (validated before reaching this boundary).
285    pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
286        self.order = Some(order);
287        self
288    }
289
290    /// Enable DISTINCT semantics for this query intent.
291    #[must_use]
292    pub(crate) const fn distinct(mut self) -> Self {
293        self.distinct = true;
294        self
295    }
296
297    // Resolve one grouped field into one stable field slot and append it to the
298    // grouped spec in declaration order.
299    fn push_group_field(mut self, field: &str) -> Result<Self, QueryError> {
300        let field_slot = resolve_group_field_slot(self.model, field).map_err(QueryError::from)?;
301        let group = self.group.get_or_insert(GroupSpec {
302            group_fields: Vec::new(),
303            aggregates: Vec::new(),
304            execution: GroupedExecutionConfig::unbounded(),
305        });
306        if !group
307            .group_fields
308            .iter()
309            .any(|existing| existing.index() == field_slot.index())
310        {
311            group.group_fields.push(field_slot);
312        }
313
314        Ok(self)
315    }
316
317    // Append one grouped aggregate terminal to the grouped declarative spec.
318    fn push_group_aggregate(
319        mut self,
320        kind: GroupAggregateKind,
321        target_field: Option<String>,
322    ) -> Self {
323        let group = self.group.get_or_insert(GroupSpec {
324            group_fields: Vec::new(),
325            aggregates: Vec::new(),
326            execution: GroupedExecutionConfig::unbounded(),
327        });
328        group
329            .aggregates
330            .push(GroupAggregateSpec { kind, target_field });
331
332        self
333    }
334
335    // Override grouped hard limits for this grouped query.
336    fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
337        let group = self.group.get_or_insert(GroupSpec {
338            group_fields: Vec::new(),
339            aggregates: Vec::new(),
340            execution: GroupedExecutionConfig::unbounded(),
341        });
342        group.execution = GroupedExecutionConfig::with_hard_limits(max_groups, max_group_bytes);
343
344        self
345    }
346
347    // Append one grouped HAVING compare clause after GROUP BY terminal declaration.
348    fn push_having_clause(mut self, clause: GroupHavingClause) -> Result<Self, QueryError> {
349        if self.group.is_none() {
350            return Err(QueryError::Intent(IntentError::HavingRequiresGroupBy));
351        }
352
353        let having = self.having.get_or_insert(GroupHavingSpec {
354            clauses: Vec::new(),
355        });
356        having.clauses.push(clause);
357
358        Ok(self)
359    }
360
361    // Append one grouped HAVING clause that references one grouped key field.
362    fn push_having_group_clause(
363        self,
364        field: &str,
365        op: CompareOp,
366        value: Value,
367    ) -> Result<Self, QueryError> {
368        let field_slot = resolve_group_field_slot(self.model, field).map_err(QueryError::from)?;
369
370        self.push_having_clause(GroupHavingClause {
371            symbol: GroupHavingSymbol::GroupField(field_slot),
372            op,
373            value,
374        })
375    }
376
377    // Append one grouped HAVING clause that references one grouped aggregate output.
378    fn push_having_aggregate_clause(
379        self,
380        aggregate_index: usize,
381        op: CompareOp,
382        value: Value,
383    ) -> Result<Self, QueryError> {
384        self.push_having_clause(GroupHavingClause {
385            symbol: GroupHavingSymbol::AggregateIndex(aggregate_index),
386            op,
387            value,
388        })
389    }
390
391    /// Track key-only access paths and detect conflicting key intents.
392    fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
393        if let Some(existing) = &self.key_access
394            && existing.kind != kind
395        {
396            self.key_access_conflict = true;
397        }
398
399        self.key_access = Some(KeyAccessState { kind, access });
400
401        self
402    }
403
404    /// Set the access path to a single primary key lookup.
405    pub(crate) fn by_id(self, id: K) -> Self {
406        self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
407    }
408
409    /// Set the access path to a primary key batch lookup.
410    pub(crate) fn by_ids<I>(self, ids: I) -> Self
411    where
412        I: IntoIterator<Item = K>,
413    {
414        self.set_key_access(
415            KeyAccessKind::Many,
416            KeyAccess::Many(ids.into_iter().collect()),
417        )
418    }
419
420    /// Set the access path to the singleton primary key.
421    pub(crate) fn only(self, id: K) -> Self {
422        self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
423    }
424
425    /// Mark this intent as a delete query.
426    #[must_use]
427    pub(crate) const fn delete(mut self) -> Self {
428        if self.mode.is_load() {
429            self.mode = QueryMode::Delete(DeleteSpec::new());
430        }
431        self
432    }
433
434    /// Apply a limit to the current mode.
435    ///
436    /// Load limits bound result size; delete limits bound mutation size.
437    #[must_use]
438    pub(crate) const fn limit(mut self, limit: u32) -> Self {
439        match self.mode {
440            QueryMode::Load(mut spec) => {
441                spec.limit = Some(limit);
442                self.mode = QueryMode::Load(spec);
443            }
444            QueryMode::Delete(mut spec) => {
445                spec.limit = Some(limit);
446                self.mode = QueryMode::Delete(spec);
447            }
448        }
449        self
450    }
451
452    /// Apply an offset to a load intent.
453    #[must_use]
454    pub(crate) const fn offset(mut self, offset: u32) -> Self {
455        if let QueryMode::Load(mut spec) = self.mode {
456            spec.offset = offset;
457            self.mode = QueryMode::Load(spec);
458        }
459        self
460    }
461
462    /// Build a model-level logical plan using Value-based access keys.
463    fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
464        // Phase 1: schema surface and intent validation.
465        let schema_info = SchemaInfo::from_entity_model(self.model)?;
466        self.validate_intent()?;
467
468        // Phase 2: predicate normalization and access planning.
469        let normalized_predicate = self
470            .predicate
471            .as_ref()
472            .map(|predicate| {
473                reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
474                let predicate = normalize_enum_literals(&schema_info, predicate)?;
475                Ok::<Predicate, ValidateError>(normalize(&predicate))
476            })
477            .transpose()?;
478        let access_plan_value = match &self.key_access {
479            Some(state) => access_plan_from_keys_value(&state.access),
480            None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
481        };
482
483        // Phase 3: assemble the executor-ready plan.
484        let scalar = ScalarPlan {
485            mode: self.mode,
486            predicate: normalized_predicate,
487            // Canonicalize ORDER BY to include an explicit primary-key tie-break.
488            // This ensures explain/fingerprint/execution share one deterministic order shape.
489            order: canonicalize_order_spec(self.model, self.order.clone()),
490            distinct: self.distinct,
491            delete_limit: match self.mode {
492                QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
493                QueryMode::Load(_) => None,
494            },
495            page: match self.mode {
496                QueryMode::Load(spec) => {
497                    if spec.limit.is_some() || spec.offset > 0 {
498                        Some(PageSpec {
499                            limit: spec.limit,
500                            offset: spec.offset,
501                        })
502                    } else {
503                        None
504                    }
505                }
506                QueryMode::Delete(_) => None,
507            },
508            consistency: self.consistency,
509        };
510        let mut plan =
511            AccessPlannedQuery::from_parts(LogicalPlan::Scalar(scalar), access_plan_value);
512        if let Some(group) = self.group.clone() {
513            plan = match self.having.clone() {
514                Some(having) => plan.into_grouped_with_having(group, Some(having)),
515                None => plan.into_grouped(group),
516            };
517        }
518
519        if plan.grouped_plan().is_some() {
520            validate_group_query_semantics(&schema_info, self.model, &plan)?;
521        } else {
522            validate_query_semantics(&schema_info, self.model, &plan)?;
523        }
524
525        Ok(plan)
526    }
527
528    // Validate pre-plan policy invariants and key-access rules before planning.
529    fn validate_intent(&self) -> Result<(), IntentError> {
530        validate_intent_plan_shape(self.mode, self.order.as_ref()).map_err(IntentError::from)?;
531
532        let key_access_kind = self.key_access.as_ref().map(|state| match state.kind {
533            KeyAccessKind::Single => IntentValidationKeyAccessKind::Single,
534            KeyAccessKind::Many => IntentValidationKeyAccessKind::Many,
535            KeyAccessKind::Only => IntentValidationKeyAccessKind::Only,
536        });
537        validate_intent_key_access_policy(
538            self.key_access_conflict,
539            key_access_kind,
540            self.predicate.is_some(),
541        )
542        .map_err(IntentError::from)?;
543        if self.having.is_some() && self.group.is_none() {
544            return Err(IntentError::HavingRequiresGroupBy);
545        }
546
547        Ok(())
548    }
549}
550
551///
552/// Query
553///
554/// Typed, declarative query intent for a specific entity type.
555///
556/// This intent is:
557/// - schema-agnostic at construction
558/// - normalized and validated only during planning
559/// - free of access-path decisions
560///
561
562///
563/// PlannedQuery
564///
565/// Neutral query-owned planned contract produced by query planning.
566/// Stores logical + access shape without executor compilation state.
567///
568#[derive(Debug)]
569pub struct PlannedQuery<E: EntityKind> {
570    plan: AccessPlannedQuery<E::Key>,
571    _marker: PhantomData<E>,
572}
573
574impl<E: EntityKind> PlannedQuery<E> {
575    #[must_use]
576    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
577        Self {
578            plan,
579            _marker: PhantomData,
580        }
581    }
582
583    #[must_use]
584    pub fn explain(&self) -> ExplainPlan {
585        self.plan.explain_with_model(E::MODEL)
586    }
587}
588
589///
590/// CompiledQuery
591///
592/// Query-owned compiled handoff produced by `Query::plan()`.
593/// This type intentionally carries only logical/access query semantics.
594/// Executor runtime shape is derived explicitly at the executor boundary.
595///
596#[derive(Clone, Debug)]
597pub struct CompiledQuery<E: EntityKind> {
598    plan: AccessPlannedQuery<E::Key>,
599    _marker: PhantomData<E>,
600}
601
602impl<E: EntityKind> CompiledQuery<E> {
603    #[must_use]
604    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
605        Self {
606            plan,
607            _marker: PhantomData,
608        }
609    }
610
611    #[must_use]
612    pub fn explain(&self) -> ExplainPlan {
613        self.plan.explain_with_model(E::MODEL)
614    }
615
616    #[must_use]
617    pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
618        self.plan
619    }
620}
621
622#[derive(Debug)]
623pub struct Query<E: EntityKind> {
624    intent: QueryModel<'static, E::Key>,
625    _marker: PhantomData<E>,
626}
627
628impl<E: EntityKind> Query<E> {
629    /// Create a new intent with an explicit missing-row policy.
630    /// Ignore favors idempotency and may mask index/data divergence on deletes.
631    /// Use Error to surface missing rows during scan/delete execution.
632    #[must_use]
633    pub const fn new(consistency: MissingRowPolicy) -> Self {
634        Self {
635            intent: QueryModel::new(E::MODEL, consistency),
636            _marker: PhantomData,
637        }
638    }
639
640    /// Return the intent mode (load vs delete).
641    #[must_use]
642    pub const fn mode(&self) -> QueryMode {
643        self.intent.mode()
644    }
645
646    #[must_use]
647    pub(crate) fn has_explicit_order(&self) -> bool {
648        self.intent.has_explicit_order()
649    }
650
651    #[must_use]
652    pub(crate) const fn has_grouping(&self) -> bool {
653        self.intent.has_grouping()
654    }
655
656    #[must_use]
657    pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
658        self.intent.load_spec()
659    }
660
661    /// Add a predicate, implicitly AND-ing with any existing predicate.
662    #[must_use]
663    pub fn filter(mut self, predicate: Predicate) -> Self {
664        self.intent = self.intent.filter(predicate);
665        self
666    }
667
668    /// Apply a dynamic filter expression.
669    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
670        let Self { intent, _marker } = self;
671        let intent = intent.filter_expr(expr)?;
672
673        Ok(Self { intent, _marker })
674    }
675
676    /// Apply a dynamic sort expression.
677    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
678        let Self { intent, _marker } = self;
679        let intent = intent.sort_expr(expr)?;
680
681        Ok(Self { intent, _marker })
682    }
683
684    /// Append an ascending sort key.
685    #[must_use]
686    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
687        self.intent = self.intent.order_by(field);
688        self
689    }
690
691    /// Append a descending sort key.
692    #[must_use]
693    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
694        self.intent = self.intent.order_by_desc(field);
695        self
696    }
697
698    /// Enable DISTINCT semantics for this query.
699    #[must_use]
700    pub fn distinct(mut self) -> Self {
701        self.intent = self.intent.distinct();
702        self
703    }
704
705    /// Add one GROUP BY field.
706    pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
707        let Self { intent, _marker } = self;
708        let intent = intent.push_group_field(field.as_ref())?;
709
710        Ok(Self { intent, _marker })
711    }
712
713    fn push_group_terminal(mut self, kind: GroupAggregateKind) -> Self {
714        self.intent = self.intent.push_group_aggregate(kind, None);
715        self
716    }
717
718    /// Add one grouped `count(*)` terminal.
719    #[must_use]
720    pub fn group_count(self) -> Self {
721        self.push_group_terminal(GroupAggregateKind::Count)
722    }
723
724    /// Add one grouped `exists` terminal.
725    #[must_use]
726    pub fn group_exists(self) -> Self {
727        self.push_group_terminal(GroupAggregateKind::Exists)
728    }
729
730    /// Add one grouped `first` terminal.
731    #[must_use]
732    pub fn group_first(self) -> Self {
733        self.push_group_terminal(GroupAggregateKind::First)
734    }
735
736    /// Add one grouped `last` terminal.
737    #[must_use]
738    pub fn group_last(self) -> Self {
739        self.push_group_terminal(GroupAggregateKind::Last)
740    }
741
742    /// Add one grouped `min` terminal (id extrema).
743    #[must_use]
744    pub fn group_min(self) -> Self {
745        self.push_group_terminal(GroupAggregateKind::Min)
746    }
747
748    /// Add one grouped `max` terminal (id extrema).
749    #[must_use]
750    pub fn group_max(self) -> Self {
751        self.push_group_terminal(GroupAggregateKind::Max)
752    }
753
754    /// Add one grouped `min(field)` terminal.
755    ///
756    /// Grouped field-target extrema are deferred in grouped v1.
757    pub fn group_min_by(self, _field: impl AsRef<str>) -> Result<Self, QueryError> {
758        validate_grouped_field_target_extrema_policy()
759            .map_err(IntentError::from)
760            .map_err(QueryError::Intent)?;
761
762        Ok(self)
763    }
764
765    /// Add one grouped `max(field)` terminal.
766    ///
767    /// Grouped field-target extrema are deferred in grouped v1.
768    pub fn group_max_by(self, _field: impl AsRef<str>) -> Result<Self, QueryError> {
769        validate_grouped_field_target_extrema_policy()
770            .map_err(IntentError::from)
771            .map_err(QueryError::Intent)?;
772
773        Ok(self)
774    }
775
776    /// Override grouped hard limits for grouped execution budget enforcement.
777    #[must_use]
778    pub fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
779        self.intent = self.intent.grouped_limits(max_groups, max_group_bytes);
780        self
781    }
782
783    /// Add one grouped HAVING compare clause over one grouped key field.
784    pub fn having_group(
785        self,
786        field: impl AsRef<str>,
787        op: CompareOp,
788        value: Value,
789    ) -> Result<Self, QueryError> {
790        let field = field.as_ref().to_owned();
791        let Self { intent, _marker } = self;
792        let intent = intent.push_having_group_clause(&field, op, value)?;
793
794        Ok(Self { intent, _marker })
795    }
796
797    /// Add one grouped HAVING compare clause over one grouped aggregate output.
798    pub fn having_aggregate(
799        self,
800        aggregate_index: usize,
801        op: CompareOp,
802        value: Value,
803    ) -> Result<Self, QueryError> {
804        let Self { intent, _marker } = self;
805        let intent = intent.push_having_aggregate_clause(aggregate_index, op, value)?;
806
807        Ok(Self { intent, _marker })
808    }
809
810    /// Set the access path to a single primary key lookup.
811    pub(crate) fn by_id(self, id: E::Key) -> Self {
812        let Self { intent, _marker } = self;
813        Self {
814            intent: intent.by_id(id),
815            _marker,
816        }
817    }
818
819    /// Set the access path to a primary key batch lookup.
820    pub(crate) fn by_ids<I>(self, ids: I) -> Self
821    where
822        I: IntoIterator<Item = E::Key>,
823    {
824        let Self { intent, _marker } = self;
825        Self {
826            intent: intent.by_ids(ids),
827            _marker,
828        }
829    }
830
831    /// Mark this intent as a delete query.
832    #[must_use]
833    pub fn delete(mut self) -> Self {
834        self.intent = self.intent.delete();
835        self
836    }
837
838    /// Apply a limit to the current mode.
839    ///
840    /// Load limits bound result size; delete limits bound mutation size.
841    /// For scalar load queries, any use of `limit` or `offset` requires an
842    /// explicit `order_by(...)` so pagination is deterministic.
843    /// GROUP BY queries use canonical grouped-key order by default.
844    #[must_use]
845    pub fn limit(mut self, limit: u32) -> Self {
846        self.intent = self.intent.limit(limit);
847        self
848    }
849
850    /// Apply an offset to a load intent.
851    ///
852    /// Scalar pagination requires an explicit `order_by(...)`.
853    /// GROUP BY queries use canonical grouped-key order by default.
854    #[must_use]
855    pub fn offset(mut self, offset: u32) -> Self {
856        self.intent = self.intent.offset(offset);
857        self
858    }
859
860    /// Explain this intent without executing it.
861    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
862        let plan = self.planned()?;
863
864        Ok(plan.explain())
865    }
866
867    /// Plan this intent into a neutral planned query contract.
868    pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
869        let plan = self.build_plan()?;
870
871        Ok(PlannedQuery::new(plan))
872    }
873
874    /// Compile this intent into query-owned handoff state.
875    ///
876    /// This boundary intentionally does not expose executor runtime shape.
877    pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
878        let plan = self.build_plan()?;
879
880        Ok(CompiledQuery::new(plan))
881    }
882
883    // Build a logical plan for the current intent.
884    fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
885        let plan_value = self.intent.build_plan_model()?;
886        let (logical, access) = plan_value.into_parts();
887        let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
888        let plan = AccessPlannedQuery::from_parts(logical, access);
889
890        Ok(plan)
891    }
892}
893
894impl<E> Query<E>
895where
896    E: EntityKind + SingletonEntity,
897    E::Key: Default,
898{
899    /// Set the access path to the singleton primary key.
900    pub(crate) fn only(self) -> Self {
901        let Self { intent, _marker } = self;
902
903        Self {
904            intent: intent.only(E::Key::default()),
905            _marker,
906        }
907    }
908}
909
910///
911/// QueryError
912///
913
914#[derive(Debug, ThisError)]
915pub enum QueryError {
916    #[error("{0}")]
917    Validate(#[from] ValidateError),
918
919    #[error("{0}")]
920    Plan(Box<PlanError>),
921
922    #[error("{0}")]
923    Intent(#[from] IntentError),
924
925    #[error("{0}")]
926    Response(#[from] ResponseError),
927
928    #[error("{0}")]
929    Execute(#[from] InternalError),
930}
931
932impl From<PlannerError> for QueryError {
933    fn from(err: PlannerError) -> Self {
934        match err {
935            PlannerError::Plan(err) => Self::from(*err),
936            PlannerError::Internal(err) => Self::Execute(*err),
937        }
938    }
939}
940
941impl From<PlanError> for QueryError {
942    fn from(err: PlanError) -> Self {
943        Self::Plan(Box::new(err))
944    }
945}
946
947///
948/// IntentError
949///
950
951#[derive(Clone, Copy, Debug, ThisError)]
952pub enum IntentError {
953    #[error("{0}")]
954    PlanShape(#[from] PolicyPlanError),
955
956    #[error("by_ids() cannot be combined with predicates")]
957    ByIdsWithPredicate,
958
959    #[error("only() cannot be combined with predicates")]
960    OnlyWithPredicate,
961
962    #[error("multiple key access methods were used on the same query")]
963    KeyAccessConflict,
964
965    #[error("cursor pagination requires an explicit ordering")]
966    CursorRequiresOrder,
967
968    #[error("cursor pagination requires an explicit limit")]
969    CursorRequiresLimit,
970
971    #[error("cursor tokens can only be used with .page().execute()")]
972    CursorRequiresPagedExecution,
973
974    #[error("grouped queries require execute_grouped(...)")]
975    GroupedRequiresExecuteGrouped,
976
977    #[error("grouped field-target extrema are not supported in grouped v1")]
978    GroupedFieldTargetExtremaUnsupported,
979
980    #[error("HAVING requires GROUP BY")]
981    HavingRequiresGroupBy,
982}
983
984impl From<CursorPagingPolicyError> for IntentError {
985    fn from(err: CursorPagingPolicyError) -> Self {
986        match err {
987            CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
988            CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
989        }
990    }
991}
992
993impl From<IntentKeyAccessPolicyViolation> for IntentError {
994    fn from(err: IntentKeyAccessPolicyViolation) -> Self {
995        match err {
996            IntentKeyAccessPolicyViolation::KeyAccessConflict => Self::KeyAccessConflict,
997            IntentKeyAccessPolicyViolation::ByIdsWithPredicate => Self::ByIdsWithPredicate,
998            IntentKeyAccessPolicyViolation::OnlyWithPredicate => Self::OnlyWithPredicate,
999        }
1000    }
1001}
1002
1003impl From<IntentTerminalPolicyViolation> for IntentError {
1004    fn from(err: IntentTerminalPolicyViolation) -> Self {
1005        match err {
1006            IntentTerminalPolicyViolation::GroupedFieldTargetExtremaUnsupported => {
1007                Self::GroupedFieldTargetExtremaUnsupported
1008            }
1009        }
1010    }
1011}
1012
1013impl From<FluentLoadPolicyViolation> for IntentError {
1014    fn from(err: FluentLoadPolicyViolation) -> Self {
1015        match err {
1016            FluentLoadPolicyViolation::CursorRequiresPagedExecution => {
1017                Self::CursorRequiresPagedExecution
1018            }
1019            FluentLoadPolicyViolation::GroupedRequiresExecuteGrouped => {
1020                Self::GroupedRequiresExecuteGrouped
1021            }
1022            FluentLoadPolicyViolation::CursorRequiresOrder => Self::CursorRequiresOrder,
1023            FluentLoadPolicyViolation::CursorRequiresLimit => Self::CursorRequiresLimit,
1024        }
1025    }
1026}
1027
1028/// Helper to append an ordering field while preserving existing order spec.
1029fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
1030    match order {
1031        Some(mut spec) => {
1032            spec.fields.push((field.to_string(), direction));
1033            spec
1034        }
1035        None => OrderSpec {
1036            fields: vec![(field.to_string(), direction)],
1037        },
1038    }
1039}
1040
1041// Normalize ORDER BY into a canonical, deterministic shape:
1042// - preserve user field order
1043// - remove explicit primary-key references from the user segment
1044// - append exactly one primary-key field as the terminal tie-break
1045fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
1046    let mut order = order?;
1047    if order.fields.is_empty() {
1048        return Some(order);
1049    }
1050
1051    let pk_field = model.primary_key.name;
1052    let mut pk_direction = None;
1053    order.fields.retain(|(field, direction)| {
1054        if field == pk_field {
1055            if pk_direction.is_none() {
1056                pk_direction = Some(*direction);
1057            }
1058            false
1059        } else {
1060            true
1061        }
1062    });
1063
1064    let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
1065    order.fields.push((pk_field.to_string(), pk_direction));
1066
1067    Some(order)
1068}