Skip to main content

icydb_core/db/query/plan/
validate.rs

1//! Query-plan validation for planner-owned logical semantics.
2//!
3//! Validation ownership contract:
4//! - `validate_query_semantics` owns user-facing query semantics and emits `PlanError`.
5//! - executor-boundary defensive checks live in `db::executor::plan_validate`.
6//!
7//! Future rule changes must declare a semantic owner. Defensive re-check layers may mirror
8//! rules, but must not reinterpret semantics or error class intent.
9
10use crate::{
11    db::{
12        access::{
13            AccessPlanError,
14            validate_access_structure_model as validate_access_structure_model_shared,
15        },
16        cursor::CursorPlanError,
17        predicate::{SchemaInfo, ValidateError, validate},
18        query::plan::{
19            AccessPlannedQuery, FieldSlot, GroupAggregateSpec, GroupDistinctAdmissibility,
20            GroupDistinctPolicyReason, GroupHavingSpec, GroupHavingSymbol, GroupSpec, LoadSpec,
21            LogicalPlan, OrderSpec, QueryMode, ScalarPlan,
22            expr::{ProjectionField, ProjectionSpec, expr_references_only_fields},
23            grouped_distinct_admissibility, grouped_having_compare_op_supported,
24            resolve_global_distinct_field_aggregate,
25        },
26    },
27    model::entity::EntityModel,
28    value::Value,
29};
30use std::collections::{BTreeSet, HashSet};
31use thiserror::Error as ThisError;
32
///
/// PlanError
///
/// Executor-visible validation failures for logical plans.
///
/// These errors indicate that a plan cannot be safely executed against the
/// current schema or entity definition. They are *not* planner bugs.
///
/// Every variant boxes one domain-specific error. All variants except
/// `PredicateInvalid` display exactly as their boxed payload does.
///

#[derive(Debug, ThisError)]
pub enum PlanError {
    /// Predicate validation failed; wraps the predicate module's `ValidateError`.
    #[error("predicate validation failed: {0}")]
    PredicateInvalid(Box<ValidateError>),

    /// ORDER BY validation failure.
    #[error("{0}")]
    Order(Box<OrderPlanError>),

    /// Access-structure validation failure.
    #[error("{0}")]
    Access(Box<AccessPlanError>),

    /// Plan-shape policy failure.
    #[error("{0}")]
    Policy(Box<PolicyPlanError>),

    /// Cursor-domain planning failure.
    #[error("{0}")]
    Cursor(Box<CursorPlanError>),

    /// GROUP BY wrapper validation failure.
    #[error("{0}")]
    Group(Box<GroupPlanError>),

    /// Expression validation failure.
    #[error("{0}")]
    Expr(Box<ExprPlanError>),
}
65
///
/// OrderPlanError
///
/// ORDER BY-specific validation failures.
///
/// Surfaces to callers as `PlanError::Order` through the `From` conversion.
/// Each variant carries the name of the offending order field.
///

#[derive(Debug, ThisError)]
pub enum OrderPlanError {
    /// ORDER BY references an unknown field.
    #[error("unknown order field '{field}'")]
    UnknownField { field: String },

    /// ORDER BY references a field that cannot be ordered.
    #[error("order field '{field}' is not orderable")]
    UnorderableField { field: String },

    /// ORDER BY references the same non-primary-key field multiple times.
    #[error("order field '{field}' appears multiple times")]
    DuplicateOrderField { field: String },

    /// Ordered plans must terminate with the primary-key tie-break.
    /// `field` names the primary key reported in the message.
    #[error("order specification must end with primary key '{field}' as deterministic tie-break")]
    MissingPrimaryKeyTieBreak { field: String },
}
90
///
/// PolicyPlanError
///
/// Plan-shape policy failures.
///
/// Returned directly by the shape validators in this module (for example
/// `validate_order_shape` and `validate_intent_plan_shape`) and convertible
/// into `PlanError::Policy` through `From`.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq, ThisError)]
pub enum PolicyPlanError {
    /// ORDER BY must specify at least one field.
    #[error("order specification must include at least one field")]
    EmptyOrderSpec,

    /// Delete plans must not carry pagination.
    #[error("delete plans must not include pagination")]
    DeletePlanWithPagination,

    /// Load plans must not carry delete limits.
    #[error("load plans must not include delete limits")]
    LoadPlanWithDeleteLimit,

    /// Delete limits require an explicit ordering.
    #[error("delete limit requires an explicit ordering")]
    DeleteLimitRequiresOrder,

