Skip to main content

icydb_core/db/query/plan/
validate.rs

//! Query-plan validation for planner-owned logical semantics.
//!
//! Validation ownership contract:
//! - `validate_query_semantics` owns user-facing query semantics and emits `PlanError`.
//! - Executor-boundary defensive checks live in `db::executor::plan_validate`.
//!
//! Future rule changes must declare a semantic owner. Defensive re-check layers may mirror
//! rules, but must not reinterpret semantics or error-class intent.
9
10use crate::{
11    db::{
12        access::{
13            AccessPlanError,
14            validate_access_structure_model as validate_access_structure_model_shared,
15        },
16        cursor::CursorPlanError,
17        predicate::{CompareOp, SchemaInfo, ValidateError, validate},
18        query::plan::{
19            AccessPlannedQuery, FieldSlot, GroupAggregateSpec, GroupDistinctAdmissibility,
20            GroupDistinctPolicyReason, GroupHavingSpec, GroupHavingSymbol, GroupSpec, LoadSpec,
21            LogicalPlan, OrderSpec, QueryMode, ScalarPlan,
22            global_distinct_field_aggregate_admissibility, grouped_distinct_admissibility,
23            is_global_distinct_field_aggregate_candidate,
24        },
25    },
26    model::entity::EntityModel,
27    value::Value,
28};
29use std::collections::BTreeSet;
30use thiserror::Error as ThisError;
31
32///
33/// PlanError
34///
35/// Executor-visible validation failures for logical plans.
36///
37/// These errors indicate that a plan cannot be safely executed against the
38/// current schema or entity definition. They are *not* planner bugs.
39///
40
41#[derive(Debug, ThisError)]
42pub enum PlanError {
43    #[error("predicate validation failed: {0}")]
44    PredicateInvalid(Box<ValidateError>),
45
46    #[error("{0}")]
47    Order(Box<OrderPlanError>),
48
49    #[error("{0}")]
50    Access(Box<AccessPlanError>),
51
52    #[error("{0}")]
53    Policy(Box<PolicyPlanError>),
54
55    #[error("{0}")]
56    Cursor(Box<CursorPlanError>),
57
58    #[error("{0}")]
59    Group(Box<GroupPlanError>),
60}
61
62///
63/// OrderPlanError
64///
65/// ORDER BY-specific validation failures.
66///
67
68#[derive(Debug, ThisError)]
69pub enum OrderPlanError {
70    /// ORDER BY references an unknown field.
71    #[error("unknown order field '{field}'")]
72    UnknownField { field: String },
73
74    /// ORDER BY references a field that cannot be ordered.
75    #[error("order field '{field}' is not orderable")]
76    UnorderableField { field: String },
77
78    /// ORDER BY references the same non-primary-key field multiple times.
79    #[error("order field '{field}' appears multiple times")]
80    DuplicateOrderField { field: String },
81
82    /// Ordered plans must terminate with the primary-key tie-break.
83    #[error("order specification must end with primary key '{field}' as deterministic tie-break")]
84    MissingPrimaryKeyTieBreak { field: String },
85}
86
87///
88/// PolicyPlanError
89///
90/// Plan-shape policy failures.
91///
92
93#[derive(Clone, Copy, Debug, Eq, PartialEq, ThisError)]
94pub enum PolicyPlanError {
95    /// ORDER BY must specify at least one field.
96    #[error("order specification must include at least one field")]
97    EmptyOrderSpec,
98
99    /// Delete plans must not carry pagination.
100    #[error("delete plans must not include pagination")]
101    DeletePlanWithPagination,
102
103    /// Load plans must not carry delete limits.
104    #[error("load plans must not include delete limits")]
105    LoadPlanWithDeleteLimit,
106
107    /// Delete limits require an explicit ordering.
108    #[error("delete limit requires an explicit ordering")]
109    DeleteLimitRequiresOrder,
110
111    /// Pagination requires an explicit ordering.
112    #[error(
113        "Unordered pagination is not allowed.\nThis query uses LIMIT or OFFSET without an ORDER BY clause.\nPagination without a total ordering is non-deterministic.\nAdd an explicit order_by(...) to make the query stable."
114    )]
115    UnorderedPagination,
116}
117
118///
119/// CursorPagingPolicyError
120///
121/// Cursor pagination readiness errors shared by intent/fluent entry surfaces.
122///
123
124#[derive(Clone, Copy, Debug, Eq, PartialEq, ThisError)]
125pub enum CursorPagingPolicyError {
126    #[error(
127        "{message}",
128        message = CursorPlanError::cursor_requires_order_message()
129    )]
130    CursorRequiresOrder,
131
132    #[error(
133        "{message}",
134        message = CursorPlanError::cursor_requires_limit_message()
135    )]
136    CursorRequiresLimit,
137}
138
139///
140/// GroupPlanError
141///
142/// GROUP BY wrapper validation failures owned by query planning.
143///
144
145#[derive(Clone, Debug, Eq, PartialEq, ThisError)]
146pub enum GroupPlanError {
147    /// HAVING requires GROUP BY grouped plan shape.
148    #[error("HAVING is only supported for GROUP BY queries in this release")]
149    HavingRequiresGroupBy,
150
151    /// Grouped validation entrypoint received a scalar logical plan.
152    #[error("group query validation requires grouped logical plan variant")]
153    GroupedLogicalPlanRequired,
154
155    /// GROUP BY requires at least one declared grouping field.
156    #[error("group specification must include at least one group field")]
157    EmptyGroupFields,
158
159    /// Global DISTINCT aggregate shapes without GROUP BY are restricted.
160    #[error(
161        "global DISTINCT aggregate without GROUP BY must declare exactly one DISTINCT field-target aggregate in this release"
162    )]
163    GlobalDistinctAggregateShapeUnsupported,
164
165    /// GROUP BY requires at least one aggregate terminal.
166    #[error("group specification must include at least one aggregate terminal")]
167    EmptyAggregates,
168
169    /// GROUP BY references an unknown group field.
170    #[error("unknown group field '{field}'")]
171    UnknownGroupField { field: String },
172
173    /// GROUP BY must not repeat the same resolved group slot.
174    #[error("group specification has duplicate group key: '{field}'")]
175    DuplicateGroupField { field: String },
176
177    /// GROUP BY v1 does not accept DISTINCT unless adjacency eligibility is explicit.
178    #[error(
179        "grouped DISTINCT requires adjacency-based ordered-group eligibility proof in this release"
180    )]
181    DistinctAdjacencyEligibilityRequired,
182
183    /// GROUP BY ORDER BY shape must start with grouped-key prefix.
184    #[error("grouped ORDER BY must start with GROUP BY key prefix in this release")]
185    OrderPrefixNotAlignedWithGroupKeys,
186
187    /// GROUP BY ORDER BY requires an explicit LIMIT in grouped v1.
188    #[error("grouped ORDER BY requires LIMIT in this release")]
189    OrderRequiresLimit,
190
191    /// HAVING with DISTINCT is deferred until grouped DISTINCT support expands.
192    #[error("grouped HAVING with DISTINCT is not supported in this release")]
193    DistinctHavingUnsupported,
194
195    /// HAVING currently supports compare operators only.
196    #[error("grouped HAVING clause at index={index} uses unsupported operator: {op}")]
197    HavingUnsupportedCompareOp { index: usize, op: String },
198
199    /// HAVING group-field symbols must reference declared grouped keys.
200    #[error("grouped HAVING clause at index={index} references non-group field '{field}'")]
201    HavingNonGroupFieldReference { index: usize, field: String },
202
203    /// HAVING aggregate references must resolve to declared grouped terminals.
204    #[error(
205        "grouped HAVING clause at index={index} references aggregate index {aggregate_index} but aggregate_count={aggregate_count}"
206    )]
207    HavingAggregateIndexOutOfBounds {
208        index: usize,
209        aggregate_index: usize,
210        aggregate_count: usize,
211    },
212
213    /// DISTINCT grouped terminal kinds are intentionally conservative in v1.
214    #[error(
215        "grouped DISTINCT aggregate at index={index} uses unsupported kind '{kind}' in this release"
216    )]
217    DistinctAggregateKindUnsupported { index: usize, kind: String },
218
219    /// DISTINCT over grouped field-target terminals is deferred with field-target support.
220    #[error(
221        "grouped DISTINCT aggregate at index={index} cannot target field '{field}' in this release: found {kind}"
222    )]
223    DistinctAggregateFieldTargetUnsupported {
224        index: usize,
225        kind: String,
226        field: String,
227    },
228
229    /// Aggregate target fields must resolve in the model schema.
230    #[error("unknown grouped aggregate target field at index={index}: '{field}'")]
231    UnknownAggregateTargetField { index: usize, field: String },
232
233    /// Global DISTINCT SUM requires a numeric field target.
234    #[error(
235        "global DISTINCT SUM aggregate target field at index={index} is not numeric: '{field}'"
236    )]
237    GlobalDistinctSumTargetNotNumeric { index: usize, field: String },
238
239    /// Field-target grouped terminals are not enabled in grouped execution v1.
240    #[error(
241        "grouped aggregate at index={index} cannot target field '{field}' in this release: found {kind}"
242    )]
243    FieldTargetAggregatesUnsupported {
244        index: usize,
245        kind: String,
246        field: String,
247    },
248}
249
///
/// CursorOrderPlanShapeError
///
/// Plan-shape failures for logical cursor ordering, consumed by the
/// cursor/runtime boundary adapters.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum CursorOrderPlanShapeError {
    /// An explicit order was required but none was supplied.
    MissingExplicitOrder,
    /// An order was supplied but it lists no fields.
    EmptyOrderSpec,
}
261
///
/// IntentKeyAccessKind
///
/// Shape of key access declared on a query intent, as consumed by intent
/// policy validation.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum IntentKeyAccessKind {
    Single,
    Many,
    Only,
}
274
///
/// IntentKeyAccessPolicyViolation
///
/// Key-access policy violations detected at the query-intent boundary.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum IntentKeyAccessPolicyViolation {
    /// The intent was flagged as carrying conflicting key access.
    KeyAccessConflict,
    /// `Many` key access was combined with a predicate.
    ByIdsWithPredicate,
    /// `Only` key access was combined with a predicate.
    OnlyWithPredicate,
}
286
///
/// FluentLoadPolicyViolation
///
/// Policy violations raised at the fluent load entry points.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum FluentLoadPolicyViolation {
    /// A cursor token was supplied to a non-paged entry point.
    CursorRequiresPagedExecution,
    /// A grouped query was sent through a non-grouped entry point.
    GroupedRequiresExecuteGrouped,
    /// Cursor paging was requested without an ordering.
    CursorRequiresOrder,
    /// Cursor paging was requested without a limit.
    CursorRequiresLimit,
}
300
301impl From<ValidateError> for PlanError {
302    fn from(err: ValidateError) -> Self {
303        Self::PredicateInvalid(Box::new(err))
304    }
305}
306
307impl From<OrderPlanError> for PlanError {
308    fn from(err: OrderPlanError) -> Self {
309        Self::Order(Box::new(err))
310    }
311}
312
313impl From<AccessPlanError> for PlanError {
314    fn from(err: AccessPlanError) -> Self {
315        Self::Access(Box::new(err))
316    }
317}
318
319impl From<PolicyPlanError> for PlanError {
320    fn from(err: PolicyPlanError) -> Self {
321        Self::Policy(Box::new(err))
322    }
323}
324
325impl From<CursorPlanError> for PlanError {
326    fn from(err: CursorPlanError) -> Self {
327        Self::Cursor(Box::new(err))
328    }
329}
330
331impl From<GroupPlanError> for PlanError {
332    fn from(err: GroupPlanError) -> Self {
333        Self::Group(Box::new(err))
334    }
335}
336
337/// Validate a logical plan with model-level key values.
338///
339/// Ownership:
340/// - semantic owner for user-facing query validity at planning boundaries
341/// - failures here are user-visible planning failures (`PlanError`)
342///
343/// New user-facing validation rules must be introduced here first, then mirrored
344/// defensively in downstream layers without changing semantics.
345pub(crate) fn validate_query_semantics(
346    schema: &SchemaInfo,
347    model: &EntityModel,
348    plan: &AccessPlannedQuery<Value>,
349) -> Result<(), PlanError> {
350    let logical = plan.scalar_plan();
351
352    validate_plan_core(
353        schema,
354        model,
355        logical,
356        plan,
357        validate_order,
358        |schema, model, plan| {
359            validate_access_structure_model_shared(schema, model, &plan.access)
360                .map_err(PlanError::from)
361        },
362    )?;
363
364    Ok(())
365}
366
367/// Validate grouped query semantics for one grouped plan wrapper.
368///
369/// Ownership:
370/// - semantic owner for GROUP BY wrapper validation
371/// - failures here are user-visible planning failures (`PlanError`)
372pub(crate) fn validate_group_query_semantics(
373    schema: &SchemaInfo,
374    model: &EntityModel,
375    plan: &AccessPlannedQuery<Value>,
376) -> Result<(), PlanError> {
377    let (logical, group, having) = match &plan.logical {
378        LogicalPlan::Grouped(grouped) => (&grouped.scalar, &grouped.group, grouped.having.as_ref()),
379        LogicalPlan::Scalar(_) => {
380            return Err(PlanError::from(GroupPlanError::GroupedLogicalPlanRequired));
381        }
382    };
383
384    validate_plan_core(
385        schema,
386        model,
387        logical,
388        plan,
389        validate_order,
390        |schema, model, plan| {
391            validate_access_structure_model_shared(schema, model, &plan.access)
392                .map_err(PlanError::from)
393        },
394    )?;
395    validate_group_structure(schema, model, group, having)?;
396    validate_group_policy(schema, logical, group, having)?;
397    validate_group_cursor_constraints(logical, group)?;
398
399    Ok(())
400}
401
402// Validate grouped structural invariants before policy/cursor gates.
403fn validate_group_structure(
404    schema: &SchemaInfo,
405    model: &EntityModel,
406    group: &GroupSpec,
407    having: Option<&GroupHavingSpec>,
408) -> Result<(), PlanError> {
409    if group.group_fields.is_empty() && having.is_some() {
410        return Err(PlanError::from(
411            GroupPlanError::GlobalDistinctAggregateShapeUnsupported,
412        ));
413    }
414
415    validate_group_spec_structure(schema, model, group)?;
416    validate_grouped_having_structure(group, having)?;
417
418    Ok(())
419}
420
421// Validate grouped policy gates independent from structural shape checks.
422fn validate_group_policy(
423    schema: &SchemaInfo,
424    logical: &ScalarPlan,
425    group: &GroupSpec,
426    having: Option<&GroupHavingSpec>,
427) -> Result<(), PlanError> {
428    validate_grouped_distinct_policy(logical, having.is_some())?;
429    validate_grouped_having_policy(having)?;
430    validate_group_spec_policy(schema, group, having)?;
431
432    Ok(())
433}
434
435// Validate grouped cursor-order constraints in one dedicated gate.
436fn validate_group_cursor_constraints(
437    logical: &ScalarPlan,
438    group: &GroupSpec,
439) -> Result<(), PlanError> {
440    // Grouped pagination/order constraints are cursor-domain policy:
441    // grouped ORDER BY requires LIMIT and must align with grouped-key prefix.
442    let Some(order) = logical.order.as_ref() else {
443        return Ok(());
444    };
445    if logical.page.as_ref().and_then(|page| page.limit).is_none() {
446        return Err(PlanError::from(GroupPlanError::OrderRequiresLimit));
447    }
448    if order_prefix_aligned_with_group_fields(order, group.group_fields.as_slice()) {
449        return Ok(());
450    }
451
452    Err(PlanError::from(
453        GroupPlanError::OrderPrefixNotAlignedWithGroupKeys,
454    ))
455}
456
457// Validate grouped DISTINCT policy gates for grouped v1 hardening.
458fn validate_grouped_distinct_policy(
459    logical: &ScalarPlan,
460    has_having: bool,
461) -> Result<(), PlanError> {
462    match grouped_distinct_admissibility(logical.distinct, has_having) {
463        GroupDistinctAdmissibility::Allowed => Ok(()),
464        GroupDistinctAdmissibility::Disallowed(reason) => Err(PlanError::from(
465            group_plan_error_from_distinct_policy_reason(reason, None),
466        )),
467    }
468}
469
470// Validate grouped HAVING structural symbol/reference compatibility.
471fn validate_grouped_having_structure(
472    group: &GroupSpec,
473    having: Option<&GroupHavingSpec>,
474) -> Result<(), PlanError> {
475    let Some(having) = having else {
476        return Ok(());
477    };
478
479    for (index, clause) in having.clauses().iter().enumerate() {
480        match clause.symbol() {
481            GroupHavingSymbol::GroupField(field_slot) => {
482                if !group
483                    .group_fields
484                    .iter()
485                    .any(|group_field| group_field.index() == field_slot.index())
486                {
487                    return Err(PlanError::from(
488                        GroupPlanError::HavingNonGroupFieldReference {
489                            index,
490                            field: field_slot.field().to_string(),
491                        },
492                    ));
493                }
494            }
495            GroupHavingSymbol::AggregateIndex(aggregate_index) => {
496                if *aggregate_index >= group.aggregates.len() {
497                    return Err(PlanError::from(
498                        GroupPlanError::HavingAggregateIndexOutOfBounds {
499                            index,
500                            aggregate_index: *aggregate_index,
501                            aggregate_count: group.aggregates.len(),
502                        },
503                    ));
504                }
505            }
506        }
507    }
508
509    Ok(())
510}
511
512// Validate grouped HAVING policy gates and operator support.
513fn validate_grouped_having_policy(having: Option<&GroupHavingSpec>) -> Result<(), PlanError> {
514    let Some(having) = having else {
515        return Ok(());
516    };
517
518    for (index, clause) in having.clauses().iter().enumerate() {
519        if !having_compare_op_supported(clause.op()) {
520            return Err(PlanError::from(
521                GroupPlanError::HavingUnsupportedCompareOp {
522                    index,
523                    op: format!("{:?}", clause.op()),
524                },
525            ));
526        }
527    }
528
529    Ok(())
530}
531
532const fn having_compare_op_supported(op: CompareOp) -> bool {
533    matches!(
534        op,
535        CompareOp::Eq
536            | CompareOp::Ne
537            | CompareOp::Lt
538            | CompareOp::Lte
539            | CompareOp::Gt
540            | CompareOp::Gte
541    )
542}
543
544// Return true when ORDER BY starts with GROUP BY key fields in declaration order.
545fn order_prefix_aligned_with_group_fields(order: &OrderSpec, group_fields: &[FieldSlot]) -> bool {
546    if order.fields.len() < group_fields.len() {
547        return false;
548    }
549
550    group_fields
551        .iter()
552        .zip(order.fields.iter())
553        .all(|(group_field, (order_field, _))| order_field == group_field.field())
554}
555
556// Validate grouped structural declarations against model/schema shape.
557fn validate_group_spec_structure(
558    schema: &SchemaInfo,
559    model: &EntityModel,
560    group: &GroupSpec,
561) -> Result<(), PlanError> {
562    if group.group_fields.is_empty() {
563        if group.aggregates.iter().any(GroupAggregateSpec::distinct) {
564            return Ok(());
565        }
566
567        return Err(PlanError::from(GroupPlanError::EmptyGroupFields));
568    }
569    if group.aggregates.is_empty() {
570        return Err(PlanError::from(GroupPlanError::EmptyAggregates));
571    }
572
573    let mut seen_group_slots = BTreeSet::<usize>::new();
574    for field_slot in &group.group_fields {
575        if model.fields.get(field_slot.index()).is_none() {
576            return Err(PlanError::from(GroupPlanError::UnknownGroupField {
577                field: field_slot.field().to_string(),
578            }));
579        }
580        if !seen_group_slots.insert(field_slot.index()) {
581            return Err(PlanError::from(GroupPlanError::DuplicateGroupField {
582                field: field_slot.field().to_string(),
583            }));
584        }
585    }
586
587    for (index, aggregate) in group.aggregates.iter().enumerate() {
588        let Some(target_field) = aggregate.target_field.as_ref() else {
589            continue;
590        };
591        if schema.field(target_field).is_none() {
592            return Err(PlanError::from(
593                GroupPlanError::UnknownAggregateTargetField {
594                    index,
595                    field: target_field.clone(),
596                },
597            ));
598        }
599    }
600
601    Ok(())
602}
603
604// Validate grouped execution policy over a structurally valid grouped spec.
605fn validate_group_spec_policy(
606    schema: &SchemaInfo,
607    group: &GroupSpec,
608    having: Option<&GroupHavingSpec>,
609) -> Result<(), PlanError> {
610    if group.group_fields.is_empty() {
611        validate_global_distinct_aggregate_without_group_keys(schema, group, having)?;
612        return Ok(());
613    }
614
615    for (index, aggregate) in group.aggregates.iter().enumerate() {
616        if aggregate.distinct() && !aggregate.kind().supports_grouped_distinct_v1() {
617            return Err(PlanError::from(
618                GroupPlanError::DistinctAggregateKindUnsupported {
619                    index,
620                    kind: format!("{:?}", aggregate.kind()),
621                },
622            ));
623        }
624
625        let Some(target_field) = aggregate.target_field.as_ref() else {
626            continue;
627        };
628        if aggregate.distinct() {
629            return Err(PlanError::from(
630                GroupPlanError::DistinctAggregateFieldTargetUnsupported {
631                    index,
632                    kind: format!("{:?}", aggregate.kind()),
633                    field: target_field.clone(),
634                },
635            ));
636        }
637        return Err(PlanError::from(
638            GroupPlanError::FieldTargetAggregatesUnsupported {
639                index,
640                kind: format!("{:?}", aggregate.kind()),
641                field: target_field.clone(),
642            },
643        ));
644    }
645
646    Ok(())
647}
648
649// Validate the restricted global DISTINCT aggregate shape (`GROUP BY` omitted).
650fn validate_global_distinct_aggregate_without_group_keys(
651    schema: &SchemaInfo,
652    group: &GroupSpec,
653    having: Option<&GroupHavingSpec>,
654) -> Result<(), PlanError> {
655    if !is_global_distinct_field_aggregate_candidate(
656        group.group_fields.as_slice(),
657        group.aggregates.as_slice(),
658    ) {
659        return Err(PlanError::from(
660            GroupPlanError::GlobalDistinctAggregateShapeUnsupported,
661        ));
662    }
663    let aggregate = &group.aggregates[0];
664    match global_distinct_field_aggregate_admissibility(group.aggregates.as_slice(), having) {
665        GroupDistinctAdmissibility::Allowed => {}
666        GroupDistinctAdmissibility::Disallowed(reason) => {
667            return Err(PlanError::from(
668                group_plan_error_from_distinct_policy_reason(reason, Some(aggregate)),
669            ));
670        }
671    }
672
673    let Some(target_field) = aggregate.target_field() else {
674        return Err(PlanError::from(
675            GroupPlanError::GlobalDistinctAggregateShapeUnsupported,
676        ));
677    };
678    let Some(field_type) = schema.field(target_field) else {
679        return Err(PlanError::from(
680            GroupPlanError::UnknownAggregateTargetField {
681                index: 0,
682                field: target_field.to_string(),
683            },
684        ));
685    };
686    if aggregate.kind().is_sum() && !field_type.supports_numeric_coercion() {
687        return Err(PlanError::from(
688            GroupPlanError::GlobalDistinctSumTargetNotNumeric {
689                index: 0,
690                field: target_field.to_string(),
691            },
692        ));
693    }
694
695    Ok(())
696}
697
698// Map one grouped DISTINCT policy reason to planner-visible grouped plan errors.
699fn group_plan_error_from_distinct_policy_reason(
700    reason: GroupDistinctPolicyReason,
701    aggregate: Option<&GroupAggregateSpec>,
702) -> GroupPlanError {
703    match reason {
704        GroupDistinctPolicyReason::DistinctHavingUnsupported => {
705            GroupPlanError::DistinctHavingUnsupported
706        }
707        GroupDistinctPolicyReason::DistinctAdjacencyEligibilityRequired => {
708            GroupPlanError::DistinctAdjacencyEligibilityRequired
709        }
710        GroupDistinctPolicyReason::GlobalDistinctHavingUnsupported
711        | GroupDistinctPolicyReason::GlobalDistinctRequiresSingleAggregate
712        | GroupDistinctPolicyReason::GlobalDistinctRequiresFieldTargetAggregate
713        | GroupDistinctPolicyReason::GlobalDistinctRequiresDistinctAggregateTerminal => {
714            GroupPlanError::GlobalDistinctAggregateShapeUnsupported
715        }
716        GroupDistinctPolicyReason::GlobalDistinctUnsupportedAggregateKind => {
717            let kind = aggregate.map_or_else(
718                || "Unknown".to_string(),
719                |aggregate| format!("{:?}", aggregate.kind()),
720            );
721            GroupPlanError::DistinctAggregateKindUnsupported { index: 0, kind }
722        }
723    }
724}
725
726// Shared logical plan validation core owned by planner semantics.
727fn validate_plan_core<K, FOrder, FAccess>(
728    schema: &SchemaInfo,
729    model: &EntityModel,
730    logical: &ScalarPlan,
731    plan: &AccessPlannedQuery<K>,
732    validate_order_fn: FOrder,
733    validate_access_fn: FAccess,
734) -> Result<(), PlanError>
735where
736    FOrder: Fn(&SchemaInfo, &OrderSpec) -> Result<(), PlanError>,
737    FAccess: Fn(&SchemaInfo, &EntityModel, &AccessPlannedQuery<K>) -> Result<(), PlanError>,
738{
739    if let Some(predicate) = &logical.predicate {
740        validate(schema, predicate)?;
741    }
742
743    if let Some(order) = &logical.order {
744        validate_order_fn(schema, order)?;
745        validate_no_duplicate_non_pk_order_fields(model, order)?;
746        validate_primary_key_tie_break(model, order)?;
747    }
748
749    validate_access_fn(schema, model, plan)?;
750    validate_plan_shape(&plan.logical)?;
751
752    Ok(())
753}
// ORDER validation ownership contract:
// - This module owns ORDER semantic validation (field existence, orderability, tie-break).
// - ORDER canonicalization (primary-key tie-break insertion) is performed at the
//   intent boundary via `canonicalize_order_spec` before plan validation.
// - Shape-policy checks (for example empty ORDER, pagination/order coupling) are owned here.
// - Executor/runtime layers may defend execution preconditions only.
760
761/// Return true when an ORDER BY exists and contains at least one field.
762#[must_use]
763pub(crate) fn has_explicit_order(order: Option<&OrderSpec>) -> bool {
764    order.is_some_and(|order| !order.fields.is_empty())
765}
766
767/// Return true when an ORDER BY exists but is empty.
768#[must_use]
769pub(crate) fn has_empty_order(order: Option<&OrderSpec>) -> bool {
770    order.is_some_and(|order| order.fields.is_empty())
771}
772
773/// Validate order-shape rules shared across intent and logical plan boundaries.
774pub(crate) fn validate_order_shape(order: Option<&OrderSpec>) -> Result<(), PolicyPlanError> {
775    if has_empty_order(order) {
776        return Err(PolicyPlanError::EmptyOrderSpec);
777    }
778
779    Ok(())
780}
781
782/// Validate intent-level plan-shape rules derived from query mode + order.
783pub(crate) fn validate_intent_plan_shape(
784    mode: QueryMode,
785    order: Option<&OrderSpec>,
786) -> Result<(), PolicyPlanError> {
787    validate_order_shape(order)?;
788
789    let has_order = has_explicit_order(order);
790    if matches!(mode, QueryMode::Delete(spec) if spec.limit.is_some()) && !has_order {
791        return Err(PolicyPlanError::DeleteLimitRequiresOrder);
792    }
793
794    Ok(())
795}
796
797/// Validate cursor-pagination readiness for a load-spec + ordering pair.
798pub(crate) const fn validate_cursor_paging_requirements(
799    has_order: bool,
800    spec: LoadSpec,
801) -> Result<(), CursorPagingPolicyError> {
802    if !has_order {
803        return Err(CursorPagingPolicyError::CursorRequiresOrder);
804    }
805    if spec.limit.is_none() {
806        return Err(CursorPagingPolicyError::CursorRequiresLimit);
807    }
808
809    Ok(())
810}
811
812/// Validate cursor-order shape and return the logical order contract when present.
813pub(crate) const fn validate_cursor_order_plan_shape(
814    order: Option<&OrderSpec>,
815    require_explicit_order: bool,
816) -> Result<Option<&OrderSpec>, CursorOrderPlanShapeError> {
817    let Some(order) = order else {
818        if require_explicit_order {
819            return Err(CursorOrderPlanShapeError::MissingExplicitOrder);
820        }
821
822        return Ok(None);
823    };
824
825    if order.fields.is_empty() {
826        return Err(CursorOrderPlanShapeError::EmptyOrderSpec);
827    }
828
829    Ok(Some(order))
830}
831
832/// Resolve one grouped field into a stable field slot.
833pub(crate) fn resolve_group_field_slot(
834    model: &EntityModel,
835    field: &str,
836) -> Result<FieldSlot, PlanError> {
837    FieldSlot::resolve(model, field).ok_or_else(|| {
838        PlanError::from(GroupPlanError::UnknownGroupField {
839            field: field.to_string(),
840        })
841    })
842}
843
844/// Validate intent key-access policy before planning.
845pub(crate) const fn validate_intent_key_access_policy(
846    key_access_conflict: bool,
847    key_access_kind: Option<IntentKeyAccessKind>,
848    has_predicate: bool,
849) -> Result<(), IntentKeyAccessPolicyViolation> {
850    if key_access_conflict {
851        return Err(IntentKeyAccessPolicyViolation::KeyAccessConflict);
852    }
853
854    match key_access_kind {
855        Some(IntentKeyAccessKind::Many) if has_predicate => {
856            Err(IntentKeyAccessPolicyViolation::ByIdsWithPredicate)
857        }
858        Some(IntentKeyAccessKind::Only) if has_predicate => {
859            Err(IntentKeyAccessPolicyViolation::OnlyWithPredicate)
860        }
861        Some(
862            IntentKeyAccessKind::Single | IntentKeyAccessKind::Many | IntentKeyAccessKind::Only,
863        )
864        | None => Ok(()),
865    }
866}
867
868/// Validate fluent non-paged load entry policy.
869pub(crate) const fn validate_fluent_non_paged_mode(
870    has_cursor_token: bool,
871    has_grouping: bool,
872) -> Result<(), FluentLoadPolicyViolation> {
873    if has_cursor_token {
874        return Err(FluentLoadPolicyViolation::CursorRequiresPagedExecution);
875    }
876    if has_grouping {
877        return Err(FluentLoadPolicyViolation::GroupedRequiresExecuteGrouped);
878    }
879
880    Ok(())
881}
882
883/// Validate fluent paged load entry policy.
884pub(crate) fn validate_fluent_paged_mode(
885    has_grouping: bool,
886    has_explicit_order: bool,
887    spec: Option<LoadSpec>,
888) -> Result<(), FluentLoadPolicyViolation> {
889    if has_grouping {
890        return Err(FluentLoadPolicyViolation::GroupedRequiresExecuteGrouped);
891    }
892
893    let Some(spec) = spec else {
894        return Ok(());
895    };
896
897    validate_cursor_paging_requirements(has_explicit_order, spec).map_err(|err| match err {
898        CursorPagingPolicyError::CursorRequiresOrder => {
899            FluentLoadPolicyViolation::CursorRequiresOrder
900        }
901        CursorPagingPolicyError::CursorRequiresLimit => {
902            FluentLoadPolicyViolation::CursorRequiresLimit
903        }
904    })
905}
906
907/// Validate mode/order/pagination invariants for one logical plan.
908pub(crate) fn validate_plan_shape(plan: &LogicalPlan) -> Result<(), PolicyPlanError> {
909    let grouped = matches!(plan, LogicalPlan::Grouped(_));
910    let plan = match plan {
911        LogicalPlan::Scalar(plan) => plan,
912        LogicalPlan::Grouped(plan) => &plan.scalar,
913    };
914    validate_order_shape(plan.order.as_ref())?;
915
916    let has_order = has_explicit_order(plan.order.as_ref());
917    if plan.delete_limit.is_some() && !has_order {
918        return Err(PolicyPlanError::DeleteLimitRequiresOrder);
919    }
920
921    match plan.mode {
922        QueryMode::Delete(_) => {
923            if plan.page.is_some() {
924                return Err(PolicyPlanError::DeletePlanWithPagination);
925            }
926        }
927        QueryMode::Load(_) => {
928            if plan.delete_limit.is_some() {
929                return Err(PolicyPlanError::LoadPlanWithDeleteLimit);
930            }
931            // GROUP BY v1 uses canonical grouped key ordering when ORDER BY is
932            // omitted, so grouped pagination remains deterministic without an
933            // explicit sort clause.
934            if plan.page.is_some() && !has_order && !grouped {
935                return Err(PolicyPlanError::UnorderedPagination);
936            }
937        }
938    }
939
940    Ok(())
941}
942
943/// Validate ORDER BY fields against the schema.
944pub(crate) fn validate_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
945    for (field, _) in &order.fields {
946        let field_type = schema
947            .field(field)
948            .ok_or_else(|| OrderPlanError::UnknownField {
949                field: field.clone(),
950            })
951            .map_err(PlanError::from)?;
952
953        if !field_type.is_orderable() {
954            // CONTRACT: ORDER BY rejects non-queryable or unordered fields.
955            return Err(PlanError::from(OrderPlanError::UnorderableField {
956                field: field.clone(),
957            }));
958        }
959    }
960
961    Ok(())
962}
963
964/// Reject duplicate non-primary-key fields in ORDER BY.
965pub(crate) fn validate_no_duplicate_non_pk_order_fields(
966    model: &EntityModel,
967    order: &OrderSpec,
968) -> Result<(), PlanError> {
969    let mut seen = BTreeSet::new();
970    let pk_field = model.primary_key.name;
971
972    for (field, _) in &order.fields {
973        if field == pk_field {
974            continue;
975        }
976        if !seen.insert(field.as_str()) {
977            return Err(PlanError::from(OrderPlanError::DuplicateOrderField {
978                field: field.clone(),
979            }));
980        }
981    }
982
983    Ok(())
984}
985
986// Ordered plans must include exactly one terminal primary-key field so ordering is total and
987// deterministic across explain, fingerprint, and executor comparison paths.
988pub(crate) fn validate_primary_key_tie_break(
989    model: &EntityModel,
990    order: &OrderSpec,
991) -> Result<(), PlanError> {
992    if order.fields.is_empty() {
993        return Ok(());
994    }
995
996    let pk_field = model.primary_key.name;
997    let pk_count = order
998        .fields
999        .iter()
1000        .filter(|(field, _)| field == pk_field)
1001        .count();
1002    let trailing_pk = order
1003        .fields
1004        .last()
1005        .is_some_and(|(field, _)| field == pk_field);
1006
1007    if pk_count == 1 && trailing_pk {
1008        Ok(())
1009    } else {
1010        Err(PlanError::from(OrderPlanError::MissingPrimaryKeyTieBreak {
1011            field: pk_field.to_string(),
1012        }))
1013    }
1014}