Skip to main content

icydb_core/db/query/plan/
validate.rs

//! Query-plan validation for planner-owned logical semantics.
//!
//! Validation ownership contract:
//! - `validate_query_semantics` owns user-facing query semantics and emits `PlanError`.
//! - executor-boundary defensive checks live in `db::executor::plan_validate`.
//!
//! Future rule changes must declare a semantic owner. Defensive re-check layers may mirror
//! rules, but must not reinterpret semantics or error class intent.
9
10use crate::{
11    db::{
12        access::{
13            AccessPlanError,
14            validate_access_structure_model as validate_access_structure_model_shared,
15        },
16        cursor::CursorPlanError,
17        predicate::{SchemaInfo, ValidateError, validate},
18        query::plan::{
19            AccessPlannedQuery, FieldSlot, GroupAggregateSpec, GroupDistinctAdmissibility,
20            GroupDistinctPolicyReason, GroupHavingSpec, GroupHavingSymbol, GroupSpec, LoadSpec,
21            LogicalPlan, OrderSpec, QueryMode, ScalarPlan, grouped_distinct_admissibility,
22            grouped_having_compare_op_supported, resolve_global_distinct_field_aggregate,
23        },
24    },
25    model::entity::EntityModel,
26    value::Value,
27};
28use std::collections::BTreeSet;
29use thiserror::Error as ThisError;
30
31///
32/// PlanError
33///
34/// Executor-visible validation failures for logical plans.
35///
36/// These errors indicate that a plan cannot be safely executed against the
37/// current schema or entity definition. They are *not* planner bugs.
38///
39
40#[derive(Debug, ThisError)]
41pub enum PlanError {
42    #[error("predicate validation failed: {0}")]
43    PredicateInvalid(Box<ValidateError>),
44
45    #[error("{0}")]
46    Order(Box<OrderPlanError>),
47
48    #[error("{0}")]
49    Access(Box<AccessPlanError>),
50
51    #[error("{0}")]
52    Policy(Box<PolicyPlanError>),
53
54    #[error("{0}")]
55    Cursor(Box<CursorPlanError>),
56
57    #[error("{0}")]
58    Group(Box<GroupPlanError>),
59}
60
61///
62/// OrderPlanError
63///
64/// ORDER BY-specific validation failures.
65///
66
67#[derive(Debug, ThisError)]
68pub enum OrderPlanError {
69    /// ORDER BY references an unknown field.
70    #[error("unknown order field '{field}'")]
71    UnknownField { field: String },
72
73    /// ORDER BY references a field that cannot be ordered.
74    #[error("order field '{field}' is not orderable")]
75    UnorderableField { field: String },
76
77    /// ORDER BY references the same non-primary-key field multiple times.
78    #[error("order field '{field}' appears multiple times")]
79    DuplicateOrderField { field: String },
80
81    /// Ordered plans must terminate with the primary-key tie-break.
82    #[error("order specification must end with primary key '{field}' as deterministic tie-break")]
83    MissingPrimaryKeyTieBreak { field: String },
84}
85
86///
87/// PolicyPlanError
88///
89/// Plan-shape policy failures.
90///
91
92#[derive(Clone, Copy, Debug, Eq, PartialEq, ThisError)]
93pub enum PolicyPlanError {
94    /// ORDER BY must specify at least one field.
95    #[error("order specification must include at least one field")]
96    EmptyOrderSpec,
97
98    /// Delete plans must not carry pagination.
99    #[error("delete plans must not include pagination")]
100    DeletePlanWithPagination,
101
102    /// Load plans must not carry delete limits.
103    #[error("load plans must not include delete limits")]
104    LoadPlanWithDeleteLimit,
105
106    /// Delete limits require an explicit ordering.
107    #[error("delete limit requires an explicit ordering")]
108    DeleteLimitRequiresOrder,
109
110    /// Pagination requires an explicit ordering.
111    #[error(
112        "Unordered pagination is not allowed.\nThis query uses LIMIT or OFFSET without an ORDER BY clause.\nPagination without a total ordering is non-deterministic.\nAdd an explicit order_by(...) to make the query stable."
113    )]
114    UnorderedPagination,
115}
116
117///
118/// CursorPagingPolicyError
119///
120/// Cursor pagination readiness errors shared by intent/fluent entry surfaces.
121///
122
123#[derive(Clone, Copy, Debug, Eq, PartialEq, ThisError)]
124pub enum CursorPagingPolicyError {
125    #[error(
126        "{message}",
127        message = CursorPlanError::cursor_requires_order_message()
128    )]
129    CursorRequiresOrder,
130
131    #[error(
132        "{message}",
133        message = CursorPlanError::cursor_requires_limit_message()
134    )]
135    CursorRequiresLimit,
136}
137
138///
139/// GroupPlanError
140///
141/// GROUP BY wrapper validation failures owned by query planning.
142///
143
144#[derive(Clone, Debug, Eq, PartialEq, ThisError)]
145pub enum GroupPlanError {
146    /// HAVING requires GROUP BY grouped plan shape.
147    #[error("HAVING is only supported for GROUP BY queries in this release")]
148    HavingRequiresGroupBy,
149
150    /// Grouped validation entrypoint received a scalar logical plan.
151    #[error("group query validation requires grouped logical plan variant")]
152    GroupedLogicalPlanRequired,
153
154    /// GROUP BY requires at least one declared grouping field.
155    #[error("group specification must include at least one group field")]
156    EmptyGroupFields,
157
158    /// Global DISTINCT aggregate shapes without GROUP BY are restricted.
159    #[error(
160        "global DISTINCT aggregate without GROUP BY must declare exactly one DISTINCT field-target aggregate in this release"
161    )]
162    GlobalDistinctAggregateShapeUnsupported,
163
164    /// GROUP BY requires at least one aggregate terminal.
165    #[error("group specification must include at least one aggregate terminal")]
166    EmptyAggregates,
167
168    /// GROUP BY references an unknown group field.
169    #[error("unknown group field '{field}'")]
170    UnknownGroupField { field: String },
171
172    /// GROUP BY must not repeat the same resolved group slot.
173    #[error("group specification has duplicate group key: '{field}'")]
174    DuplicateGroupField { field: String },
175
176    /// GROUP BY v1 does not accept DISTINCT unless adjacency eligibility is explicit.
177    #[error(
178        "grouped DISTINCT requires adjacency-based ordered-group eligibility proof in this release"
179    )]
180    DistinctAdjacencyEligibilityRequired,
181
182    /// GROUP BY ORDER BY shape must start with grouped-key prefix.
183    #[error("grouped ORDER BY must start with GROUP BY key prefix in this release")]
184    OrderPrefixNotAlignedWithGroupKeys,
185
186    /// GROUP BY ORDER BY requires an explicit LIMIT in grouped v1.
187    #[error("grouped ORDER BY requires LIMIT in this release")]
188    OrderRequiresLimit,
189
190    /// HAVING with DISTINCT is deferred until grouped DISTINCT support expands.
191    #[error("grouped HAVING with DISTINCT is not supported in this release")]
192    DistinctHavingUnsupported,
193
194    /// HAVING currently supports compare operators only.
195    #[error("grouped HAVING clause at index={index} uses unsupported operator: {op}")]
196    HavingUnsupportedCompareOp { index: usize, op: String },
197
198    /// HAVING group-field symbols must reference declared grouped keys.
199    #[error("grouped HAVING clause at index={index} references non-group field '{field}'")]
200    HavingNonGroupFieldReference { index: usize, field: String },
201
202    /// HAVING aggregate references must resolve to declared grouped terminals.
203    #[error(
204        "grouped HAVING clause at index={index} references aggregate index {aggregate_index} but aggregate_count={aggregate_count}"
205    )]
206    HavingAggregateIndexOutOfBounds {
207        index: usize,
208        aggregate_index: usize,
209        aggregate_count: usize,
210    },
211
212    /// DISTINCT grouped terminal kinds are intentionally conservative in v1.
213    #[error(
214        "grouped DISTINCT aggregate at index={index} uses unsupported kind '{kind}' in this release"
215    )]
216    DistinctAggregateKindUnsupported { index: usize, kind: String },
217
218    /// DISTINCT over grouped field-target terminals is deferred with field-target support.
219    #[error(
220        "grouped DISTINCT aggregate at index={index} cannot target field '{field}' in this release: found {kind}"
221    )]
222    DistinctAggregateFieldTargetUnsupported {
223        index: usize,
224        kind: String,
225        field: String,
226    },
227
228    /// Aggregate target fields must resolve in the model schema.
229    #[error("unknown grouped aggregate target field at index={index}: '{field}'")]
230    UnknownAggregateTargetField { index: usize, field: String },
231
232    /// Global DISTINCT SUM requires a numeric field target.
233    #[error(
234        "global DISTINCT SUM aggregate target field at index={index} is not numeric: '{field}'"
235    )]
236    GlobalDistinctSumTargetNotNumeric { index: usize, field: String },
237
238    /// Field-target grouped terminals are not enabled in grouped execution v1.
239    #[error(
240        "grouped aggregate at index={index} cannot target field '{field}' in this release: found {kind}"
241    )]
242    FieldTargetAggregatesUnsupported {
243        index: usize,
244        kind: String,
245        field: String,
246    },
247}
248
///
/// CursorOrderPlanShapeError
///
/// Shape failures of a logical cursor ordering, consumed by cursor/runtime
/// boundary adapters.
///

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CursorOrderPlanShapeError {
    /// An explicit ordering was required but none was supplied.
    MissingExplicitOrder,
    /// An ordering was supplied but it lists no fields.
    EmptyOrderSpec,
}
260
///
/// IntentKeyAccessKind
///
/// The key-access shape of a query intent, as seen by intent policy validation.
///

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum IntentKeyAccessKind {
    /// Single-key access; accepted with or without a predicate.
    Single,
    /// Multi-key access; rejected when combined with a predicate.
    Many,
    /// "Only" access; rejected when combined with a predicate.
    Only,
}
273
///
/// IntentKeyAccessPolicyViolation
///
/// Key-access policy violations detected at the query-intent boundary.
///
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum IntentKeyAccessPolicyViolation {
    /// The intent was flagged as carrying conflicting key-access requests.
    KeyAccessConflict,
    /// Multi-key access was combined with a predicate.
    ByIdsWithPredicate,
    /// "Only" access was combined with a predicate.
    OnlyWithPredicate,
}
285
///
/// FluentLoadPolicyViolation
///
/// Policy violations raised at the fluent load entry points.
///

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum FluentLoadPolicyViolation {
    /// A cursor token was supplied to the non-paged entry point.
    CursorRequiresPagedExecution,
    /// A grouped query was sent through a non-grouped entry point.
    GroupedRequiresExecuteGrouped,
    /// Paged execution was requested without an ordering.
    CursorRequiresOrder,
    /// Paged execution was requested without a limit.
    CursorRequiresLimit,
}
299
300impl From<ValidateError> for PlanError {
301    fn from(err: ValidateError) -> Self {
302        Self::PredicateInvalid(Box::new(err))
303    }
304}
305
306impl From<OrderPlanError> for PlanError {
307    fn from(err: OrderPlanError) -> Self {
308        Self::Order(Box::new(err))
309    }
310}
311
312impl From<AccessPlanError> for PlanError {
313    fn from(err: AccessPlanError) -> Self {
314        Self::Access(Box::new(err))
315    }
316}
317
318impl From<PolicyPlanError> for PlanError {
319    fn from(err: PolicyPlanError) -> Self {
320        Self::Policy(Box::new(err))
321    }
322}
323
324impl From<CursorPlanError> for PlanError {
325    fn from(err: CursorPlanError) -> Self {
326        Self::Cursor(Box::new(err))
327    }
328}
329
330impl From<GroupPlanError> for PlanError {
331    fn from(err: GroupPlanError) -> Self {
332        Self::Group(Box::new(err))
333    }
334}
335
336/// Validate a logical plan with model-level key values.
337///
338/// Ownership:
339/// - semantic owner for user-facing query validity at planning boundaries
340/// - failures here are user-visible planning failures (`PlanError`)
341///
342/// New user-facing validation rules must be introduced here first, then mirrored
343/// defensively in downstream layers without changing semantics.
344pub(crate) fn validate_query_semantics(
345    schema: &SchemaInfo,
346    model: &EntityModel,
347    plan: &AccessPlannedQuery<Value>,
348) -> Result<(), PlanError> {
349    let logical = plan.scalar_plan();
350
351    validate_plan_core(
352        schema,
353        model,
354        logical,
355        plan,
356        validate_order,
357        |schema, model, plan| {
358            validate_access_structure_model_shared(schema, model, &plan.access)
359                .map_err(PlanError::from)
360        },
361    )?;
362
363    Ok(())
364}
365
366/// Validate grouped query semantics for one grouped plan wrapper.
367///
368/// Ownership:
369/// - semantic owner for GROUP BY wrapper validation
370/// - failures here are user-visible planning failures (`PlanError`)
371pub(crate) fn validate_group_query_semantics(
372    schema: &SchemaInfo,
373    model: &EntityModel,
374    plan: &AccessPlannedQuery<Value>,
375) -> Result<(), PlanError> {
376    let (logical, group, having) = match &plan.logical {
377        LogicalPlan::Grouped(grouped) => (&grouped.scalar, &grouped.group, grouped.having.as_ref()),
378        LogicalPlan::Scalar(_) => {
379            return Err(PlanError::from(GroupPlanError::GroupedLogicalPlanRequired));
380        }
381    };
382
383    validate_plan_core(
384        schema,
385        model,
386        logical,
387        plan,
388        validate_order,
389        |schema, model, plan| {
390            validate_access_structure_model_shared(schema, model, &plan.access)
391                .map_err(PlanError::from)
392        },
393    )?;
394    validate_group_structure(schema, model, group, having)?;
395    validate_group_policy(schema, logical, group, having)?;
396    validate_group_cursor_constraints(logical, group)?;
397
398    Ok(())
399}
400
401// Validate grouped structural invariants before policy/cursor gates.
402fn validate_group_structure(
403    schema: &SchemaInfo,
404    model: &EntityModel,
405    group: &GroupSpec,
406    having: Option<&GroupHavingSpec>,
407) -> Result<(), PlanError> {
408    if group.group_fields.is_empty() && having.is_some() {
409        return Err(PlanError::from(
410            GroupPlanError::GlobalDistinctAggregateShapeUnsupported,
411        ));
412    }
413
414    validate_group_spec_structure(schema, model, group)?;
415    validate_grouped_having_structure(group, having)?;
416
417    Ok(())
418}
419
420// Validate grouped policy gates independent from structural shape checks.
421fn validate_group_policy(
422    schema: &SchemaInfo,
423    logical: &ScalarPlan,
424    group: &GroupSpec,
425    having: Option<&GroupHavingSpec>,
426) -> Result<(), PlanError> {
427    validate_grouped_distinct_policy(logical, having.is_some())?;
428    validate_grouped_having_policy(having)?;
429    validate_group_spec_policy(schema, group, having)?;
430
431    Ok(())
432}
433
434// Validate grouped cursor-order constraints in one dedicated gate.
435fn validate_group_cursor_constraints(
436    logical: &ScalarPlan,
437    group: &GroupSpec,
438) -> Result<(), PlanError> {
439    // Grouped pagination/order constraints are cursor-domain policy:
440    // grouped ORDER BY requires LIMIT and must align with grouped-key prefix.
441    let Some(order) = logical.order.as_ref() else {
442        return Ok(());
443    };
444    if logical.page.as_ref().and_then(|page| page.limit).is_none() {
445        return Err(PlanError::from(GroupPlanError::OrderRequiresLimit));
446    }
447    if order_prefix_aligned_with_group_fields(order, group.group_fields.as_slice()) {
448        return Ok(());
449    }
450
451    Err(PlanError::from(
452        GroupPlanError::OrderPrefixNotAlignedWithGroupKeys,
453    ))
454}
455
456// Validate grouped DISTINCT policy gates for grouped v1 hardening.
457fn validate_grouped_distinct_policy(
458    logical: &ScalarPlan,
459    has_having: bool,
460) -> Result<(), PlanError> {
461    match grouped_distinct_admissibility(logical.distinct, has_having) {
462        GroupDistinctAdmissibility::Allowed => Ok(()),
463        GroupDistinctAdmissibility::Disallowed(reason) => Err(PlanError::from(
464            group_plan_error_from_distinct_policy_reason(reason, None),
465        )),
466    }
467}
468
469// Validate grouped HAVING structural symbol/reference compatibility.
470fn validate_grouped_having_structure(
471    group: &GroupSpec,
472    having: Option<&GroupHavingSpec>,
473) -> Result<(), PlanError> {
474    let Some(having) = having else {
475        return Ok(());
476    };
477
478    for (index, clause) in having.clauses().iter().enumerate() {
479        match clause.symbol() {
480            GroupHavingSymbol::GroupField(field_slot) => {
481                if !group
482                    .group_fields
483                    .iter()
484                    .any(|group_field| group_field.index() == field_slot.index())
485                {
486                    return Err(PlanError::from(
487                        GroupPlanError::HavingNonGroupFieldReference {
488                            index,
489                            field: field_slot.field().to_string(),
490                        },
491                    ));
492                }
493            }
494            GroupHavingSymbol::AggregateIndex(aggregate_index) => {
495                if *aggregate_index >= group.aggregates.len() {
496                    return Err(PlanError::from(
497                        GroupPlanError::HavingAggregateIndexOutOfBounds {
498                            index,
499                            aggregate_index: *aggregate_index,
500                            aggregate_count: group.aggregates.len(),
501                        },
502                    ));
503                }
504            }
505        }
506    }
507
508    Ok(())
509}
510
511// Validate grouped HAVING policy gates and operator support.
512fn validate_grouped_having_policy(having: Option<&GroupHavingSpec>) -> Result<(), PlanError> {
513    let Some(having) = having else {
514        return Ok(());
515    };
516
517    for (index, clause) in having.clauses().iter().enumerate() {
518        if !grouped_having_compare_op_supported(clause.op()) {
519            return Err(PlanError::from(
520                GroupPlanError::HavingUnsupportedCompareOp {
521                    index,
522                    op: format!("{:?}", clause.op()),
523                },
524            ));
525        }
526    }
527
528    Ok(())
529}
530
531// Return true when ORDER BY starts with GROUP BY key fields in declaration order.
532fn order_prefix_aligned_with_group_fields(order: &OrderSpec, group_fields: &[FieldSlot]) -> bool {
533    if order.fields.len() < group_fields.len() {
534        return false;
535    }
536
537    group_fields
538        .iter()
539        .zip(order.fields.iter())
540        .all(|(group_field, (order_field, _))| order_field == group_field.field())
541}
542
543// Validate grouped structural declarations against model/schema shape.
544fn validate_group_spec_structure(
545    schema: &SchemaInfo,
546    model: &EntityModel,
547    group: &GroupSpec,
548) -> Result<(), PlanError> {
549    if group.group_fields.is_empty() {
550        if group.aggregates.iter().any(GroupAggregateSpec::distinct) {
551            return Ok(());
552        }
553
554        return Err(PlanError::from(GroupPlanError::EmptyGroupFields));
555    }
556    if group.aggregates.is_empty() {
557        return Err(PlanError::from(GroupPlanError::EmptyAggregates));
558    }
559
560    let mut seen_group_slots = BTreeSet::<usize>::new();
561    for field_slot in &group.group_fields {
562        if model.fields.get(field_slot.index()).is_none() {
563            return Err(PlanError::from(GroupPlanError::UnknownGroupField {
564                field: field_slot.field().to_string(),
565            }));
566        }
567        if !seen_group_slots.insert(field_slot.index()) {
568            return Err(PlanError::from(GroupPlanError::DuplicateGroupField {
569                field: field_slot.field().to_string(),
570            }));
571        }
572    }
573
574    for (index, aggregate) in group.aggregates.iter().enumerate() {
575        let Some(target_field) = aggregate.target_field.as_ref() else {
576            continue;
577        };
578        if schema.field(target_field).is_none() {
579            return Err(PlanError::from(
580                GroupPlanError::UnknownAggregateTargetField {
581                    index,
582                    field: target_field.clone(),
583                },
584            ));
585        }
586    }
587
588    Ok(())
589}
590
591// Validate grouped execution policy over a structurally valid grouped spec.
592fn validate_group_spec_policy(
593    schema: &SchemaInfo,
594    group: &GroupSpec,
595    having: Option<&GroupHavingSpec>,
596) -> Result<(), PlanError> {
597    if group.group_fields.is_empty() {
598        validate_global_distinct_aggregate_without_group_keys(schema, group, having)?;
599        return Ok(());
600    }
601
602    for (index, aggregate) in group.aggregates.iter().enumerate() {
603        if aggregate.distinct() && !aggregate.kind().supports_grouped_distinct_v1() {
604            return Err(PlanError::from(
605                GroupPlanError::DistinctAggregateKindUnsupported {
606                    index,
607                    kind: format!("{:?}", aggregate.kind()),
608                },
609            ));
610        }
611
612        let Some(target_field) = aggregate.target_field.as_ref() else {
613            continue;
614        };
615        if aggregate.distinct() {
616            return Err(PlanError::from(
617                GroupPlanError::DistinctAggregateFieldTargetUnsupported {
618                    index,
619                    kind: format!("{:?}", aggregate.kind()),
620                    field: target_field.clone(),
621                },
622            ));
623        }
624        return Err(PlanError::from(
625            GroupPlanError::FieldTargetAggregatesUnsupported {
626                index,
627                kind: format!("{:?}", aggregate.kind()),
628                field: target_field.clone(),
629            },
630        ));
631    }
632
633    Ok(())
634}
635
636// Validate the restricted global DISTINCT aggregate shape (`GROUP BY` omitted).
637fn validate_global_distinct_aggregate_without_group_keys(
638    schema: &SchemaInfo,
639    group: &GroupSpec,
640    having: Option<&GroupHavingSpec>,
641) -> Result<(), PlanError> {
642    let aggregate = match resolve_global_distinct_field_aggregate(
643        group.group_fields.as_slice(),
644        group.aggregates.as_slice(),
645        having,
646    ) {
647        Ok(Some(aggregate)) => aggregate,
648        Ok(None) => {
649            return Err(PlanError::from(
650                GroupPlanError::GlobalDistinctAggregateShapeUnsupported,
651            ));
652        }
653        Err(reason) => {
654            let aggregate = group.aggregates.first();
655            return Err(PlanError::from(
656                group_plan_error_from_distinct_policy_reason(reason, aggregate),
657            ));
658        }
659    };
660
661    let target_field = aggregate.target_field();
662    let Some(field_type) = schema.field(target_field) else {
663        return Err(PlanError::from(
664            GroupPlanError::UnknownAggregateTargetField {
665                index: 0,
666                field: target_field.to_string(),
667            },
668        ));
669    };
670    if aggregate.kind().is_sum() && !field_type.supports_numeric_coercion() {
671        return Err(PlanError::from(
672            GroupPlanError::GlobalDistinctSumTargetNotNumeric {
673                index: 0,
674                field: target_field.to_string(),
675            },
676        ));
677    }
678
679    Ok(())
680}
681
682// Map one grouped DISTINCT policy reason to planner-visible grouped plan errors.
683fn group_plan_error_from_distinct_policy_reason(
684    reason: GroupDistinctPolicyReason,
685    aggregate: Option<&GroupAggregateSpec>,
686) -> GroupPlanError {
687    match reason {
688        GroupDistinctPolicyReason::DistinctHavingUnsupported => {
689            GroupPlanError::DistinctHavingUnsupported
690        }
691        GroupDistinctPolicyReason::DistinctAdjacencyEligibilityRequired => {
692            GroupPlanError::DistinctAdjacencyEligibilityRequired
693        }
694        GroupDistinctPolicyReason::GlobalDistinctHavingUnsupported
695        | GroupDistinctPolicyReason::GlobalDistinctRequiresSingleAggregate
696        | GroupDistinctPolicyReason::GlobalDistinctRequiresFieldTargetAggregate
697        | GroupDistinctPolicyReason::GlobalDistinctRequiresDistinctAggregateTerminal => {
698            GroupPlanError::GlobalDistinctAggregateShapeUnsupported
699        }
700        GroupDistinctPolicyReason::GlobalDistinctUnsupportedAggregateKind => {
701            let kind = aggregate.map_or_else(
702                || "Unknown".to_string(),
703                |aggregate| format!("{:?}", aggregate.kind()),
704            );
705            GroupPlanError::DistinctAggregateKindUnsupported { index: 0, kind }
706        }
707    }
708}
709
710// Shared logical plan validation core owned by planner semantics.
711fn validate_plan_core<K, FOrder, FAccess>(
712    schema: &SchemaInfo,
713    model: &EntityModel,
714    logical: &ScalarPlan,
715    plan: &AccessPlannedQuery<K>,
716    validate_order_fn: FOrder,
717    validate_access_fn: FAccess,
718) -> Result<(), PlanError>
719where
720    FOrder: Fn(&SchemaInfo, &OrderSpec) -> Result<(), PlanError>,
721    FAccess: Fn(&SchemaInfo, &EntityModel, &AccessPlannedQuery<K>) -> Result<(), PlanError>,
722{
723    if let Some(predicate) = &logical.predicate {
724        validate(schema, predicate)?;
725    }
726
727    if let Some(order) = &logical.order {
728        validate_order_fn(schema, order)?;
729        validate_no_duplicate_non_pk_order_fields(model, order)?;
730        validate_primary_key_tie_break(model, order)?;
731    }
732
733    validate_access_fn(schema, model, plan)?;
734    validate_plan_shape(&plan.logical)?;
735
736    Ok(())
737}

