Skip to main content

icydb_core/db/query/intent/
mod.rs

1#![expect(clippy::used_underscore_binding)]
2
3//! Module: query::intent
4//! Responsibility: query intent construction, coercion, and semantic-plan compilation.
5//! Does not own: executor runtime behavior or index storage details.
6//! Boundary: typed/fluent query inputs lowered into validated logical plans.
7
8#[cfg(test)]
9mod tests;
10
11// Key-only access intent and helpers.
12pub type DeleteSpec = crate::db::query::plan::DeleteSpec;
13pub type LoadSpec = crate::db::query::plan::LoadSpec;
14pub type QueryMode = crate::db::query::plan::QueryMode;
15
16use crate::{
17    db::{
18        access::{AccessPath, AccessPlan, AccessPlanError, normalize_access_plan_value},
19        cursor::CursorPlanError,
20        predicate::{
21            CompareOp, MissingRowPolicy, Predicate, SchemaInfo, ValidateError, normalize,
22            normalize_enum_literals, reject_unsupported_query_features,
23        },
24        query::{
25            builder::aggregate::AggregateExpr,
26            explain::ExplainPlan,
27            expr::{FilterExpr, SortExpr, SortLowerError},
28            plan::{
29                AccessPlannedQuery, CursorPagingPolicyError, DeleteLimitSpec,
30                FluentLoadPolicyViolation, GroupAggregateSpec, GroupHavingClause, GroupHavingSpec,
31                GroupHavingSymbol, GroupSpec, GroupedExecutionConfig,
32                IntentKeyAccessKind as IntentValidationKeyAccessKind,
33                IntentKeyAccessPolicyViolation, LogicalPlan, OrderDirection, OrderSpec, PageSpec,
34                PlanError, PlannerError, PolicyPlanError, ScalarPlan, has_explicit_order,
35                plan_access, resolve_group_field_slot, validate_group_query_semantics,
36                validate_intent_key_access_policy, validate_intent_plan_shape,
37                validate_order_shape, validate_query_semantics,
38            },
39        },
40        response::ResponseError,
41    },
42    error::{ErrorClass, InternalError},
43    model::entity::EntityModel,
44    traits::{EntityKind, FieldValue, SingletonEntity},
45    value::Value,
46};
47use std::marker::PhantomData;
48use thiserror::Error as ThisError;
49
///
/// KeyAccess
/// Primary-key-only access hints for query planning.
/// Generic over the key type `K`; `access_plan_from_keys_value` lowers each
/// shape into the matching `AccessPath` variant.
///

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum KeyAccess<K> {
    /// One primary key (lowered to `AccessPath::ByKey`).
    Single(K),
    /// A batch of primary keys (lowered to `AccessPath::ByKeys`).
    Many(Vec<K>),
}
60
///
/// KeyAccessKind
/// Identifies which key-only builder set the access path.
/// `QueryModel::set_key_access` compares kinds to flag conflicting key intents.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum KeyAccessKind {
    /// Set by `by_id`.
    Single,
    /// Set by `by_ids`.
    Many,
    /// Set by `only` (singleton key lookup).
    Only,
}
72
///
/// KeyAccessState
/// Tracks key-only access plus its origin for intent validation.
///

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct KeyAccessState<K> {
    /// Which builder produced `access`.
    pub kind: KeyAccessKind,
    /// The key payload that will be lowered into an access plan.
    pub access: KeyAccess<K>,
}
83
84// Build a model-level access plan for key-only intents.
85pub(crate) fn access_plan_from_keys_value<K>(access: &KeyAccess<K>) -> AccessPlan<Value>
86where
87    K: FieldValue,
88{
89    // Phase 1: map typed keys into model-level Value access paths.
90    let plan = match access {
91        KeyAccess::Single(key) => AccessPlan::path(AccessPath::ByKey(key.to_value())),
92        KeyAccess::Many(keys) => {
93            let values = keys.iter().map(FieldValue::to_value).collect();
94            AccessPlan::path(AccessPath::ByKeys(values))
95        }
96    };
97
98    // Phase 2: canonicalize the access shape via the shared access boundary.
99    normalize_access_plan_value(plan)
100}
101
102// Convert model-level access plans into entity-keyed access plans.
103pub(crate) fn access_plan_to_entity_keys<E: EntityKind>(
104    model: &EntityModel,
105    access: AccessPlan<Value>,
106) -> Result<AccessPlan<E::Key>, PlanError> {
107    access.into_executable::<E>(model)
108}
109
110// Convert model-level key values into typed entity keys.
111pub(crate) fn coerce_entity_key<E: EntityKind>(
112    model: &EntityModel,
113    key: &Value,
114) -> Result<E::Key, PlanError> {
115    E::Key::from_value(key).ok_or_else(|| {
116        PlanError::from(AccessPlanError::PrimaryKeyMismatch {
117            field: model.primary_key.name.to_string(),
118            key: key.clone(),
119        })
120    })
121}
122
123impl AccessPlan<Value> {
124    /// Convert model-level access plans into typed executable access plans.
125    pub(crate) fn into_executable<E: EntityKind>(
126        self,
127        model: &EntityModel,
128    ) -> Result<AccessPlan<E::Key>, PlanError> {
129        match self {
130            Self::Path(path) => Ok(AccessPlan::path(path.into_executable::<E>(model)?)),
131            Self::Union(children) => {
132                let mut out = Vec::with_capacity(children.len());
133                for child in children {
134                    out.push(child.into_executable::<E>(model)?);
135                }
136
137                Ok(AccessPlan::union(out))
138            }
139            Self::Intersection(children) => {
140                let mut out = Vec::with_capacity(children.len());
141                for child in children {
142                    out.push(child.into_executable::<E>(model)?);
143                }
144
145                Ok(AccessPlan::intersection(out))
146            }
147        }
148    }
149}
150
151impl AccessPath<Value> {
152    /// Convert one model-level access path into a typed executable access path.
153    pub(crate) fn into_executable<E: EntityKind>(
154        self,
155        model: &EntityModel,
156    ) -> Result<AccessPath<E::Key>, PlanError> {
157        match self {
158            Self::ByKey(key) => Ok(AccessPath::ByKey(coerce_entity_key::<E>(model, &key)?)),
159            Self::ByKeys(keys) => {
160                let mut out = Vec::with_capacity(keys.len());
161                for key in keys {
162                    out.push(coerce_entity_key::<E>(model, &key)?);
163                }
164
165                Ok(AccessPath::ByKeys(out))
166            }
167            Self::KeyRange { start, end } => Ok(AccessPath::KeyRange {
168                start: coerce_entity_key::<E>(model, &start)?,
169                end: coerce_entity_key::<E>(model, &end)?,
170            }),
171            Self::IndexPrefix { index, values } => Ok(AccessPath::IndexPrefix { index, values }),
172            Self::IndexRange { spec } => Ok(AccessPath::IndexRange { spec }),
173            Self::FullScan => Ok(AccessPath::FullScan),
174        }
175    }
176}
177
///
/// QueryModel
///
/// Model-level query intent and planning context.
/// Consumes an `EntityModel` derived from typed entity definitions.
/// Builder methods record intent only; validation and access planning happen
/// in `build_plan_model`.
///