    /// Pagination requires an explicit ordering.
    /// The message is multi-line and includes remediation guidance.
    #[error(
        "Unordered pagination is not allowed.\nThis query uses LIMIT or OFFSET without an ORDER BY clause.\nPagination without a total ordering is non-deterministic.\nAdd an explicit order_by(...) to make the query stable."
    )]
    UnorderedPagination,
}
121
///
/// CursorPagingPolicyError
///
/// Cursor pagination readiness errors shared by intent/fluent entry surfaces.
///
/// Display text is not duplicated here: each variant renders the message
/// supplied by the matching `CursorPlanError` message helper.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq, ThisError)]
pub enum CursorPagingPolicyError {
    /// Cursor pagination was requested without an ordering.
    #[error(
        "{message}",
        message = CursorPlanError::cursor_requires_order_message()
    )]
    CursorRequiresOrder,

    /// Cursor pagination was requested without a limit.
    #[error(
        "{message}",
        message = CursorPlanError::cursor_requires_limit_message()
    )]
    CursorRequiresLimit,
}
142
///
/// GroupPlanError
///
/// GROUP BY wrapper validation failures owned by query planning.
///
/// Payload conventions used by the validators in this module:
/// - `index` is the position of the offending HAVING clause or aggregate in
///   declaration order; the global DISTINCT path always reports index 0.
/// - `kind` and `op` are the `Debug` renderings of the aggregate kind and the
///   HAVING compare operator.
///

#[derive(Clone, Debug, Eq, PartialEq, ThisError)]
pub enum GroupPlanError {
    /// HAVING requires GROUP BY grouped plan shape.
    #[error("HAVING is only supported for GROUP BY queries in this release")]
    HavingRequiresGroupBy,

    /// Grouped validation entrypoint received a scalar logical plan.
    #[error("group query validation requires grouped logical plan variant")]
    GroupedLogicalPlanRequired,

    /// GROUP BY requires at least one declared grouping field.
    #[error("group specification must include at least one group field")]
    EmptyGroupFields,

    /// Global DISTINCT aggregate shapes without GROUP BY are restricted.
    #[error(
        "global DISTINCT aggregate without GROUP BY must declare exactly one DISTINCT field-target aggregate in this release"
    )]
    GlobalDistinctAggregateShapeUnsupported,

    /// GROUP BY requires at least one aggregate terminal.
    #[error("group specification must include at least one aggregate terminal")]
    EmptyAggregates,

    /// GROUP BY references an unknown group field.
    #[error("unknown group field '{field}'")]
    UnknownGroupField { field: String },

    /// GROUP BY must not repeat the same resolved group slot.
    #[error("group specification has duplicate group key: '{field}'")]
    DuplicateGroupField { field: String },

    /// GROUP BY v1 does not accept DISTINCT unless adjacency eligibility is explicit.
    #[error(
        "grouped DISTINCT requires adjacency-based ordered-group eligibility proof in this release"
    )]
    DistinctAdjacencyEligibilityRequired,

    /// GROUP BY ORDER BY shape must start with grouped-key prefix.
    #[error("grouped ORDER BY must start with GROUP BY key prefix in this release")]
    OrderPrefixNotAlignedWithGroupKeys,

    /// GROUP BY ORDER BY requires an explicit LIMIT in grouped v1.
    #[error("grouped ORDER BY requires LIMIT in this release")]
    OrderRequiresLimit,

    /// HAVING with DISTINCT is deferred until grouped DISTINCT support expands.
    #[error("grouped HAVING with DISTINCT is not supported in this release")]
    DistinctHavingUnsupported,

    /// HAVING currently supports compare operators only.
    /// `op` is the `Debug` rendering of the rejected operator.
    #[error("grouped HAVING clause at index={index} uses unsupported operator: {op}")]
    HavingUnsupportedCompareOp { index: usize, op: String },

    /// HAVING group-field symbols must reference declared grouped keys.
    #[error("grouped HAVING clause at index={index} references non-group field '{field}'")]
    HavingNonGroupFieldReference { index: usize, field: String },

    /// HAVING aggregate references must resolve to declared grouped terminals.
    /// `aggregate_count` is the number of declared aggregates at validation time.
    #[error(
        "grouped HAVING clause at index={index} references aggregate index {aggregate_index} but aggregate_count={aggregate_count}"
    )]
    HavingAggregateIndexOutOfBounds {
        index: usize,
        aggregate_index: usize,
        aggregate_count: usize,
    },

    /// DISTINCT grouped terminal kinds are intentionally conservative in v1.
    #[error(
        "grouped DISTINCT aggregate at index={index} uses unsupported kind '{kind}' in this release"
    )]
    DistinctAggregateKindUnsupported { index: usize, kind: String },

    /// DISTINCT over grouped field-target terminals is deferred with field-target support.
    #[error(
        "grouped DISTINCT aggregate at index={index} cannot target field '{field}' in this release: found {kind}"
    )]
    DistinctAggregateFieldTargetUnsupported {
        index: usize,
        kind: String,
        field: String,
    },

    /// Aggregate target fields must resolve in the model schema.
    #[error("unknown grouped aggregate target field at index={index}: '{field}'")]
    UnknownAggregateTargetField { index: usize, field: String },

    /// Global DISTINCT SUM requires a numeric field target.
    #[error(
        "global DISTINCT SUM aggregate target field at index={index} is not numeric: '{field}'"
    )]
    GlobalDistinctSumTargetNotNumeric { index: usize, field: String },

    /// Field-target grouped terminals are not enabled in grouped execution v1.
    #[error(
        "grouped aggregate at index={index} cannot target field '{field}' in this release: found {kind}"
    )]
    FieldTargetAggregatesUnsupported {
        index: usize,
        kind: String,
        field: String,
    },
}
253
///
/// ExprPlanError
///
/// Expression-spine inference failures owned by planner semantics.
///
/// Surfaces to callers as `PlanError::Expr` through the `From` conversion.
/// All payloads are pre-rendered strings, so the error stays `Clone + Eq`.
///