// ORDER validation ownership contract:
// - This module owns ORDER semantic validation (field existence/orderability/tie-break).
// - ORDER canonicalization (primary-key tie-break insertion) is performed at the
//   intent boundary via `canonicalize_order_spec` before plan validation.
// - Shape-policy checks (for example empty ORDER, pagination/order coupling) are owned here.
// - Executor/runtime layers may defend execution preconditions only.
744
745/// Return true when an ORDER BY exists and contains at least one field.
746#[must_use]
747pub(crate) fn has_explicit_order(order: Option<&OrderSpec>) -> bool {
748    order.is_some_and(|order| !order.fields.is_empty())
749}
750
751/// Return true when an ORDER BY exists but is empty.
752#[must_use]
753pub(crate) fn has_empty_order(order: Option<&OrderSpec>) -> bool {
754    order.is_some_and(|order| order.fields.is_empty())
755}
756
757/// Validate order-shape rules shared across intent and logical plan boundaries.
758pub(crate) fn validate_order_shape(order: Option<&OrderSpec>) -> Result<(), PolicyPlanError> {
759    if has_empty_order(order) {
760        return Err(PolicyPlanError::EmptyOrderSpec);
761    }
762
763    Ok(())
764}
765
766/// Validate intent-level plan-shape rules derived from query mode + order.
767pub(crate) fn validate_intent_plan_shape(
768    mode: QueryMode,
769    order: Option<&OrderSpec>,
770) -> Result<(), PolicyPlanError> {
771    validate_order_shape(order)?;
772
773    let has_order = has_explicit_order(order);
774    if matches!(mode, QueryMode::Delete(spec) if spec.limit.is_some()) && !has_order {
775        return Err(PolicyPlanError::DeleteLimitRequiresOrder);
776    }
777
778    Ok(())
779}
780
781/// Validate cursor-pagination readiness for a load-spec + ordering pair.
782pub(crate) const fn validate_cursor_paging_requirements(
783    has_order: bool,
784    spec: LoadSpec,
785) -> Result<(), CursorPagingPolicyError> {
786    if !has_order {
787        return Err(CursorPagingPolicyError::CursorRequiresOrder);
788    }
789    if spec.limit.is_none() {
790        return Err(CursorPagingPolicyError::CursorRequiresLimit);
791    }
792
793    Ok(())
794}
795
796/// Validate cursor-order shape and return the logical order contract when present.
797pub(crate) const fn validate_cursor_order_plan_shape(
798    order: Option<&OrderSpec>,
799    require_explicit_order: bool,
800) -> Result<Option<&OrderSpec>, CursorOrderPlanShapeError> {
801    let Some(order) = order else {
802        if require_explicit_order {
803            return Err(CursorOrderPlanShapeError::MissingExplicitOrder);
804        }
805
806        return Ok(None);
807    };
808
809    if order.fields.is_empty() {
810        return Err(CursorOrderPlanShapeError::EmptyOrderSpec);
811    }
812
813    Ok(Some(order))
814}
815
816/// Resolve one grouped field into a stable field slot.
817pub(crate) fn resolve_group_field_slot(
818    model: &EntityModel,
819    field: &str,
820) -> Result<FieldSlot, PlanError> {
821    FieldSlot::resolve(model, field).ok_or_else(|| {
822        PlanError::from(GroupPlanError::UnknownGroupField {
823            field: field.to_string(),
824        })
825    })
826}
827
828/// Validate intent key-access policy before planning.
829pub(crate) const fn validate_intent_key_access_policy(
830    key_access_conflict: bool,
831    key_access_kind: Option<IntentKeyAccessKind>,
832    has_predicate: bool,
833) -> Result<(), IntentKeyAccessPolicyViolation> {
834    if key_access_conflict {
835        return Err(IntentKeyAccessPolicyViolation::KeyAccessConflict);
836    }
837
838    match key_access_kind {
839        Some(IntentKeyAccessKind::Many) if has_predicate => {
840            Err(IntentKeyAccessPolicyViolation::ByIdsWithPredicate)
841        }
842        Some(IntentKeyAccessKind::Only) if has_predicate => {
843            Err(IntentKeyAccessPolicyViolation::OnlyWithPredicate)
844        }
845        Some(
846            IntentKeyAccessKind::Single | IntentKeyAccessKind::Many | IntentKeyAccessKind::Only,
847        )
848        | None => Ok(()),
849    }
850}
851
852/// Validate fluent non-paged load entry policy.
853pub(crate) const fn validate_fluent_non_paged_mode(
854    has_cursor_token: bool,
855    has_grouping: bool,
856) -> Result<(), FluentLoadPolicyViolation> {
857    if has_cursor_token {
858        return Err(FluentLoadPolicyViolation::CursorRequiresPagedExecution);
859    }
860    if has_grouping {
861        return Err(FluentLoadPolicyViolation::GroupedRequiresExecuteGrouped);
862    }
863
864    Ok(())
865}
866
867/// Validate fluent paged load entry policy.
868pub(crate) fn validate_fluent_paged_mode(
869    has_grouping: bool,
870    has_explicit_order: bool,
871    spec: Option<LoadSpec>,
872) -> Result<(), FluentLoadPolicyViolation> {
873    if has_grouping {
874        return Err(FluentLoadPolicyViolation::GroupedRequiresExecuteGrouped);
875    }
876
877    let Some(spec) = spec else {
878        return Ok(());
879    };
880
881    validate_cursor_paging_requirements(has_explicit_order, spec).map_err(|err| match err {
882        CursorPagingPolicyError::CursorRequiresOrder => {
883            FluentLoadPolicyViolation::CursorRequiresOrder
884        }
885        CursorPagingPolicyError::CursorRequiresLimit => {
886            FluentLoadPolicyViolation::CursorRequiresLimit
887        }
888    })
889}
890
891/// Validate mode/order/pagination invariants for one logical plan.
892pub(crate) fn validate_plan_shape(plan: &LogicalPlan) -> Result<(), PolicyPlanError> {
893    let grouped = matches!(plan, LogicalPlan::Grouped(_));
894    let plan = match plan {
895        LogicalPlan::Scalar(plan) => plan,
896        LogicalPlan::Grouped(plan) => &plan.scalar,
897    };
898    validate_order_shape(plan.order.as_ref())?;
899
900    let has_order = has_explicit_order(plan.order.as_ref());
901    if plan.delete_limit.is_some() && !has_order {
902        return Err(PolicyPlanError::DeleteLimitRequiresOrder);
903    }
904
905    match plan.mode {
906        QueryMode::Delete(_) => {
907            if plan.page.is_some() {
908                return Err(PolicyPlanError::DeletePlanWithPagination);
909            }
910        }
911        QueryMode::Load(_) => {
912            if plan.delete_limit.is_some() {
913                return Err(PolicyPlanError::LoadPlanWithDeleteLimit);
914            }
915            // GROUP BY v1 uses canonical grouped key ordering when ORDER BY is
916            // omitted, so grouped pagination remains deterministic without an
917            // explicit sort clause.
918            if plan.page.is_some() && !has_order && !grouped {
919                return Err(PolicyPlanError::UnorderedPagination);
920            }
921        }
922    }
923
924    Ok(())
925}
926
927/// Validate ORDER BY fields against the schema.
928pub(crate) fn validate_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
929    for (field, _) in &order.fields {
930        let field_type = schema
931            .field(field)
932            .ok_or_else(|| OrderPlanError::UnknownField {
933                field: field.clone(),
934            })
935            .map_err(PlanError::from)?;
936
937        if !field_type.is_orderable() {
938            // CONTRACT: ORDER BY rejects non-queryable or unordered fields.
939            return Err(PlanError::from(OrderPlanError::UnorderableField {
940                field: field.clone(),
941            }));
942        }
943    }
944
945    Ok(())
946}
947
948/// Reject duplicate non-primary-key fields in ORDER BY.
949pub(crate) fn validate_no_duplicate_non_pk_order_fields(
950    model: &EntityModel,
951    order: &OrderSpec,
952) -> Result<(), PlanError> {
953    let mut seen = BTreeSet::new();
954    let pk_field = model.primary_key.name;
955
956    for (field, _) in &order.fields {
957        if field == pk_field {
958            continue;
959        }
960        if !seen.insert(field.as_str()) {
961            return Err(PlanError::from(OrderPlanError::DuplicateOrderField {
962                field: field.clone(),
963            }));
964        }
965    }
966
967    Ok(())
968}
969
970// Ordered plans must include exactly one terminal primary-key field so ordering is total and
971// deterministic across explain, fingerprint, and executor comparison paths.
972pub(crate) fn validate_primary_key_tie_break(
973    model: &EntityModel,
974    order: &OrderSpec,
975) -> Result<(), PlanError> {
976    if order.fields.is_empty() {
977        return Ok(());
978    }
979
980    let pk_field = model.primary_key.name;
981    let pk_count = order
982        .fields
983        .iter()
984        .filter(|(field, _)| field == pk_field)
985        .count();
986    let trailing_pk = order
987        .fields
988        .last()
989        .is_some_and(|(field, _)| field == pk_field);
990
991    if pk_count == 1 && trailing_pk {
992        Ok(())
993    } else {
994        Err(PlanError::from(OrderPlanError::MissingPrimaryKeyTieBreak {
995            field: pk_field.to_string(),
996        }))
997    }
998}