#[derive(Debug)]
pub(crate) struct QueryModel<'m, K> {
    /// Entity model the intent is planned against.
    model: &'m EntityModel,
    /// Load vs delete, together with the mode-specific limit/offset spec.
    mode: QueryMode,
    /// Accumulated filter; successive `filter` calls are AND-ed together.
    predicate: Option<Predicate>,
    /// Most recent key-only access request, if any.
    key_access: Option<KeyAccessState<K>>,
    /// Set once a key-access builder of a different kind replaces an earlier one.
    key_access_conflict: bool,
    /// Grouped (GROUP BY / aggregate) spec, created by the first grouped builder call.
    group: Option<crate::db::query::plan::GroupSpec>,
    /// HAVING clauses; only accepted once `group` is present.
    having: Option<GroupHavingSpec>,
    /// Requested ORDER BY, before primary-key tie-break canonicalization.
    order: Option<OrderSpec>,
    /// Whether DISTINCT semantics were requested.
    distinct: bool,
    /// Missing-row policy copied into the scalar plan.
    consistency: MissingRowPolicy,
}
198
199impl<'m, K: FieldValue> QueryModel<'m, K> {
200    #[must_use]
201    pub(crate) const fn new(model: &'m EntityModel, consistency: MissingRowPolicy) -> Self {
202        Self {
203            model,
204            mode: QueryMode::Load(LoadSpec::new()),
205            predicate: None,
206            key_access: None,
207            key_access_conflict: false,
208            group: None,
209            having: None,
210            order: None,
211            distinct: false,
212            consistency,
213        }
214    }
215
216    /// Return the intent mode (load vs delete).
217    #[must_use]
218    pub(crate) const fn mode(&self) -> QueryMode {
219        self.mode
220    }
221
222    #[must_use]
223    fn has_explicit_order(&self) -> bool {
224        has_explicit_order(self.order.as_ref())
225    }
226
227    #[must_use]
228    const fn has_grouping(&self) -> bool {
229        self.group.is_some()
230    }
231
232    #[must_use]
233    const fn load_spec(&self) -> Option<LoadSpec> {
234        match self.mode {
235            QueryMode::Load(spec) => Some(spec),
236            QueryMode::Delete(_) => None,
237        }
238    }
239
240    /// Add a predicate, implicitly AND-ing with any existing predicate.
241    #[must_use]
242    pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
243        self.predicate = match self.predicate.take() {
244            Some(existing) => Some(Predicate::And(vec![existing, predicate])),
245            None => Some(predicate),
246        };
247        self
248    }
249
250    /// Apply a dynamic filter expression using the model schema.
251    pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
252        let schema = SchemaInfo::from_entity_model(self.model)?;
253        let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
254
255        Ok(self.filter(predicate))
256    }
257
258    /// Apply a dynamic sort expression using the model schema.
259    pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
260        let schema = SchemaInfo::from_entity_model(self.model)?;
261        let order = match expr.lower_with(&schema) {
262            Ok(order) => order,
263            Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
264            Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
265        };
266
267        validate_order_shape(Some(&order))
268            .map_err(IntentError::from)
269            .map_err(QueryError::from)?;
270
271        Ok(self.order_spec(order))
272    }
273
274    /// Append an ascending sort key.
275    #[must_use]
276    pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
277        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
278        self
279    }
280
281    /// Append a descending sort key.
282    #[must_use]
283    pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
284        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
285        self
286    }
287
288    /// Set a fully-specified order spec (validated before reaching this boundary).
289    pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
290        self.order = Some(order);
291        self
292    }
293
294    /// Enable DISTINCT semantics for this query intent.
295    #[must_use]
296    pub(crate) const fn distinct(mut self) -> Self {
297        self.distinct = true;
298        self
299    }
300
301    // Resolve one grouped field into one stable field slot and append it to the
302    // grouped spec in declaration order.
303    fn push_group_field(mut self, field: &str) -> Result<Self, QueryError> {
304        let field_slot = resolve_group_field_slot(self.model, field).map_err(QueryError::from)?;
305        let group = self.group.get_or_insert(GroupSpec {
306            group_fields: Vec::new(),
307            aggregates: Vec::new(),
308            execution: GroupedExecutionConfig::unbounded(),
309        });
310        if !group
311            .group_fields
312            .iter()
313            .any(|existing| existing.index() == field_slot.index())
314        {
315            group.group_fields.push(field_slot);
316        }
317
318        Ok(self)
319    }
320
321    // Append one grouped aggregate terminal to the grouped declarative spec.
322    fn push_group_aggregate(mut self, aggregate: AggregateExpr) -> Self {
323        let group = self.group.get_or_insert(GroupSpec {
324            group_fields: Vec::new(),
325            aggregates: Vec::new(),
326            execution: GroupedExecutionConfig::unbounded(),
327        });
328        group
329            .aggregates
330            .push(GroupAggregateSpec::from_aggregate_expr(&aggregate));
331
332        self
333    }
334
335    // Override grouped hard limits for this grouped query.
336    fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
337        let group = self.group.get_or_insert(GroupSpec {
338            group_fields: Vec::new(),
339            aggregates: Vec::new(),
340            execution: GroupedExecutionConfig::unbounded(),
341        });
342        group.execution = GroupedExecutionConfig::with_hard_limits(max_groups, max_group_bytes);
343
344        self
345    }
346
347    // Append one grouped HAVING compare clause after GROUP BY terminal declaration.
348    fn push_having_clause(mut self, clause: GroupHavingClause) -> Result<Self, QueryError> {
349        if self.group.is_none() {
350            return Err(QueryError::Intent(IntentError::HavingRequiresGroupBy));
351        }
352
353        let having = self.having.get_or_insert(GroupHavingSpec {
354            clauses: Vec::new(),
355        });
356        having.clauses.push(clause);
357
358        Ok(self)
359    }
360
361    // Append one grouped HAVING clause that references one grouped key field.
362    fn push_having_group_clause(
363        self,
364        field: &str,
365        op: CompareOp,
366        value: Value,
367    ) -> Result<Self, QueryError> {
368        let field_slot = resolve_group_field_slot(self.model, field).map_err(QueryError::from)?;
369
370        self.push_having_clause(GroupHavingClause {
371            symbol: GroupHavingSymbol::GroupField(field_slot),
372            op,
373            value,
374        })
375    }
376
377    // Append one grouped HAVING clause that references one grouped aggregate output.
378    fn push_having_aggregate_clause(
379        self,
380        aggregate_index: usize,
381        op: CompareOp,
382        value: Value,
383    ) -> Result<Self, QueryError> {
384        self.push_having_clause(GroupHavingClause {
385            symbol: GroupHavingSymbol::AggregateIndex(aggregate_index),
386            op,
387            value,
388        })
389    }
390
391    /// Track key-only access paths and detect conflicting key intents.
392    fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
393        if let Some(existing) = &self.key_access
394            && existing.kind != kind
395        {
396            self.key_access_conflict = true;
397        }
398
399        self.key_access = Some(KeyAccessState { kind, access });
400
401        self
402    }
403
404    /// Set the access path to a single primary key lookup.
405    pub(crate) fn by_id(self, id: K) -> Self {
406        self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
407    }
408
409    /// Set the access path to a primary key batch lookup.
410    pub(crate) fn by_ids<I>(self, ids: I) -> Self
411    where
412        I: IntoIterator<Item = K>,
413    {
414        self.set_key_access(
415            KeyAccessKind::Many,
416            KeyAccess::Many(ids.into_iter().collect()),
417        )
418    }
419
420    /// Set the access path to the singleton primary key.
421    pub(crate) fn only(self, id: K) -> Self {
422        self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
423    }
424
425    /// Mark this intent as a delete query.
426    #[must_use]
427    pub(crate) const fn delete(mut self) -> Self {
428        if self.mode.is_load() {
429            self.mode = QueryMode::Delete(DeleteSpec::new());
430        }
431        self
432    }
433
434    /// Apply a limit to the current mode.
435    ///
436    /// Load limits bound result size; delete limits bound mutation size.
437    #[must_use]
438    pub(crate) const fn limit(mut self, limit: u32) -> Self {
439        match self.mode {
440            QueryMode::Load(mut spec) => {
441                spec.limit = Some(limit);
442                self.mode = QueryMode::Load(spec);
443            }
444            QueryMode::Delete(mut spec) => {
445                spec.limit = Some(limit);
446                self.mode = QueryMode::Delete(spec);
447            }
448        }
449        self
450    }
451
452    /// Apply an offset to a load intent.
453    #[must_use]
454    pub(crate) const fn offset(mut self, offset: u32) -> Self {
455        if let QueryMode::Load(mut spec) = self.mode {
456            spec.offset = offset;
457            self.mode = QueryMode::Load(spec);
458        }
459        self
460    }
461
462    /// Build a model-level logical plan using Value-based access keys.
463    fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
464        // Phase 1: schema surface and intent validation.
465        let schema_info = SchemaInfo::from_entity_model(self.model)?;
466        self.validate_intent()?;
467
468        // Phase 2: predicate normalization and access planning.
469        let normalized_predicate = self
470            .predicate
471            .as_ref()
472            .map(|predicate| {
473                reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
474                let predicate = normalize_enum_literals(&schema_info, predicate)?;
475                Ok::<Predicate, ValidateError>(normalize(&predicate))
476            })
477            .transpose()?;
478        let access_plan_value = match &self.key_access {
479            Some(state) => access_plan_from_keys_value(&state.access),
480            None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
481        };
482
483        // Phase 3: assemble the executor-ready plan.
484        let scalar = ScalarPlan {
485            mode: self.mode,
486            predicate: normalized_predicate,
487            // Canonicalize ORDER BY to include an explicit primary-key tie-break.
488            // This ensures explain/fingerprint/execution share one deterministic order shape.
489            order: canonicalize_order_spec(self.model, self.order.clone()),
490            distinct: self.distinct,
491            delete_limit: match self.mode {
492                QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
493                QueryMode::Load(_) => None,
494            },
495            page: match self.mode {
496                QueryMode::Load(spec) => {
497                    if spec.limit.is_some() || spec.offset > 0 {
498                        Some(PageSpec {
499                            limit: spec.limit,
500                            offset: spec.offset,
501                        })
502                    } else {
503                        None
504                    }
505                }
506                QueryMode::Delete(_) => None,
507            },
508            consistency: self.consistency,
509        };
510        let mut plan =
511            AccessPlannedQuery::from_parts(LogicalPlan::Scalar(scalar), access_plan_value);
512        if let Some(group) = self.group.clone() {
513            plan = match self.having.clone() {
514                Some(having) => plan.into_grouped_with_having(group, Some(having)),
515                None => plan.into_grouped(group),
516            };
517        }
518
519        if plan.grouped_plan().is_some() {
520            validate_group_query_semantics(&schema_info, self.model, &plan)?;
521        } else {
522            validate_query_semantics(&schema_info, self.model, &plan)?;
523        }
524
525        Ok(plan)
526    }
527
528    // Validate pre-plan policy invariants and key-access rules before planning.
529    fn validate_intent(&self) -> Result<(), IntentError> {
530        validate_intent_plan_shape(self.mode, self.order.as_ref()).map_err(IntentError::from)?;
531
532        let key_access_kind = self.key_access.as_ref().map(|state| match state.kind {
533            KeyAccessKind::Single => IntentValidationKeyAccessKind::Single,
534            KeyAccessKind::Many => IntentValidationKeyAccessKind::Many,
535            KeyAccessKind::Only => IntentValidationKeyAccessKind::Only,
536        });
537        validate_intent_key_access_policy(
538            self.key_access_conflict,
539            key_access_kind,
540            self.predicate.is_some(),
541        )
542        .map_err(IntentError::from)?;
543        if self.having.is_some() && self.group.is_none() {
544            return Err(IntentError::HavingRequiresGroupBy);
545        }
546
547        Ok(())
548    }
549}
550
// Query (struct declared later in this file)
//
// Typed, declarative query intent for a specific entity type.
//
// This intent is:
// - schema-agnostic at construction
// - normalized and validated only during planning
// - free of access-path decisions
//
// NOTE: kept as a plain comment so rustdoc does not fold this description
// into the `PlannedQuery` documentation that follows.
561
///
/// PlannedQuery
///
/// Neutral query-owned planned contract produced by query planning.
/// Stores logical + access shape without executor compilation state.
/// Returned by `Query::planned()`.
///
#[derive(Debug)]
pub struct PlannedQuery<E: EntityKind> {
    /// Access-planned query keyed by the entity's typed key.
    plan: AccessPlannedQuery<E::Key>,
    /// Ties the plan to entity type `E` without storing a value of it.
    _marker: PhantomData<E>,
}
573
574impl<E: EntityKind> PlannedQuery<E> {
575    #[must_use]
576    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
577        Self {
578            plan,
579            _marker: PhantomData,
580        }
581    }
582
583    #[must_use]
584    pub fn explain(&self) -> ExplainPlan {
585        self.plan.explain_with_model(E::MODEL)
586    }
587}
588
///
/// CompiledQuery
///
/// Query-owned compiled handoff produced by `Query::plan()`.
/// This type intentionally carries only logical/access query semantics.
/// Executor runtime shape is derived explicitly at the executor boundary.
///
#[derive(Clone, Debug)]
pub struct CompiledQuery<E: EntityKind> {
    /// Access-planned query keyed by the entity's typed key.
    plan: AccessPlannedQuery<E::Key>,
    /// Ties the plan to entity type `E` without storing a value of it.
    _marker: PhantomData<E>,
}
601
602impl<E: EntityKind> CompiledQuery<E> {
603    #[must_use]
604    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
605        Self {
606            plan,
607            _marker: PhantomData,
608        }
609    }
610
611    #[must_use]
612    pub fn explain(&self) -> ExplainPlan {
613        self.plan.explain_with_model(E::MODEL)
614    }
615
616    #[must_use]
617    pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
618        self.plan
619    }
620}
621
///
/// Query
///
/// Typed, declarative query intent for the entity type `E`.
/// Wraps a `QueryModel` bound to `E::MODEL`; builder methods only record
/// intent, and planning happens in `planned()` / `plan()`.
///
#[derive(Debug)]
pub struct Query<E: EntityKind> {
    /// Model-level intent keyed by the entity's typed key.
    intent: QueryModel<'static, E::Key>,
    /// Ties the intent to entity type `E` without storing a value of it.
    _marker: PhantomData<E>,
}
627
628impl<E: EntityKind> Query<E> {
629    /// Create a new intent with an explicit missing-row policy.
630    /// Ignore favors idempotency and may mask index/data divergence on deletes.
631    /// Use Error to surface missing rows during scan/delete execution.
632    #[must_use]
633    pub const fn new(consistency: MissingRowPolicy) -> Self {
634        Self {
635            intent: QueryModel::new(E::MODEL, consistency),
636            _marker: PhantomData,
637        }
638    }
639
640    /// Return the intent mode (load vs delete).
641    #[must_use]
642    pub const fn mode(&self) -> QueryMode {
643        self.intent.mode()
644    }
645
646    #[must_use]
647    pub(crate) fn has_explicit_order(&self) -> bool {
648        self.intent.has_explicit_order()
649    }
650
651    #[must_use]
652    pub(crate) const fn has_grouping(&self) -> bool {
653        self.intent.has_grouping()
654    }
655
656    #[must_use]
657    pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
658        self.intent.load_spec()
659    }
660
661    /// Add a predicate, implicitly AND-ing with any existing predicate.
662    #[must_use]
663    pub fn filter(mut self, predicate: Predicate) -> Self {
664        self.intent = self.intent.filter(predicate);
665        self
666    }
667
668    /// Apply a dynamic filter expression.
669    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
670        let Self { intent, _marker } = self;
671        let intent = intent.filter_expr(expr)?;
672
673        Ok(Self { intent, _marker })
674    }
675
676    /// Apply a dynamic sort expression.
677    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
678        let Self { intent, _marker } = self;
679        let intent = intent.sort_expr(expr)?;
680
681        Ok(Self { intent, _marker })
682    }
683
684    /// Append an ascending sort key.
685    #[must_use]
686    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
687        self.intent = self.intent.order_by(field);
688        self
689    }
690
691    /// Append a descending sort key.
692    #[must_use]
693    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
694        self.intent = self.intent.order_by_desc(field);
695        self
696    }
697
698    /// Enable DISTINCT semantics for this query.
699    #[must_use]
700    pub fn distinct(mut self) -> Self {
701        self.intent = self.intent.distinct();
702        self
703    }
704
705    /// Add one GROUP BY field.
706    pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
707        let Self { intent, _marker } = self;
708        let intent = intent.push_group_field(field.as_ref())?;
709
710        Ok(Self { intent, _marker })
711    }
712
713    /// Add one aggregate terminal via composable aggregate expression.
714    #[must_use]
715    pub fn aggregate(mut self, aggregate: AggregateExpr) -> Self {
716        self.intent = self.intent.push_group_aggregate(aggregate);
717        self
718    }
719
720    /// Override grouped hard limits for grouped execution budget enforcement.
721    #[must_use]
722    pub fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
723        self.intent = self.intent.grouped_limits(max_groups, max_group_bytes);
724        self
725    }
726
727    /// Add one grouped HAVING compare clause over one grouped key field.
728    pub fn having_group(
729        self,
730        field: impl AsRef<str>,
731        op: CompareOp,
732        value: Value,
733    ) -> Result<Self, QueryError> {
734        let field = field.as_ref().to_owned();
735        let Self { intent, _marker } = self;
736        let intent = intent.push_having_group_clause(&field, op, value)?;
737
738        Ok(Self { intent, _marker })
739    }
740
741    /// Add one grouped HAVING compare clause over one grouped aggregate output.
742    pub fn having_aggregate(
743        self,
744        aggregate_index: usize,
745        op: CompareOp,
746        value: Value,
747    ) -> Result<Self, QueryError> {
748        let Self { intent, _marker } = self;
749        let intent = intent.push_having_aggregate_clause(aggregate_index, op, value)?;
750
751        Ok(Self { intent, _marker })
752    }
753
754    /// Set the access path to a single primary key lookup.
755    pub(crate) fn by_id(self, id: E::Key) -> Self {
756        let Self { intent, _marker } = self;
757        Self {
758            intent: intent.by_id(id),
759            _marker,
760        }
761    }
762
763    /// Set the access path to a primary key batch lookup.
764    pub(crate) fn by_ids<I>(self, ids: I) -> Self
765    where
766        I: IntoIterator<Item = E::Key>,
767    {
768        let Self { intent, _marker } = self;
769        Self {
770            intent: intent.by_ids(ids),
771            _marker,
772        }
773    }
774
775    /// Mark this intent as a delete query.
776    #[must_use]
777    pub fn delete(mut self) -> Self {
778        self.intent = self.intent.delete();
779        self
780    }
781
782    /// Apply a limit to the current mode.
783    ///
784    /// Load limits bound result size; delete limits bound mutation size.
785    /// For scalar load queries, any use of `limit` or `offset` requires an
786    /// explicit `order_by(...)` so pagination is deterministic.
787    /// GROUP BY queries use canonical grouped-key order by default.
788    #[must_use]
789    pub fn limit(mut self, limit: u32) -> Self {
790        self.intent = self.intent.limit(limit);
791        self
792    }
793
794    /// Apply an offset to a load intent.
795    ///
796    /// Scalar pagination requires an explicit `order_by(...)`.
797    /// GROUP BY queries use canonical grouped-key order by default.
798    #[must_use]
799    pub fn offset(mut self, offset: u32) -> Self {
800        self.intent = self.intent.offset(offset);
801        self
802    }
803
804    /// Explain this intent without executing it.
805    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
806        let plan = self.planned()?;
807
808        Ok(plan.explain())
809    }
810
811    /// Plan this intent into a neutral planned query contract.
812    pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
813        let plan = self.build_plan()?;
814
815        Ok(PlannedQuery::new(plan))
816    }
817
818    /// Compile this intent into query-owned handoff state.
819    ///
820    /// This boundary intentionally does not expose executor runtime shape.
821    pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
822        let plan = self.build_plan()?;
823
824        Ok(CompiledQuery::new(plan))
825    }
826
827    // Build a logical plan for the current intent.
828    fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
829        let plan_value = self.intent.build_plan_model()?;
830        let (logical, access) = plan_value.into_parts();
831        let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
832        let plan = AccessPlannedQuery::from_parts(logical, access);
833
834        Ok(plan)
835    }
836}
837
838impl<E> Query<E>
839where
840    E: EntityKind + SingletonEntity,
841    E::Key: Default,
842{
843    /// Set the access path to the singleton primary key.
844    pub(crate) fn only(self) -> Self {
845        let Self { intent, _marker } = self;
846
847        Self {
848            intent: intent.only(E::Key::default()),
849            _marker,
850        }
851    }
852}
853
///
/// QueryError
///
/// Top-level error returned by query intent construction and planning.
/// Each variant wraps one error domain and displays the inner error's message.
///

