Skip to main content

icydb_core/db/query/intent/
mod.rs

1#![expect(clippy::used_underscore_binding)]
2
3//! Module: query::intent
4//! Responsibility: query intent construction, coercion, and semantic-plan compilation.
5//! Does not own: executor runtime behavior or index storage details.
6//! Boundary: typed/fluent query inputs lowered into validated logical plans.
7
8#[cfg(test)]
9mod tests;
10
11// Key-only access intent and helpers.
12pub type DeleteSpec = crate::db::query::plan::DeleteSpec;
13pub type LoadSpec = crate::db::query::plan::LoadSpec;
14pub type QueryMode = crate::db::query::plan::QueryMode;
15
16use crate::{
17    db::{
18        access::{AccessPath, AccessPlan, AccessPlanError, canonical_by_keys_path},
19        predicate::{
20            MissingRowPolicy, Predicate, SchemaInfo, ValidateError, normalize,
21            normalize_enum_literals, reject_unsupported_query_features,
22        },
23        query::{
24            explain::ExplainPlan,
25            expr::{FilterExpr, SortExpr, SortLowerError},
26            plan::{
27                AccessPlannedQuery, CursorPagingPolicyError, DeleteLimitSpec,
28                FluentLoadPolicyViolation, GroupAggregateKind, GroupAggregateSpec, GroupSpec,
29                GroupedExecutionConfig, IntentKeyAccessKind as IntentValidationKeyAccessKind,
30                IntentKeyAccessPolicyViolation, IntentTerminalPolicyViolation, LogicalPlan,
31                OrderDirection, OrderSpec, PageSpec, PlanError, PlannerError, PolicyPlanError,
32                ScalarPlan, has_explicit_order, plan_access, resolve_group_field_slot,
33                validate_group_query_semantics, validate_grouped_field_target_extrema_policy,
34                validate_intent_key_access_policy, validate_intent_plan_shape,
35                validate_order_shape, validate_query_semantics,
36            },
37        },
38        response::ResponseError,
39    },
40    error::InternalError,
41    model::entity::EntityModel,
42    traits::{EntityKind, FieldValue, SingletonEntity},
43    value::Value,
44};
45use std::marker::PhantomData;
46use thiserror::Error as ThisError;
47
48///
49/// KeyAccess
50/// Primary-key-only access hints for query planning.
51///
52
53#[derive(Clone, Debug, Eq, PartialEq)]
54pub(crate) enum KeyAccess<K> {
55    Single(K),
56    Many(Vec<K>),
57}
58
59///
60/// KeyAccessKind
61/// Identifies which key-only builder set the access path.
62///
63
64#[derive(Clone, Copy, Debug, Eq, PartialEq)]
65pub(crate) enum KeyAccessKind {
66    Single,
67    Many,
68    Only,
69}
70
71///
72/// KeyAccessState
73/// Tracks key-only access plus its origin for intent validation.
74///
75
76#[derive(Clone, Debug, Eq, PartialEq)]
77pub(crate) struct KeyAccessState<K> {
78    pub kind: KeyAccessKind,
79    pub access: KeyAccess<K>,
80}
81
82// Build a model-level access plan for key-only intents.
83pub(crate) fn access_plan_from_keys_value<K>(access: &KeyAccess<K>) -> AccessPlan<Value>
84where
85    K: FieldValue,
86{
87    // Phase 1: map typed keys into model-level Value access paths.
88    match access {
89        KeyAccess::Single(key) => AccessPlan::path(AccessPath::ByKey(key.to_value())),
90        KeyAccess::Many(keys) => {
91            let values = keys.iter().map(FieldValue::to_value).collect();
92            AccessPlan::path(canonical_by_keys_path(values))
93        }
94    }
95}
96
97// Convert model-level access plans into entity-keyed access plans.
98pub(crate) fn access_plan_to_entity_keys<E: EntityKind>(
99    model: &EntityModel,
100    access: AccessPlan<Value>,
101) -> Result<AccessPlan<E::Key>, PlanError> {
102    access.into_executable::<E>(model)
103}
104
105// Convert model-level key values into typed entity keys.
106pub(crate) fn coerce_entity_key<E: EntityKind>(
107    model: &EntityModel,
108    key: &Value,
109) -> Result<E::Key, PlanError> {
110    E::Key::from_value(key).ok_or_else(|| {
111        PlanError::from(AccessPlanError::PrimaryKeyMismatch {
112            field: model.primary_key.name.to_string(),
113            key: key.clone(),
114        })
115    })
116}
117
118impl AccessPlan<Value> {
119    /// Convert model-level access plans into typed executable access plans.
120    pub(crate) fn into_executable<E: EntityKind>(
121        self,
122        model: &EntityModel,
123    ) -> Result<AccessPlan<E::Key>, PlanError> {
124        match self {
125            Self::Path(path) => Ok(AccessPlan::path(path.into_executable::<E>(model)?)),
126            Self::Union(children) => {
127                let mut out = Vec::with_capacity(children.len());
128                for child in children {
129                    out.push(child.into_executable::<E>(model)?);
130                }
131
132                Ok(AccessPlan::union(out))
133            }
134            Self::Intersection(children) => {
135                let mut out = Vec::with_capacity(children.len());
136                for child in children {
137                    out.push(child.into_executable::<E>(model)?);
138                }
139
140                Ok(AccessPlan::intersection(out))
141            }
142        }
143    }
144}
145
146impl AccessPath<Value> {
147    /// Convert one model-level access path into a typed executable access path.
148    pub(crate) fn into_executable<E: EntityKind>(
149        self,
150        model: &EntityModel,
151    ) -> Result<AccessPath<E::Key>, PlanError> {
152        match self {
153            Self::ByKey(key) => Ok(AccessPath::ByKey(coerce_entity_key::<E>(model, &key)?)),
154            Self::ByKeys(keys) => {
155                let mut out = Vec::with_capacity(keys.len());
156                for key in keys {
157                    out.push(coerce_entity_key::<E>(model, &key)?);
158                }
159
160                Ok(AccessPath::ByKeys(out))
161            }
162            Self::KeyRange { start, end } => Ok(AccessPath::KeyRange {
163                start: coerce_entity_key::<E>(model, &start)?,
164                end: coerce_entity_key::<E>(model, &end)?,
165            }),
166            Self::IndexPrefix { index, values } => Ok(AccessPath::IndexPrefix { index, values }),
167            Self::IndexRange { spec } => Ok(AccessPath::IndexRange { spec }),
168            Self::FullScan => Ok(AccessPath::FullScan),
169        }
170    }
171}
172
173///
174/// QueryModel
175///
176/// Model-level query intent and planning context.
177/// Consumes an `EntityModel` derived from typed entity definitions.
178///
179
180#[derive(Debug)]
181pub(crate) struct QueryModel<'m, K> {
182    model: &'m EntityModel,
183    mode: QueryMode,
184    predicate: Option<Predicate>,
185    key_access: Option<KeyAccessState<K>>,
186    key_access_conflict: bool,
187    group: Option<crate::db::query::plan::GroupSpec>,
188    order: Option<OrderSpec>,
189    distinct: bool,
190    consistency: MissingRowPolicy,
191}
192
193impl<'m, K: FieldValue> QueryModel<'m, K> {
194    #[must_use]
195    pub(crate) const fn new(model: &'m EntityModel, consistency: MissingRowPolicy) -> Self {
196        Self {
197            model,
198            mode: QueryMode::Load(LoadSpec::new()),
199            predicate: None,
200            key_access: None,
201            key_access_conflict: false,
202            group: None,
203            order: None,
204            distinct: false,
205            consistency,
206        }
207    }
208
209    /// Return the intent mode (load vs delete).
210    #[must_use]
211    pub(crate) const fn mode(&self) -> QueryMode {
212        self.mode
213    }
214
215    #[must_use]
216    fn has_explicit_order(&self) -> bool {
217        has_explicit_order(self.order.as_ref())
218    }
219
220    #[must_use]
221    const fn has_grouping(&self) -> bool {
222        self.group.is_some()
223    }
224
225    #[must_use]
226    const fn load_spec(&self) -> Option<LoadSpec> {
227        match self.mode {
228            QueryMode::Load(spec) => Some(spec),
229            QueryMode::Delete(_) => None,
230        }
231    }
232
233    /// Add a predicate, implicitly AND-ing with any existing predicate.
234    #[must_use]
235    pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
236        self.predicate = match self.predicate.take() {
237            Some(existing) => Some(Predicate::And(vec![existing, predicate])),
238            None => Some(predicate),
239        };
240        self
241    }
242
243    /// Apply a dynamic filter expression using the model schema.
244    pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
245        let schema = SchemaInfo::from_entity_model(self.model)?;
246        let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
247
248        Ok(self.filter(predicate))
249    }
250
251    /// Apply a dynamic sort expression using the model schema.
252    pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
253        let schema = SchemaInfo::from_entity_model(self.model)?;
254        let order = match expr.lower_with(&schema) {
255            Ok(order) => order,
256            Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
257            Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
258        };
259
260        validate_order_shape(Some(&order))
261            .map_err(IntentError::from)
262            .map_err(QueryError::from)?;
263
264        Ok(self.order_spec(order))
265    }
266
267    /// Append an ascending sort key.
268    #[must_use]
269    pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
270        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
271        self
272    }
273
274    /// Append a descending sort key.
275    #[must_use]
276    pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
277        self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
278        self
279    }
280
281    /// Set a fully-specified order spec (validated before reaching this boundary).
282    pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
283        self.order = Some(order);
284        self
285    }
286
287    /// Enable DISTINCT semantics for this query intent.
288    #[must_use]
289    pub(crate) const fn distinct(mut self) -> Self {
290        self.distinct = true;
291        self
292    }
293
294    // Resolve one grouped field into one stable field slot and append it to the
295    // grouped spec in declaration order.
296    fn push_group_field(mut self, field: &str) -> Result<Self, QueryError> {
297        let field_slot = resolve_group_field_slot(self.model, field).map_err(QueryError::from)?;
298        let group = self.group.get_or_insert(GroupSpec {
299            group_fields: Vec::new(),
300            aggregates: Vec::new(),
301            execution: GroupedExecutionConfig::unbounded(),
302        });
303        if !group
304            .group_fields
305            .iter()
306            .any(|existing| existing.index() == field_slot.index())
307        {
308            group.group_fields.push(field_slot);
309        }
310
311        Ok(self)
312    }
313
314    // Append one grouped aggregate terminal to the grouped declarative spec.
315    fn push_group_aggregate(
316        mut self,
317        kind: GroupAggregateKind,
318        target_field: Option<String>,
319    ) -> Self {
320        let group = self.group.get_or_insert(GroupSpec {
321            group_fields: Vec::new(),
322            aggregates: Vec::new(),
323            execution: GroupedExecutionConfig::unbounded(),
324        });
325        group
326            .aggregates
327            .push(GroupAggregateSpec { kind, target_field });
328
329        self
330    }
331
332    // Override grouped hard limits for this grouped query.
333    fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
334        let group = self.group.get_or_insert(GroupSpec {
335            group_fields: Vec::new(),
336            aggregates: Vec::new(),
337            execution: GroupedExecutionConfig::unbounded(),
338        });
339        group.execution = GroupedExecutionConfig::with_hard_limits(max_groups, max_group_bytes);
340
341        self
342    }
343
344    /// Track key-only access paths and detect conflicting key intents.
345    fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
346        if let Some(existing) = &self.key_access
347            && existing.kind != kind
348        {
349            self.key_access_conflict = true;
350        }
351
352        self.key_access = Some(KeyAccessState { kind, access });
353
354        self
355    }
356
357    /// Set the access path to a single primary key lookup.
358    pub(crate) fn by_id(self, id: K) -> Self {
359        self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
360    }
361
362    /// Set the access path to a primary key batch lookup.
363    pub(crate) fn by_ids<I>(self, ids: I) -> Self
364    where
365        I: IntoIterator<Item = K>,
366    {
367        self.set_key_access(
368            KeyAccessKind::Many,
369            KeyAccess::Many(ids.into_iter().collect()),
370        )
371    }
372
373    /// Set the access path to the singleton primary key.
374    pub(crate) fn only(self, id: K) -> Self {
375        self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
376    }
377
378    /// Mark this intent as a delete query.
379    #[must_use]
380    pub(crate) const fn delete(mut self) -> Self {
381        if self.mode.is_load() {
382            self.mode = QueryMode::Delete(DeleteSpec::new());
383        }
384        self
385    }
386
387    /// Apply a limit to the current mode.
388    ///
389    /// Load limits bound result size; delete limits bound mutation size.
390    #[must_use]
391    pub(crate) const fn limit(mut self, limit: u32) -> Self {
392        match self.mode {
393            QueryMode::Load(mut spec) => {
394                spec.limit = Some(limit);
395                self.mode = QueryMode::Load(spec);
396            }
397            QueryMode::Delete(mut spec) => {
398                spec.limit = Some(limit);
399                self.mode = QueryMode::Delete(spec);
400            }
401        }
402        self
403    }
404
405    /// Apply an offset to a load intent.
406    #[must_use]
407    pub(crate) const fn offset(mut self, offset: u32) -> Self {
408        if let QueryMode::Load(mut spec) = self.mode {
409            spec.offset = offset;
410            self.mode = QueryMode::Load(spec);
411        }
412        self
413    }
414
415    /// Build a model-level logical plan using Value-based access keys.
416    fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
417        // Phase 1: schema surface and intent validation.
418        let schema_info = SchemaInfo::from_entity_model(self.model)?;
419        self.validate_intent()?;
420
421        // Phase 2: predicate normalization and access planning.
422        let normalized_predicate = self
423            .predicate
424            .as_ref()
425            .map(|predicate| {
426                reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
427                let predicate = normalize_enum_literals(&schema_info, predicate)?;
428                Ok::<Predicate, ValidateError>(normalize(&predicate))
429            })
430            .transpose()?;
431        let access_plan_value = match &self.key_access {
432            Some(state) => access_plan_from_keys_value(&state.access),
433            None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
434        };
435
436        // Phase 3: assemble the executor-ready plan.
437        let scalar = ScalarPlan {
438            mode: self.mode,
439            predicate: normalized_predicate,
440            // Canonicalize ORDER BY to include an explicit primary-key tie-break.
441            // This ensures explain/fingerprint/execution share one deterministic order shape.
442            order: canonicalize_order_spec(self.model, self.order.clone()),
443            distinct: self.distinct,
444            delete_limit: match self.mode {
445                QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
446                QueryMode::Load(_) => None,
447            },
448            page: match self.mode {
449                QueryMode::Load(spec) => {
450                    if spec.limit.is_some() || spec.offset > 0 {
451                        Some(PageSpec {
452                            limit: spec.limit,
453                            offset: spec.offset,
454                        })
455                    } else {
456                        None
457                    }
458                }
459                QueryMode::Delete(_) => None,
460            },
461            consistency: self.consistency,
462        };
463        let mut plan =
464            AccessPlannedQuery::from_parts(LogicalPlan::Scalar(scalar), access_plan_value);
465        if let Some(group) = self.group.clone() {
466            plan = plan.into_grouped(group);
467        }
468
469        if plan.grouped_plan().is_some() {
470            validate_group_query_semantics(&schema_info, self.model, &plan)?;
471        } else {
472            validate_query_semantics(&schema_info, self.model, &plan)?;
473        }
474
475        Ok(plan)
476    }
477
478    // Validate pre-plan policy invariants and key-access rules before planning.
479    fn validate_intent(&self) -> Result<(), IntentError> {
480        validate_intent_plan_shape(self.mode, self.order.as_ref()).map_err(IntentError::from)?;
481
482        let key_access_kind = self.key_access.as_ref().map(|state| match state.kind {
483            KeyAccessKind::Single => IntentValidationKeyAccessKind::Single,
484            KeyAccessKind::Many => IntentValidationKeyAccessKind::Many,
485            KeyAccessKind::Only => IntentValidationKeyAccessKind::Only,
486        });
487        validate_intent_key_access_policy(
488            self.key_access_conflict,
489            key_access_kind,
490            self.predicate.is_some(),
491        )
492        .map_err(IntentError::from)?;
493
494        Ok(())
495    }
496}
497
498///
499/// Query
500///
501/// Typed, declarative query intent for a specific entity type.
502///
503/// This intent is:
504/// - schema-agnostic at construction
505/// - normalized and validated only during planning
506/// - free of access-path decisions
507///
508
509///
510/// PlannedQuery
511///
512/// Neutral query-owned planned contract produced by query planning.
513/// Stores logical + access shape without executor compilation state.
514///
515#[derive(Debug)]
516pub struct PlannedQuery<E: EntityKind> {
517    plan: AccessPlannedQuery<E::Key>,
518    _marker: PhantomData<E>,
519}
520
521impl<E: EntityKind> PlannedQuery<E> {
522    #[must_use]
523    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
524        Self {
525            plan,
526            _marker: PhantomData,
527        }
528    }
529
530    #[must_use]
531    pub fn explain(&self) -> ExplainPlan {
532        self.plan.explain_with_model(E::MODEL)
533    }
534}
535
536///
537/// CompiledQuery
538///
539/// Query-owned compiled handoff produced by `Query::plan()`.
540/// This type intentionally carries only logical/access query semantics.
541/// Executor runtime shape is derived explicitly at the executor boundary.
542///
543#[derive(Clone, Debug)]
544pub struct CompiledQuery<E: EntityKind> {
545    plan: AccessPlannedQuery<E::Key>,
546    _marker: PhantomData<E>,
547}
548
549impl<E: EntityKind> CompiledQuery<E> {
550    #[must_use]
551    pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
552        Self {
553            plan,
554            _marker: PhantomData,
555        }
556    }
557
558    #[must_use]
559    pub fn explain(&self) -> ExplainPlan {
560        self.plan.explain_with_model(E::MODEL)
561    }
562
563    #[must_use]
564    pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
565        self.plan
566    }
567}
568
569#[derive(Debug)]
570pub struct Query<E: EntityKind> {
571    intent: QueryModel<'static, E::Key>,
572    _marker: PhantomData<E>,
573}
574
575impl<E: EntityKind> Query<E> {
576    /// Create a new intent with an explicit missing-row policy.
577    /// Ignore favors idempotency and may mask index/data divergence on deletes.
578    /// Use Error to surface missing rows during scan/delete execution.
579    #[must_use]
580    pub const fn new(consistency: MissingRowPolicy) -> Self {
581        Self {
582            intent: QueryModel::new(E::MODEL, consistency),
583            _marker: PhantomData,
584        }
585    }
586
587    /// Return the intent mode (load vs delete).
588    #[must_use]
589    pub const fn mode(&self) -> QueryMode {
590        self.intent.mode()
591    }
592
593    #[must_use]
594    pub(crate) fn has_explicit_order(&self) -> bool {
595        self.intent.has_explicit_order()
596    }
597
598    #[must_use]
599    pub(crate) const fn has_grouping(&self) -> bool {
600        self.intent.has_grouping()
601    }
602
603    #[must_use]
604    pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
605        self.intent.load_spec()
606    }
607
608    /// Add a predicate, implicitly AND-ing with any existing predicate.
609    #[must_use]
610    pub fn filter(mut self, predicate: Predicate) -> Self {
611        self.intent = self.intent.filter(predicate);
612        self
613    }
614
615    /// Apply a dynamic filter expression.
616    pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
617        let Self { intent, _marker } = self;
618        let intent = intent.filter_expr(expr)?;
619
620        Ok(Self { intent, _marker })
621    }
622
623    /// Apply a dynamic sort expression.
624    pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
625        let Self { intent, _marker } = self;
626        let intent = intent.sort_expr(expr)?;
627
628        Ok(Self { intent, _marker })
629    }
630
631    /// Append an ascending sort key.
632    #[must_use]
633    pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
634        self.intent = self.intent.order_by(field);
635        self
636    }
637
638    /// Append a descending sort key.
639    #[must_use]
640    pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
641        self.intent = self.intent.order_by_desc(field);
642        self
643    }
644
645    /// Enable DISTINCT semantics for this query.
646    #[must_use]
647    pub fn distinct(mut self) -> Self {
648        self.intent = self.intent.distinct();
649        self
650    }
651
652    /// Add one GROUP BY field.
653    pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
654        let Self { intent, _marker } = self;
655        let intent = intent.push_group_field(field.as_ref())?;
656
657        Ok(Self { intent, _marker })
658    }
659
660    fn push_group_terminal(mut self, kind: GroupAggregateKind) -> Self {
661        self.intent = self.intent.push_group_aggregate(kind, None);
662        self
663    }
664
665    /// Add one grouped `count(*)` terminal.
666    #[must_use]
667    pub fn group_count(self) -> Self {
668        self.push_group_terminal(GroupAggregateKind::Count)
669    }
670
671    /// Add one grouped `exists` terminal.
672    #[must_use]
673    pub fn group_exists(self) -> Self {
674        self.push_group_terminal(GroupAggregateKind::Exists)
675    }
676
677    /// Add one grouped `first` terminal.
678    #[must_use]
679    pub fn group_first(self) -> Self {
680        self.push_group_terminal(GroupAggregateKind::First)
681    }
682
683    /// Add one grouped `last` terminal.
684    #[must_use]
685    pub fn group_last(self) -> Self {
686        self.push_group_terminal(GroupAggregateKind::Last)
687    }
688
689    /// Add one grouped `min` terminal (id extrema).
690    #[must_use]
691    pub fn group_min(self) -> Self {
692        self.push_group_terminal(GroupAggregateKind::Min)
693    }
694
695    /// Add one grouped `max` terminal (id extrema).
696    #[must_use]
697    pub fn group_max(self) -> Self {
698        self.push_group_terminal(GroupAggregateKind::Max)
699    }
700
701    /// Add one grouped `min(field)` terminal.
702    ///
703    /// Grouped field-target extrema are deferred in grouped v1.
704    pub fn group_min_by(self, _field: impl AsRef<str>) -> Result<Self, QueryError> {
705        validate_grouped_field_target_extrema_policy()
706            .map_err(IntentError::from)
707            .map_err(QueryError::Intent)?;
708
709        Ok(self)
710    }
711
712    /// Add one grouped `max(field)` terminal.
713    ///
714    /// Grouped field-target extrema are deferred in grouped v1.
715    pub fn group_max_by(self, _field: impl AsRef<str>) -> Result<Self, QueryError> {
716        validate_grouped_field_target_extrema_policy()
717            .map_err(IntentError::from)
718            .map_err(QueryError::Intent)?;
719
720        Ok(self)
721    }
722
723    /// Override grouped hard limits for grouped execution budget enforcement.
724    #[must_use]
725    pub fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
726        self.intent = self.intent.grouped_limits(max_groups, max_group_bytes);
727        self
728    }
729
730    /// Set the access path to a single primary key lookup.
731    pub(crate) fn by_id(self, id: E::Key) -> Self {
732        let Self { intent, _marker } = self;
733        Self {
734            intent: intent.by_id(id),
735            _marker,
736        }
737    }
738
739    /// Set the access path to a primary key batch lookup.
740    pub(crate) fn by_ids<I>(self, ids: I) -> Self
741    where
742        I: IntoIterator<Item = E::Key>,
743    {
744        let Self { intent, _marker } = self;
745        Self {
746            intent: intent.by_ids(ids),
747            _marker,
748        }
749    }
750
751    /// Mark this intent as a delete query.
752    #[must_use]
753    pub fn delete(mut self) -> Self {
754        self.intent = self.intent.delete();
755        self
756    }
757
758    /// Apply a limit to the current mode.
759    ///
760    /// Load limits bound result size; delete limits bound mutation size.
761    /// For scalar load queries, any use of `limit` or `offset` requires an
762    /// explicit `order_by(...)` so pagination is deterministic.
763    /// GROUP BY queries use canonical grouped-key order by default.
764    #[must_use]
765    pub fn limit(mut self, limit: u32) -> Self {
766        self.intent = self.intent.limit(limit);
767        self
768    }
769
770    /// Apply an offset to a load intent.
771    ///
772    /// Scalar pagination requires an explicit `order_by(...)`.
773    /// GROUP BY queries use canonical grouped-key order by default.
774    #[must_use]
775    pub fn offset(mut self, offset: u32) -> Self {
776        self.intent = self.intent.offset(offset);
777        self
778    }
779
780    /// Explain this intent without executing it.
781    pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
782        let plan = self.planned()?;
783
784        Ok(plan.explain())
785    }
786
787    /// Plan this intent into a neutral planned query contract.
788    pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
789        let plan = self.build_plan()?;
790
791        Ok(PlannedQuery::new(plan))
792    }
793
794    /// Compile this intent into query-owned handoff state.
795    ///
796    /// This boundary intentionally does not expose executor runtime shape.
797    pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
798        let plan = self.build_plan()?;
799
800        Ok(CompiledQuery::new(plan))
801    }
802
803    // Build a logical plan for the current intent.
804    fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
805        let plan_value = self.intent.build_plan_model()?;
806        let (logical, access) = plan_value.into_parts();
807        let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
808        let plan = AccessPlannedQuery::from_parts(logical, access);
809
810        Ok(plan)
811    }
812}
813
814impl<E> Query<E>
815where
816    E: EntityKind + SingletonEntity,
817    E::Key: Default,
818{
819    /// Set the access path to the singleton primary key.
820    pub(crate) fn only(self) -> Self {
821        let Self { intent, _marker } = self;
822
823        Self {
824            intent: intent.only(E::Key::default()),
825            _marker,
826        }
827    }
828}
829
830///
831/// QueryError
832///
833
834#[derive(Debug, ThisError)]
835pub enum QueryError {
836    #[error("{0}")]
837    Validate(#[from] ValidateError),
838
839    #[error("{0}")]
840    Plan(Box<PlanError>),
841
842    #[error("{0}")]
843    Intent(#[from] IntentError),
844
845    #[error("{0}")]
846    Response(#[from] ResponseError),
847
848    #[error("{0}")]
849    Execute(#[from] InternalError),
850}
851
852impl From<PlannerError> for QueryError {
853    fn from(err: PlannerError) -> Self {
854        match err {
855            PlannerError::Plan(err) => Self::from(*err),
856            PlannerError::Internal(err) => Self::Execute(*err),
857        }
858    }
859}
860
861impl From<PlanError> for QueryError {
862    fn from(err: PlanError) -> Self {
863        Self::Plan(Box::new(err))
864    }
865}
866
867///
868/// IntentError
869///
870
871#[derive(Clone, Copy, Debug, ThisError)]
872pub enum IntentError {
873    #[error("{0}")]
874    PlanShape(#[from] PolicyPlanError),
875
876    #[error("by_ids() cannot be combined with predicates")]
877    ByIdsWithPredicate,
878
879    #[error("only() cannot be combined with predicates")]
880    OnlyWithPredicate,
881
882    #[error("multiple key access methods were used on the same query")]
883    KeyAccessConflict,
884
885    #[error("cursor pagination requires an explicit ordering")]
886    CursorRequiresOrder,
887
888    #[error("cursor pagination requires an explicit limit")]
889    CursorRequiresLimit,
890
891    #[error("cursor tokens can only be used with .page().execute()")]
892    CursorRequiresPagedExecution,
893
894    #[error("grouped queries require execute_grouped(...)")]
895    GroupedRequiresExecuteGrouped,
896
897    #[error("grouped field-target extrema are not supported in grouped v1")]
898    GroupedFieldTargetExtremaUnsupported,
899}
900
901impl From<CursorPagingPolicyError> for IntentError {
902    fn from(err: CursorPagingPolicyError) -> Self {
903        match err {
904            CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
905            CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
906        }
907    }
908}
909
910impl From<IntentKeyAccessPolicyViolation> for IntentError {
911    fn from(err: IntentKeyAccessPolicyViolation) -> Self {
912        match err {
913            IntentKeyAccessPolicyViolation::KeyAccessConflict => Self::KeyAccessConflict,
914            IntentKeyAccessPolicyViolation::ByIdsWithPredicate => Self::ByIdsWithPredicate,
915            IntentKeyAccessPolicyViolation::OnlyWithPredicate => Self::OnlyWithPredicate,
916        }
917    }
918}
919
920impl From<IntentTerminalPolicyViolation> for IntentError {
921    fn from(err: IntentTerminalPolicyViolation) -> Self {
922        match err {
923            IntentTerminalPolicyViolation::GroupedFieldTargetExtremaUnsupported => {
924                Self::GroupedFieldTargetExtremaUnsupported
925            }
926        }
927    }
928}
929
930impl From<FluentLoadPolicyViolation> for IntentError {
931    fn from(err: FluentLoadPolicyViolation) -> Self {
932        match err {
933            FluentLoadPolicyViolation::CursorRequiresPagedExecution => {
934                Self::CursorRequiresPagedExecution
935            }
936            FluentLoadPolicyViolation::GroupedRequiresExecuteGrouped => {
937                Self::GroupedRequiresExecuteGrouped
938            }
939            FluentLoadPolicyViolation::CursorRequiresOrder => Self::CursorRequiresOrder,
940            FluentLoadPolicyViolation::CursorRequiresLimit => Self::CursorRequiresLimit,
941        }
942    }
943}
944
945/// Helper to append an ordering field while preserving existing order spec.
946fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
947    match order {
948        Some(mut spec) => {
949            spec.fields.push((field.to_string(), direction));
950            spec
951        }
952        None => OrderSpec {
953            fields: vec![(field.to_string(), direction)],
954        },
955    }
956}
957
958// Normalize ORDER BY into a canonical, deterministic shape:
959// - preserve user field order
960// - remove explicit primary-key references from the user segment
961// - append exactly one primary-key field as the terminal tie-break
962fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
963    let mut order = order?;
964    if order.fields.is_empty() {
965        return Some(order);
966    }
967
968    let pk_field = model.primary_key.name;
969    let mut pk_direction = None;
970    order.fields.retain(|(field, direction)| {
971        if field == pk_field {
972            if pk_direction.is_none() {
973                pk_direction = Some(*direction);
974            }
975            false
976        } else {
977            true
978        }
979    });
980
981    let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
982    order.fields.push((pk_field.to_string(), pk_direction));
983
984    Some(order)
985}