#[derive(Clone, Debug, Eq, PartialEq, ThisError)]
pub enum ExprPlanError {
    /// Expression references a field that does not exist in schema.
    #[error("unknown expression field '{field}'")]
    UnknownExprField { field: String },

    /// Aggregate terminal requires a numeric target field.
    #[error("aggregate '{kind}' requires numeric target field '{field}'")]
    NonNumericAggregateTarget { kind: String, field: String },

    /// Unary operation is incompatible with inferred operand type.
    #[error("unary operator '{op}' is incompatible with operand type {found}")]
    InvalidUnaryOperand { op: String, found: String },

    /// Binary operation is incompatible with inferred operand types.
    #[error("binary operator '{op}' is incompatible with operand types ({left}, {right})")]
    InvalidBinaryOperands {
        op: String,
        left: String,
        right: String,
    },

    /// GROUP BY projections must not reference fields outside grouped keys.
    /// `index` is the position of the offending field in the projection.
    #[error(
        "grouped projection expression at index={index} references fields outside GROUP BY keys"
    )]
    GroupedProjectionReferencesNonGroupField { index: usize },
}
288
///
/// CursorOrderPlanShapeError
///
/// Logical cursor-order plan-shape failures used by cursor/runtime boundary adapters.
///
/// Produced by `validate_cursor_order_plan_shape`. It carries no display text.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum CursorOrderPlanShapeError {
    /// No ORDER BY was supplied although the caller required one.
    MissingExplicitOrder,
    /// An ORDER BY was supplied but it lists no fields.
    EmptyOrderSpec,
}
300
///
/// IntentKeyAccessKind
///
/// Key-access shape used by intent policy validation.
///
/// Consumed by `validate_intent_key_access_policy`, which decides whether a
/// given shape may be combined with a predicate.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum IntentKeyAccessKind {
    /// Never rejected on predicate grounds by the intent policy check.
    /// NOTE(review): presumably a single-key lookup — confirm against callers.
    Single,
    /// Rejected with `ByIdsWithPredicate` when combined with a predicate.
    Many,
    /// Rejected with `OnlyWithPredicate` when combined with a predicate.
    Only,
}
313
///
/// IntentKeyAccessPolicyViolation
///
/// Logical key-access policy violations at query-intent boundaries.
///
/// Produced by `validate_intent_key_access_policy`. It carries no display text.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum IntentKeyAccessPolicyViolation {
    /// The caller flagged a key-access conflict; reported before any other check.
    KeyAccessConflict,
    /// `IntentKeyAccessKind::Many` was combined with a predicate.
    ByIdsWithPredicate,
    /// `IntentKeyAccessKind::Only` was combined with a predicate.
    OnlyWithPredicate,
}
325
///
/// FluentLoadPolicyViolation
///
/// Fluent load-entry policy violations.
///
/// Produced by the `validate_fluent_*_mode` checks. It carries no display text.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum FluentLoadPolicyViolation {
    /// A cursor token was supplied to the non-paged load entry point.
    CursorRequiresPagedExecution,
    /// A grouped query was sent through a non-grouped load entry point.
    GroupedRequiresExecuteGrouped,
    /// NOTE(review): presumably mirrors `CursorPagingPolicyError::CursorRequiresOrder`
    /// for the paged entry point — confirm against `validate_fluent_paged_mode`.
    CursorRequiresOrder,
    /// NOTE(review): presumably mirrors `CursorPagingPolicyError::CursorRequiresLimit`
    /// for the paged entry point — confirm against `validate_fluent_paged_mode`.
    CursorRequiresLimit,
}
339
340impl From<ValidateError> for PlanError {
341    fn from(err: ValidateError) -> Self {
342        Self::PredicateInvalid(Box::new(err))
343    }
344}
345
346impl From<OrderPlanError> for PlanError {
347    fn from(err: OrderPlanError) -> Self {
348        Self::Order(Box::new(err))
349    }
350}
351
352impl From<AccessPlanError> for PlanError {
353    fn from(err: AccessPlanError) -> Self {
354        Self::Access(Box::new(err))
355    }
356}
357
358impl From<PolicyPlanError> for PlanError {
359    fn from(err: PolicyPlanError) -> Self {
360        Self::Policy(Box::new(err))
361    }
362}
363
364impl From<CursorPlanError> for PlanError {
365    fn from(err: CursorPlanError) -> Self {
366        Self::Cursor(Box::new(err))
367    }
368}
369
370impl From<GroupPlanError> for PlanError {
371    fn from(err: GroupPlanError) -> Self {
372        Self::Group(Box::new(err))
373    }
374}
375
376impl From<ExprPlanError> for PlanError {
377    fn from(err: ExprPlanError) -> Self {
378        Self::Expr(Box::new(err))
379    }
380}
381
382/// Validate a logical plan with model-level key values.
383///
384/// Ownership:
385/// - semantic owner for user-facing query validity at planning boundaries
386/// - failures here are user-visible planning failures (`PlanError`)
387///
388/// New user-facing validation rules must be introduced here first, then mirrored
389/// defensively in downstream layers without changing semantics.
390pub(crate) fn validate_query_semantics(
391    schema: &SchemaInfo,
392    model: &EntityModel,
393    plan: &AccessPlannedQuery<Value>,
394) -> Result<(), PlanError> {
395    let logical = plan.scalar_plan();
396
397    validate_plan_core(
398        schema,
399        model,
400        logical,
401        plan,
402        validate_order,
403        |schema, model, plan| {
404            validate_access_structure_model_shared(schema, model, &plan.access)
405                .map_err(PlanError::from)
406        },
407    )?;
408
409    Ok(())
410}
411
412/// Validate grouped query semantics for one grouped plan wrapper.
413///
414/// Ownership:
415/// - semantic owner for GROUP BY wrapper validation
416/// - failures here are user-visible planning failures (`PlanError`)
417pub(crate) fn validate_group_query_semantics(
418    schema: &SchemaInfo,
419    model: &EntityModel,
420    plan: &AccessPlannedQuery<Value>,
421) -> Result<(), PlanError> {
422    let (logical, group, having) = match &plan.logical {
423        LogicalPlan::Grouped(grouped) => (&grouped.scalar, &grouped.group, grouped.having.as_ref()),
424        LogicalPlan::Scalar(_) => {
425            return Err(PlanError::from(GroupPlanError::GroupedLogicalPlanRequired));
426        }
427    };
428    let projection = plan.projection_spec(model);
429
430    validate_plan_core(
431        schema,
432        model,
433        logical,
434        plan,
435        validate_order,
436        |schema, model, plan| {
437            validate_access_structure_model_shared(schema, model, &plan.access)
438                .map_err(PlanError::from)
439        },
440    )?;
441    validate_group_structure(schema, model, group, &projection, having)?;
442    validate_group_policy(schema, logical, group, having)?;
443    validate_group_cursor_constraints(logical, group)?;
444
445    Ok(())
446}
447
448// Validate grouped structural invariants before policy/cursor gates.
449fn validate_group_structure(
450    schema: &SchemaInfo,
451    model: &EntityModel,
452    group: &GroupSpec,
453    projection: &ProjectionSpec,
454    having: Option<&GroupHavingSpec>,
455) -> Result<(), PlanError> {
456    if group.group_fields.is_empty() && having.is_some() {
457        return Err(PlanError::from(
458            GroupPlanError::GlobalDistinctAggregateShapeUnsupported,
459        ));
460    }
461
462    validate_group_spec_structure(schema, model, group)?;
463    validate_group_projection_expr_compatibility(group, projection)?;
464    validate_grouped_having_structure(group, having)?;
465
466    Ok(())
467}
468
469// Validate grouped policy gates independent from structural shape checks.
470fn validate_group_policy(
471    schema: &SchemaInfo,
472    logical: &ScalarPlan,
473    group: &GroupSpec,
474    having: Option<&GroupHavingSpec>,
475) -> Result<(), PlanError> {
476    validate_grouped_distinct_policy(logical, having.is_some())?;
477    validate_grouped_having_policy(having)?;
478    validate_group_spec_policy(schema, group, having)?;
479
480    Ok(())
481}
482
483// Validate grouped cursor-order constraints in one dedicated gate.
484fn validate_group_cursor_constraints(
485    logical: &ScalarPlan,
486    group: &GroupSpec,
487) -> Result<(), PlanError> {
488    // Grouped pagination/order constraints are cursor-domain policy:
489    // grouped ORDER BY requires LIMIT and must align with grouped-key prefix.
490    let Some(order) = logical.order.as_ref() else {
491        return Ok(());
492    };
493    if logical.page.as_ref().and_then(|page| page.limit).is_none() {
494        return Err(PlanError::from(GroupPlanError::OrderRequiresLimit));
495    }
496    if order_prefix_aligned_with_group_fields(order, group.group_fields.as_slice()) {
497        return Ok(());
498    }
499
500    Err(PlanError::from(
501        GroupPlanError::OrderPrefixNotAlignedWithGroupKeys,
502    ))
503}
504
505// Validate grouped DISTINCT policy gates for grouped v1 hardening.
506fn validate_grouped_distinct_policy(
507    logical: &ScalarPlan,
508    has_having: bool,
509) -> Result<(), PlanError> {
510    match grouped_distinct_admissibility(logical.distinct, has_having) {
511        GroupDistinctAdmissibility::Allowed => Ok(()),
512        GroupDistinctAdmissibility::Disallowed(reason) => Err(PlanError::from(
513            group_plan_error_from_distinct_policy_reason(reason, None),
514        )),
515    }
516}
517
518// Validate grouped HAVING structural symbol/reference compatibility.
519fn validate_grouped_having_structure(
520    group: &GroupSpec,
521    having: Option<&GroupHavingSpec>,
522) -> Result<(), PlanError> {
523    let Some(having) = having else {
524        return Ok(());
525    };
526
527    for (index, clause) in having.clauses().iter().enumerate() {
528        match clause.symbol() {
529            GroupHavingSymbol::GroupField(field_slot) => {
530                if !group
531                    .group_fields
532                    .iter()
533                    .any(|group_field| group_field.index() == field_slot.index())
534                {
535                    return Err(PlanError::from(
536                        GroupPlanError::HavingNonGroupFieldReference {
537                            index,
538                            field: field_slot.field().to_string(),
539                        },
540                    ));
541                }
542            }
543            GroupHavingSymbol::AggregateIndex(aggregate_index) => {
544                if *aggregate_index >= group.aggregates.len() {
545                    return Err(PlanError::from(
546                        GroupPlanError::HavingAggregateIndexOutOfBounds {
547                            index,
548                            aggregate_index: *aggregate_index,
549                            aggregate_count: group.aggregates.len(),
550                        },
551                    ));
552                }
553            }
554        }
555    }
556
557    Ok(())
558}
559
560// Validate grouped HAVING policy gates and operator support.
561fn validate_grouped_having_policy(having: Option<&GroupHavingSpec>) -> Result<(), PlanError> {
562    let Some(having) = having else {
563        return Ok(());
564    };
565
566    for (index, clause) in having.clauses().iter().enumerate() {
567        if !grouped_having_compare_op_supported(clause.op()) {
568            return Err(PlanError::from(
569                GroupPlanError::HavingUnsupportedCompareOp {
570                    index,
571                    op: format!("{:?}", clause.op()),
572                },
573            ));
574        }
575    }
576
577    Ok(())
578}
579
580// Return true when ORDER BY starts with GROUP BY key fields in declaration order.
581fn order_prefix_aligned_with_group_fields(order: &OrderSpec, group_fields: &[FieldSlot]) -> bool {
582    if order.fields.len() < group_fields.len() {
583        return false;
584    }
585
586    group_fields
587        .iter()
588        .zip(order.fields.iter())
589        .all(|(group_field, (order_field, _))| order_field == group_field.field())
590}
591
592// Validate grouped structural declarations against model/schema shape.
593fn validate_group_spec_structure(
594    schema: &SchemaInfo,
595    model: &EntityModel,
596    group: &GroupSpec,
597) -> Result<(), PlanError> {
598    if group.group_fields.is_empty() {
599        if group.aggregates.iter().any(GroupAggregateSpec::distinct) {
600            return Ok(());
601        }
602
603        return Err(PlanError::from(GroupPlanError::EmptyGroupFields));
604    }
605    if group.aggregates.is_empty() {
606        return Err(PlanError::from(GroupPlanError::EmptyAggregates));
607    }
608
609    let mut seen_group_slots = BTreeSet::<usize>::new();
610    for field_slot in &group.group_fields {
611        if model.fields.get(field_slot.index()).is_none() {
612            return Err(PlanError::from(GroupPlanError::UnknownGroupField {
613                field: field_slot.field().to_string(),
614            }));
615        }
616        if !seen_group_slots.insert(field_slot.index()) {
617            return Err(PlanError::from(GroupPlanError::DuplicateGroupField {
618                field: field_slot.field().to_string(),
619            }));
620        }
621    }
622
623    for (index, aggregate) in group.aggregates.iter().enumerate() {
624        let Some(target_field) = aggregate.target_field.as_ref() else {
625            continue;
626        };
627        if schema.field(target_field).is_none() {
628            return Err(PlanError::from(
629                GroupPlanError::UnknownAggregateTargetField {
630                    index,
631                    field: target_field.clone(),
632                },
633            ));
634        }
635    }
636
637    Ok(())
638}
639
640// Validate grouped execution policy over a structurally valid grouped spec.
641fn validate_group_spec_policy(
642    schema: &SchemaInfo,
643    group: &GroupSpec,
644    having: Option<&GroupHavingSpec>,
645) -> Result<(), PlanError> {
646    if group.group_fields.is_empty() {
647        validate_global_distinct_aggregate_without_group_keys(schema, group, having)?;
648        return Ok(());
649    }
650
651    for (index, aggregate) in group.aggregates.iter().enumerate() {
652        if aggregate.distinct() && !aggregate.kind().supports_grouped_distinct_v1() {
653            return Err(PlanError::from(
654                GroupPlanError::DistinctAggregateKindUnsupported {
655                    index,
656                    kind: format!("{:?}", aggregate.kind()),
657                },
658            ));
659        }
660
661        let Some(target_field) = aggregate.target_field.as_ref() else {
662            continue;
663        };
664        if aggregate.distinct() {
665            return Err(PlanError::from(
666                GroupPlanError::DistinctAggregateFieldTargetUnsupported {
667                    index,
668                    kind: format!("{:?}", aggregate.kind()),
669                    field: target_field.clone(),
670                },
671            ));
672        }
673        return Err(PlanError::from(
674            GroupPlanError::FieldTargetAggregatesUnsupported {
675                index,
676                kind: format!("{:?}", aggregate.kind()),
677                field: target_field.clone(),
678            },
679        ));
680    }
681
682    Ok(())
683}
684
685// Validate GROUP BY expression compatibility over canonical projection semantics.
686fn validate_group_projection_expr_compatibility(
687    group: &GroupSpec,
688    projection: &ProjectionSpec,
689) -> Result<(), PlanError> {
690    if group.group_fields.is_empty() {
691        return Ok(());
692    }
693
694    let grouped_fields = group
695        .group_fields
696        .iter()
697        .map(FieldSlot::field)
698        .collect::<HashSet<_>>();
699
700    for (index, field) in projection.fields().enumerate() {
701        match field {
702            ProjectionField::Scalar { expr, .. } => {
703                if !expr_references_only_fields(expr, &grouped_fields) {
704                    return Err(PlanError::from(
705                        ExprPlanError::GroupedProjectionReferencesNonGroupField { index },
706                    ));
707                }
708            }
709        }
710    }
711
712    Ok(())
713}
714
// Test-only entry point that exposes the private grouped-projection
// compatibility check to tests under `crate::db::query`. Arguments are
// forwarded unchanged and the inner result is returned as-is.
#[cfg(test)]
pub(in crate::db::query) fn validate_group_projection_expr_compatibility_for_test(
    group: &GroupSpec,
    projection: &ProjectionSpec,
) -> Result<(), PlanError> {
    validate_group_projection_expr_compatibility(group, projection)
}
722
723// Validate the restricted global DISTINCT aggregate shape (`GROUP BY` omitted).
724fn validate_global_distinct_aggregate_without_group_keys(
725    schema: &SchemaInfo,
726    group: &GroupSpec,
727    having: Option<&GroupHavingSpec>,
728) -> Result<(), PlanError> {
729    let aggregate = match resolve_global_distinct_field_aggregate(
730        group.group_fields.as_slice(),
731        group.aggregates.as_slice(),
732        having,
733    ) {
734        Ok(Some(aggregate)) => aggregate,
735        Ok(None) => {
736            return Err(PlanError::from(
737                GroupPlanError::GlobalDistinctAggregateShapeUnsupported,
738            ));
739        }
740        Err(reason) => {
741            let aggregate = group.aggregates.first();
742            return Err(PlanError::from(
743                group_plan_error_from_distinct_policy_reason(reason, aggregate),
744            ));
745        }
746    };
747
748    let target_field = aggregate.target_field();
749    let Some(field_type) = schema.field(target_field) else {
750        return Err(PlanError::from(
751            GroupPlanError::UnknownAggregateTargetField {
752                index: 0,
753                field: target_field.to_string(),
754            },
755        ));
756    };
757    if aggregate.kind().is_sum() && !field_type.supports_numeric_coercion() {
758        return Err(PlanError::from(
759            GroupPlanError::GlobalDistinctSumTargetNotNumeric {
760                index: 0,
761                field: target_field.to_string(),
762            },
763        ));
764    }
765
766    Ok(())
767}
768
769// Map one grouped DISTINCT policy reason to planner-visible grouped plan errors.
770fn group_plan_error_from_distinct_policy_reason(
771    reason: GroupDistinctPolicyReason,
772    aggregate: Option<&GroupAggregateSpec>,
773) -> GroupPlanError {
774    match reason {
775        GroupDistinctPolicyReason::DistinctHavingUnsupported => {
776            GroupPlanError::DistinctHavingUnsupported
777        }
778        GroupDistinctPolicyReason::DistinctAdjacencyEligibilityRequired => {
779            GroupPlanError::DistinctAdjacencyEligibilityRequired
780        }
781        GroupDistinctPolicyReason::GlobalDistinctHavingUnsupported
782        | GroupDistinctPolicyReason::GlobalDistinctRequiresSingleAggregate
783        | GroupDistinctPolicyReason::GlobalDistinctRequiresFieldTargetAggregate
784        | GroupDistinctPolicyReason::GlobalDistinctRequiresDistinctAggregateTerminal => {
785            GroupPlanError::GlobalDistinctAggregateShapeUnsupported
786        }
787        GroupDistinctPolicyReason::GlobalDistinctUnsupportedAggregateKind => {
788            let kind = aggregate.map_or_else(
789                || "Unknown".to_string(),
790                |aggregate| format!("{:?}", aggregate.kind()),
791            );
792            GroupPlanError::DistinctAggregateKindUnsupported { index: 0, kind }
793        }
794    }
795}
796
797// Shared logical plan validation core owned by planner semantics.
798fn validate_plan_core<K, FOrder, FAccess>(
799    schema: &SchemaInfo,
800    model: &EntityModel,
801    logical: &ScalarPlan,
802    plan: &AccessPlannedQuery<K>,
803    validate_order_fn: FOrder,
804    validate_access_fn: FAccess,
805) -> Result<(), PlanError>
806where
807    FOrder: Fn(&SchemaInfo, &OrderSpec) -> Result<(), PlanError>,
808    FAccess: Fn(&SchemaInfo, &EntityModel, &AccessPlannedQuery<K>) -> Result<(), PlanError>,
809{
810    if let Some(predicate) = &logical.predicate {
811        validate(schema, predicate)?;
812    }
813
814    if let Some(order) = &logical.order {
815        validate_order_fn(schema, order)?;
816        validate_no_duplicate_non_pk_order_fields(model, order)?;
817        validate_primary_key_tie_break(model, order)?;
818    }
819
820    validate_access_fn(schema, model, plan)?;
821    validate_plan_shape(&plan.logical)?;
822
823    Ok(())
824}
825// ORDER validation ownership contract:
826// - This module owns ORDER semantic validation (field existence/orderability/tie-break).
827// - ORDER canonicalization (primary-key tie-break insertion) is performed at the
828//   intent boundary via `canonicalize_order_spec` before plan validation.
829// - Shape-policy checks (for example empty ORDER, pagination/order coupling) are owned here.
830// - Executor/runtime layers may defend execution preconditions only.
831
832/// Return true when an ORDER BY exists and contains at least one field.
833#[must_use]
834pub(crate) fn has_explicit_order(order: Option<&OrderSpec>) -> bool {
835    order.is_some_and(|order| !order.fields.is_empty())
836}
837
838/// Return true when an ORDER BY exists but is empty.
839#[must_use]
840pub(crate) fn has_empty_order(order: Option<&OrderSpec>) -> bool {
841    order.is_some_and(|order| order.fields.is_empty())
842}
843
844/// Validate order-shape rules shared across intent and logical plan boundaries.
845pub(crate) fn validate_order_shape(order: Option<&OrderSpec>) -> Result<(), PolicyPlanError> {
846    if has_empty_order(order) {
847        return Err(PolicyPlanError::EmptyOrderSpec);
848    }
849
850    Ok(())
851}
852
853/// Validate intent-level plan-shape rules derived from query mode + order.
854pub(crate) fn validate_intent_plan_shape(
855    mode: QueryMode,
856    order: Option<&OrderSpec>,
857) -> Result<(), PolicyPlanError> {
858    validate_order_shape(order)?;
859
860    let has_order = has_explicit_order(order);
861    if matches!(mode, QueryMode::Delete(spec) if spec.limit.is_some()) && !has_order {
862        return Err(PolicyPlanError::DeleteLimitRequiresOrder);
863    }
864
865    Ok(())
866}
867
868/// Validate cursor-pagination readiness for a load-spec + ordering pair.
869pub(crate) const fn validate_cursor_paging_requirements(
870    has_order: bool,
871    spec: LoadSpec,
872) -> Result<(), CursorPagingPolicyError> {
873    if !has_order {
874        return Err(CursorPagingPolicyError::CursorRequiresOrder);
875    }
876    if spec.limit.is_none() {
877        return Err(CursorPagingPolicyError::CursorRequiresLimit);
878    }
879
880    Ok(())
881}
882
883/// Validate cursor-order shape and return the logical order contract when present.
884pub(crate) const fn validate_cursor_order_plan_shape(
885    order: Option<&OrderSpec>,
886    require_explicit_order: bool,
887) -> Result<Option<&OrderSpec>, CursorOrderPlanShapeError> {
888    let Some(order) = order else {
889        if require_explicit_order {
890            return Err(CursorOrderPlanShapeError::MissingExplicitOrder);
891        }
892
893        return Ok(None);
894    };
895
896    if order.fields.is_empty() {
897        return Err(CursorOrderPlanShapeError::EmptyOrderSpec);
898    }
899
900    Ok(Some(order))
901}
902
903/// Resolve one grouped field into a stable field slot.
904pub(crate) fn resolve_group_field_slot(
905    model: &EntityModel,
906    field: &str,
907) -> Result<FieldSlot, PlanError> {
908    FieldSlot::resolve(model, field).ok_or_else(|| {
909        PlanError::from(GroupPlanError::UnknownGroupField {
910            field: field.to_string(),
911        })
912    })
913}
914
915/// Validate intent key-access policy before planning.
916pub(crate) const fn validate_intent_key_access_policy(
917    key_access_conflict: bool,
918    key_access_kind: Option<IntentKeyAccessKind>,
919    has_predicate: bool,
920) -> Result<(), IntentKeyAccessPolicyViolation> {
921    if key_access_conflict {
922        return Err(IntentKeyAccessPolicyViolation::KeyAccessConflict);
923    }
924
925    match key_access_kind {
926        Some(IntentKeyAccessKind::Many) if has_predicate => {
927            Err(IntentKeyAccessPolicyViolation::ByIdsWithPredicate)
928        }
929        Some(IntentKeyAccessKind::Only) if has_predicate => {
930            Err(IntentKeyAccessPolicyViolation::OnlyWithPredicate)
931        }
932        Some(
933            IntentKeyAccessKind::Single | IntentKeyAccessKind::Many | IntentKeyAccessKind::Only,
934        )
935        | None => Ok(()),
936    }
937}
938
939/// Validate fluent non-paged load entry policy.
940pub(crate) const fn validate_fluent_non_paged_mode(
941    has_cursor_token: bool,
942    has_grouping: bool,
943) -> Result<(), FluentLoadPolicyViolation> {
944    if has_cursor_token {
945        return Err(FluentLoadPolicyViolation::CursorRequiresPagedExecution);
946    }
947    if has_grouping {
948        return Err(FluentLoadPolicyViolation::GroupedRequiresExecuteGrouped);
949    }
950
951    Ok(())
952}
953
954/// Validate fluent paged load entry policy.
955pub(crate) fn validate_fluent_paged_mode(
956    has_grouping: bool,
957    has_explicit_order: bool,
958    spec: Option<LoadSpec>,
959) -> Result<(), FluentLoadPolicyViolation> {
960    if has_grouping {
961        return Err(FluentLoadPolicyViolation::GroupedRequiresExecuteGrouped);
962    }
963
964    let Some(spec) = spec else {
965        return Ok(());
966    };
967
968    validate_cursor_paging_requirements(has_explicit_order, spec).map_err(|err| match err {
969        CursorPagingPolicyError::CursorRequiresOrder => {
970            FluentLoadPolicyViolation::CursorRequiresOrder
971        }
972        CursorPagingPolicyError::CursorRequiresLimit => {
973            FluentLoadPolicyViolation::CursorRequiresLimit
974        }
975    })
976}
977
978/// Validate mode/order/pagination invariants for one logical plan.
979pub(crate) fn validate_plan_shape(plan: &LogicalPlan) -> Result<(), PolicyPlanError> {
980    let grouped = matches!(plan, LogicalPlan::Grouped(_));
981    let plan = match plan {
982        LogicalPlan::Scalar(plan) => plan,
983        LogicalPlan::Grouped(plan) => &plan.scalar,
984    };
985    validate_order_shape(plan.order.as_ref())?;
986
987    let has_order = has_explicit_order(plan.order.as_ref());
988    if plan.delete_limit.is_some() && !has_order {
989        return Err(PolicyPlanError::DeleteLimitRequiresOrder);
990    }
991
992    match plan.mode {
993        QueryMode::Delete(_) => {
994            if plan.page.is_some() {
995                return Err(PolicyPlanError::DeletePlanWithPagination);
996            }
997        }
998        QueryMode::Load(_) => {
999            if plan.delete_limit.is_some() {
1000                return Err(PolicyPlanError::LoadPlanWithDeleteLimit);
1001            }
1002            // GROUP BY v1 uses canonical grouped key ordering when ORDER BY is
1003            // omitted, so grouped pagination remains deterministic without an
1004            // explicit sort clause.
1005            if plan.page.is_some() && !has_order && !grouped {
1006                return Err(PolicyPlanError::UnorderedPagination);
1007            }
1008        }
1009    }
1010
1011    Ok(())
1012}
1013
1014/// Validate ORDER BY fields against the schema.
1015pub(crate) fn validate_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
1016    for (field, _) in &order.fields {
1017        let field_type = schema
1018            .field(field)
1019            .ok_or_else(|| OrderPlanError::UnknownField {
1020                field: field.clone(),
1021            })
1022            .map_err(PlanError::from)?;
1023
1024        if !field_type.is_orderable() {
1025            // CONTRACT: ORDER BY rejects non-queryable or unordered fields.
1026            return Err(PlanError::from(OrderPlanError::UnorderableField {
1027                field: field.clone(),
1028            }));
1029        }
1030    }
1031
1032    Ok(())
1033}
1034
1035/// Reject duplicate non-primary-key fields in ORDER BY.
1036pub(crate) fn validate_no_duplicate_non_pk_order_fields(
1037    model: &EntityModel,
1038    order: &OrderSpec,
1039) -> Result<(), PlanError> {
1040    let mut seen = BTreeSet::new();
1041    let pk_field = model.primary_key.name;
1042
1043    for (field, _) in &order.fields {
1044        if field == pk_field {
1045            continue;
1046        }
1047        if !seen.insert(field.as_str()) {
1048            return Err(PlanError::from(OrderPlanError::DuplicateOrderField {
1049                field: field.clone(),
1050            }));
1051        }
1052    }
1053
1054    Ok(())
1055}
1056
1057// Ordered plans must include exactly one terminal primary-key field so ordering is total and
1058// deterministic across explain, fingerprint, and executor comparison paths.
1059pub(crate) fn validate_primary_key_tie_break(
1060    model: &EntityModel,
1061    order: &OrderSpec,
1062) -> Result<(), PlanError> {
1063    if order.fields.is_empty() {
1064        return Ok(());
1065    }
1066
1067    let pk_field = model.primary_key.name;
1068    let pk_count = order
1069        .fields
1070        .iter()
1071        .filter(|(field, _)| field == pk_field)
1072        .count();
1073    let trailing_pk = order
1074        .fields
1075        .last()
1076        .is_some_and(|(field, _)| field == pk_field);
1077
1078    if pk_count == 1 && trailing_pk {
1079        Ok(())
1080    } else {
1081        Err(PlanError::from(OrderPlanError::MissingPrimaryKeyTieBreak {
1082            field: pk_field.to_string(),
1083        }))
1084    }
1085}