#[derive(Debug, ThisError)]
pub enum QueryError {
    /// Validation failure (wraps `ValidateError`).
    #[error("{0}")]
    Validate(#[from] ValidateError),

    /// Plan failure; boxed, and converted via the manual `From<PlanError>` impl.
    #[error("{0}")]
    Plan(Box<PlanError>),

    /// Intent-level policy violation detected before planning.
    #[error("{0}")]
    Intent(#[from] IntentError),

    /// Response-shape failure (wraps `ResponseError`).
    #[error("{0}")]
    Response(#[from] ResponseError),

    /// Classified runtime failure (wraps `QueryExecuteError`).
    #[error("{0}")]
    Execute(#[from] QueryExecuteError),
}
875
876impl QueryError {
877    /// Construct an execution-domain query error from one classified runtime error.
878    pub(crate) fn execute(err: InternalError) -> Self {
879        Self::Execute(QueryExecuteError::from(err))
880    }
881}
882
///
/// QueryExecuteError
///
/// Execution-domain error that mirrors `ErrorClass` one variant per class.
/// Every variant carries the original `InternalError` and displays its message.
///

#[derive(Debug, ThisError)]
pub enum QueryExecuteError {
    /// Built from `ErrorClass::Corruption`.
    #[error("{0}")]
    Corruption(InternalError),

    /// Built from `ErrorClass::InvariantViolation`.
    #[error("{0}")]
    InvariantViolation(InternalError),

    /// Built from `ErrorClass::Conflict`.
    #[error("{0}")]
    Conflict(InternalError),

    /// Built from `ErrorClass::NotFound`.
    #[error("{0}")]
    NotFound(InternalError),

    /// Built from `ErrorClass::Unsupported`.
    #[error("{0}")]
    Unsupported(InternalError),

    /// Built from `ErrorClass::Internal`.
    #[error("{0}")]
    Internal(InternalError),
}
907
908impl QueryExecuteError {
909    #[must_use]
910    /// Borrow the wrapped classified runtime error.
911    pub const fn as_internal(&self) -> &InternalError {
912        match self {
913            Self::Corruption(err)
914            | Self::InvariantViolation(err)
915            | Self::Conflict(err)
916            | Self::NotFound(err)
917            | Self::Unsupported(err)
918            | Self::Internal(err) => err,
919        }
920    }
921}
922
923impl From<InternalError> for QueryExecuteError {
924    fn from(err: InternalError) -> Self {
925        match err.class {
926            ErrorClass::Corruption => Self::Corruption(err),
927            ErrorClass::InvariantViolation => Self::InvariantViolation(err),
928            ErrorClass::Conflict => Self::Conflict(err),
929            ErrorClass::NotFound => Self::NotFound(err),
930            ErrorClass::Unsupported => Self::Unsupported(err),
931            ErrorClass::Internal => Self::Internal(err),
932        }
933    }
934}
935
936impl From<PlannerError> for QueryError {
937    fn from(err: PlannerError) -> Self {
938        match err {
939            PlannerError::Plan(err) => Self::from(*err),
940            PlannerError::Internal(err) => Self::execute(*err),
941        }
942    }
943}
944
945impl From<PlanError> for QueryError {
946    fn from(err: PlanError) -> Self {
947        Self::Plan(Box::new(err))
948    }
949}
950
951///
952/// IntentError
953///
954
955#[derive(Clone, Copy, Debug, ThisError)]
956pub enum IntentError {
957    #[error("{0}")]
958    PlanShape(#[from] PolicyPlanError),
959
960    #[error("by_ids() cannot be combined with predicates")]
961    ByIdsWithPredicate,
962
963    #[error("only() cannot be combined with predicates")]
964    OnlyWithPredicate,
965
966    #[error("multiple key access methods were used on the same query")]
967    KeyAccessConflict,
968
969    #[error(
970        "{message}",
971        message = CursorPlanError::cursor_requires_order_message()
972    )]
973    CursorRequiresOrder,
974
975    #[error(
976        "{message}",
977        message = CursorPlanError::cursor_requires_limit_message()
978    )]
979    CursorRequiresLimit,
980
981    #[error("cursor tokens can only be used with .page().execute()")]
982    CursorRequiresPagedExecution,
983
984    #[error("grouped queries require execute_grouped(...)")]
985    GroupedRequiresExecuteGrouped,
986
987    #[error("HAVING requires GROUP BY")]
988    HavingRequiresGroupBy,
989}
990
991impl From<CursorPagingPolicyError> for IntentError {
992    fn from(err: CursorPagingPolicyError) -> Self {
993        match err {
994            CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
995            CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
996        }
997    }
998}
999
1000impl From<IntentKeyAccessPolicyViolation> for IntentError {
1001    fn from(err: IntentKeyAccessPolicyViolation) -> Self {
1002        match err {
1003            IntentKeyAccessPolicyViolation::KeyAccessConflict => Self::KeyAccessConflict,
1004            IntentKeyAccessPolicyViolation::ByIdsWithPredicate => Self::ByIdsWithPredicate,
1005            IntentKeyAccessPolicyViolation::OnlyWithPredicate => Self::OnlyWithPredicate,
1006        }
1007    }
1008}
1009
1010impl From<FluentLoadPolicyViolation> for IntentError {
1011    fn from(err: FluentLoadPolicyViolation) -> Self {
1012        match err {
1013            FluentLoadPolicyViolation::CursorRequiresPagedExecution => {
1014                Self::CursorRequiresPagedExecution
1015            }
1016            FluentLoadPolicyViolation::GroupedRequiresExecuteGrouped => {
1017                Self::GroupedRequiresExecuteGrouped
1018            }
1019            FluentLoadPolicyViolation::CursorRequiresOrder => Self::CursorRequiresOrder,
1020            FluentLoadPolicyViolation::CursorRequiresLimit => Self::CursorRequiresLimit,
1021        }
1022    }
1023}
1024
1025/// Helper to append an ordering field while preserving existing order spec.
1026fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
1027    match order {
1028        Some(mut spec) => {
1029            spec.fields.push((field.to_string(), direction));
1030            spec
1031        }
1032        None => OrderSpec {
1033            fields: vec![(field.to_string(), direction)],
1034        },
1035    }
1036}
1037
1038// Normalize ORDER BY into a canonical, deterministic shape:
1039// - preserve user field order
1040// - remove explicit primary-key references from the user segment
1041// - append exactly one primary-key field as the terminal tie-break
1042fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
1043    let mut order = order?;
1044    if order.fields.is_empty() {
1045        return Some(order);
1046    }
1047
1048    let pk_field = model.primary_key.name;
1049    let mut pk_direction = None;
1050    order.fields.retain(|(field, direction)| {
1051        if field == pk_field {
1052            if pk_direction.is_none() {
1053                pk_direction = Some(*direction);
1054            }
1055            false
1056        } else {
1057            true
1058        }
1059    });
1060
1061    let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
1062    order.fields.push((pk_field.to_string(), pk_direction));
1063
1064    Some(order)
1065}