Skip to main content

icydb_core/db/query/plan/
validate.rs

1//! Query-plan validation for planner-owned logical semantics.
2//!
3//! Validation ownership contract:
4//! - `validate_query_semantics` owns user-facing query semantics and emits `PlanError`.
5//! - executor-boundary defensive checks live in `db::executor::plan_validate`.
6//!
7//! Future rule changes must declare a semantic owner. Defensive re-check layers may mirror
8//! rules, but must not reinterpret semantics or error class intent.
9
10use crate::{
11    db::{
12        access::{
13            AccessPlanError,
14            validate_access_structure_model as validate_access_structure_model_shared,
15        },
16        cursor::CursorPlanError,
17        predicate::{CompareOp, SchemaInfo, ValidateError, validate},
18        query::plan::{
19            AccessPlannedQuery, FieldSlot, GroupHavingSpec, GroupHavingSymbol, GroupSpec, LoadSpec,
20            LogicalPlan, OrderSpec, QueryMode, ScalarPlan,
21        },
22    },
23    model::entity::EntityModel,
24    value::Value,
25};
26use std::collections::BTreeSet;
27use thiserror::Error as ThisError;
28
29///
30/// PlanError
31///
32/// Executor-visible validation failures for logical plans.
33///
34/// These errors indicate that a plan cannot be safely executed against the
35/// current schema or entity definition. They are *not* planner bugs.
36///
37
38#[derive(Debug, ThisError)]
39pub enum PlanError {
40    #[error("predicate validation failed: {0}")]
41    PredicateInvalid(Box<ValidateError>),
42
43    #[error("{0}")]
44    Order(Box<OrderPlanError>),
45
46    #[error("{0}")]
47    Access(Box<AccessPlanError>),
48
49    #[error("{0}")]
50    Policy(Box<PolicyPlanError>),
51
52    #[error("{0}")]
53    Cursor(Box<CursorPlanError>),
54
55    #[error("{0}")]
56    Group(Box<GroupPlanError>),
57}
58
59///
60/// OrderPlanError
61///
62/// ORDER BY-specific validation failures.
63///
64
65#[derive(Debug, ThisError)]
66pub enum OrderPlanError {
67    /// ORDER BY references an unknown field.
68    #[error("unknown order field '{field}'")]
69    UnknownField { field: String },
70
71    /// ORDER BY references a field that cannot be ordered.
72    #[error("order field '{field}' is not orderable")]
73    UnorderableField { field: String },
74
75    /// ORDER BY references the same non-primary-key field multiple times.
76    #[error("order field '{field}' appears multiple times")]
77    DuplicateOrderField { field: String },
78
79    /// Ordered plans must terminate with the primary-key tie-break.
80    #[error("order specification must end with primary key '{field}' as deterministic tie-break")]
81    MissingPrimaryKeyTieBreak { field: String },
82}
83
84///
85/// PolicyPlanError
86///
87/// Plan-shape policy failures.
88///
89
90#[derive(Clone, Copy, Debug, Eq, PartialEq, ThisError)]
91pub enum PolicyPlanError {
92    /// ORDER BY must specify at least one field.
93    #[error("order specification must include at least one field")]
94    EmptyOrderSpec,
95
96    /// Delete plans must not carry pagination.
97    #[error("delete plans must not include pagination")]
98    DeletePlanWithPagination,
99
100    /// Load plans must not carry delete limits.
101    #[error("load plans must not include delete limits")]
102    LoadPlanWithDeleteLimit,
103
104    /// Delete limits require an explicit ordering.
105    #[error("delete limit requires an explicit ordering")]
106    DeleteLimitRequiresOrder,
107
108    /// Pagination requires an explicit ordering.
109    #[error(
110        "Unordered pagination is not allowed.\nThis query uses LIMIT or OFFSET without an ORDER BY clause.\nPagination without a total ordering is non-deterministic.\nAdd an explicit order_by(...) to make the query stable."
111    )]
112    UnorderedPagination,
113}
114
115///
116/// CursorPagingPolicyError
117///
118/// Cursor pagination readiness errors shared by intent/fluent entry surfaces.
119///
120#[derive(Clone, Copy, Debug, Eq, PartialEq, ThisError)]
121pub enum CursorPagingPolicyError {
122    #[error("cursor pagination requires an explicit ordering")]
123    CursorRequiresOrder,
124
125    #[error("cursor pagination requires a limit")]
126    CursorRequiresLimit,
127}
128
129///
130/// GroupPlanError
131///
132/// GROUP BY wrapper validation failures owned by query planning.
133///
134
135#[derive(Clone, Debug, Eq, PartialEq, ThisError)]
136pub enum GroupPlanError {
137    /// HAVING requires GROUP BY grouped plan shape.
138    #[error("HAVING is only supported for GROUP BY queries in this release")]
139    HavingRequiresGroupBy,
140
141    /// Grouped validation entrypoint received a scalar logical plan.
142    #[error("group query validation requires grouped logical plan variant")]
143    GroupedLogicalPlanRequired,
144
145    /// GROUP BY requires at least one declared grouping field.
146    #[error("group specification must include at least one group field")]
147    EmptyGroupFields,
148
149    /// GROUP BY requires at least one aggregate terminal.
150    #[error("group specification must include at least one aggregate terminal")]
151    EmptyAggregates,
152
153    /// GROUP BY references an unknown group field.
154    #[error("unknown group field '{field}'")]
155    UnknownGroupField { field: String },
156
157    /// GROUP BY must not repeat the same resolved group slot.
158    #[error("group specification has duplicate group key: '{field}'")]
159    DuplicateGroupField { field: String },
160
161    /// GROUP BY v1 does not accept DISTINCT unless adjacency eligibility is explicit.
162    #[error(
163        "grouped DISTINCT requires adjacency-based ordered-group eligibility proof in this release"
164    )]
165    DistinctAdjacencyEligibilityRequired,
166
167    /// GROUP BY ORDER BY shape must start with grouped-key prefix.
168    #[error("grouped ORDER BY must start with GROUP BY key prefix in this release")]
169    OrderPrefixNotAlignedWithGroupKeys,
170
171    /// HAVING with DISTINCT is deferred until grouped DISTINCT support expands.
172    #[error("grouped HAVING with DISTINCT is not supported in this release")]
173    DistinctHavingUnsupported,
174
175    /// HAVING currently supports compare operators only.
176    #[error("grouped HAVING clause at index={index} uses unsupported operator: {op}")]
177    HavingUnsupportedCompareOp { index: usize, op: String },
178
179    /// HAVING group-field symbols must reference declared grouped keys.
180    #[error("grouped HAVING clause at index={index} references non-group field '{field}'")]
181    HavingNonGroupFieldReference { index: usize, field: String },
182
183    /// HAVING aggregate references must resolve to declared grouped terminals.
184    #[error(
185        "grouped HAVING clause at index={index} references aggregate index {aggregate_index} but aggregate_count={aggregate_count}"
186    )]
187    HavingAggregateIndexOutOfBounds {
188        index: usize,
189        aggregate_index: usize,
190        aggregate_count: usize,
191    },
192
193    /// DISTINCT grouped terminal kinds are intentionally conservative in v1.
194    #[error(
195        "grouped DISTINCT aggregate at index={index} uses unsupported kind '{kind}' in this release"
196    )]
197    DistinctAggregateKindUnsupported { index: usize, kind: String },
198
199    /// DISTINCT over grouped field-target terminals is deferred with field-target support.
200    #[error(
201        "grouped DISTINCT aggregate at index={index} cannot target field '{field}' in this release: found {kind}"
202    )]
203    DistinctAggregateFieldTargetUnsupported {
204        index: usize,
205        kind: String,
206        field: String,
207    },
208
209    /// Aggregate target fields must resolve in the model schema.
210    #[error("unknown grouped aggregate target field at index={index}: '{field}'")]
211    UnknownAggregateTargetField { index: usize, field: String },
212
213    /// Field-target grouped terminals are not enabled in grouped execution v1.
214    #[error(
215        "grouped aggregate at index={index} cannot target field '{field}' in this release: found {kind}"
216    )]
217    FieldTargetAggregatesUnsupported {
218        index: usize,
219        kind: String,
220        field: String,
221    },
222}
223
224///
225/// CursorOrderPlanShapeError
226///
227/// Logical cursor-order plan-shape failures used by cursor/runtime boundary adapters.
228///
229#[derive(Clone, Copy, Debug, Eq, PartialEq)]
230pub(crate) enum CursorOrderPlanShapeError {
231    MissingExplicitOrder,
232    EmptyOrderSpec,
233}
234
235///
236/// IntentKeyAccessKind
237///
238/// Key-access shape used by intent policy validation.
239///
240#[derive(Clone, Copy, Debug, Eq, PartialEq)]
241pub(crate) enum IntentKeyAccessKind {
242    Single,
243    Many,
244    Only,
245}
246
247///
248/// IntentKeyAccessPolicyViolation
249///
250/// Logical key-access policy violations at query-intent boundaries.
251///
252#[derive(Clone, Copy, Debug, Eq, PartialEq)]
253pub(crate) enum IntentKeyAccessPolicyViolation {
254    KeyAccessConflict,
255    ByIdsWithPredicate,
256    OnlyWithPredicate,
257}
258
259///
260/// IntentTerminalPolicyViolation
261///
262/// Intent-level terminal compatibility violations.
263///
264#[derive(Clone, Copy, Debug, Eq, PartialEq)]
265pub(crate) enum IntentTerminalPolicyViolation {
266    GroupedFieldTargetExtremaUnsupported,
267}
268
269///
270/// FluentLoadPolicyViolation
271///
272/// Fluent load-entry policy violations.
273///
274#[derive(Clone, Copy, Debug, Eq, PartialEq)]
275pub(crate) enum FluentLoadPolicyViolation {
276    CursorRequiresPagedExecution,
277    GroupedRequiresExecuteGrouped,
278    CursorRequiresOrder,
279    CursorRequiresLimit,
280}
281
282impl From<ValidateError> for PlanError {
283    fn from(err: ValidateError) -> Self {
284        Self::PredicateInvalid(Box::new(err))
285    }
286}
287
288impl From<OrderPlanError> for PlanError {
289    fn from(err: OrderPlanError) -> Self {
290        Self::Order(Box::new(err))
291    }
292}
293
294impl From<AccessPlanError> for PlanError {
295    fn from(err: AccessPlanError) -> Self {
296        Self::Access(Box::new(err))
297    }
298}
299
300impl From<PolicyPlanError> for PlanError {
301    fn from(err: PolicyPlanError) -> Self {
302        Self::Policy(Box::new(err))
303    }
304}
305
306impl From<CursorPlanError> for PlanError {
307    fn from(err: CursorPlanError) -> Self {
308        Self::Cursor(Box::new(err))
309    }
310}
311
312impl From<GroupPlanError> for PlanError {
313    fn from(err: GroupPlanError) -> Self {
314        Self::Group(Box::new(err))
315    }
316}
317
318/// Validate a logical plan with model-level key values.
319///
320/// Ownership:
321/// - semantic owner for user-facing query validity at planning boundaries
322/// - failures here are user-visible planning failures (`PlanError`)
323///
324/// New user-facing validation rules must be introduced here first, then mirrored
325/// defensively in downstream layers without changing semantics.
326pub(crate) fn validate_query_semantics(
327    schema: &SchemaInfo,
328    model: &EntityModel,
329    plan: &AccessPlannedQuery<Value>,
330) -> Result<(), PlanError> {
331    let logical = plan.scalar_plan();
332
333    validate_plan_core(
334        schema,
335        model,
336        logical,
337        plan,
338        validate_order,
339        |schema, model, plan| {
340            validate_access_structure_model_shared(schema, model, &plan.access)
341                .map_err(PlanError::from)
342        },
343    )?;
344
345    Ok(())
346}
347
348/// Validate grouped query semantics for one grouped plan wrapper.
349///
350/// Ownership:
351/// - semantic owner for GROUP BY wrapper validation
352/// - failures here are user-visible planning failures (`PlanError`)
353pub(crate) fn validate_group_query_semantics(
354    schema: &SchemaInfo,
355    model: &EntityModel,
356    plan: &AccessPlannedQuery<Value>,
357) -> Result<(), PlanError> {
358    let (logical, group, having) = match &plan.logical {
359        LogicalPlan::Grouped(grouped) => (&grouped.scalar, &grouped.group, grouped.having.as_ref()),
360        LogicalPlan::Scalar(_) => {
361            return Err(PlanError::from(GroupPlanError::GroupedLogicalPlanRequired));
362        }
363    };
364
365    validate_plan_core(
366        schema,
367        model,
368        logical,
369        plan,
370        validate_order,
371        |schema, model, plan| {
372            validate_access_structure_model_shared(schema, model, &plan.access)
373                .map_err(PlanError::from)
374        },
375    )?;
376    validate_grouped_distinct_and_order_policy(logical, group, having.is_some())?;
377    validate_group_spec(schema, model, group)?;
378    validate_grouped_having_policy(group, having)?;
379
380    Ok(())
381}
382
383// Validate grouped DISTINCT + ORDER BY policy gates for grouped v1 hardening.
384fn validate_grouped_distinct_and_order_policy(
385    logical: &ScalarPlan,
386    group: &GroupSpec,
387    has_having: bool,
388) -> Result<(), PlanError> {
389    if logical.distinct && has_having {
390        return Err(PlanError::from(GroupPlanError::DistinctHavingUnsupported));
391    }
392    if logical.distinct {
393        return Err(PlanError::from(
394            GroupPlanError::DistinctAdjacencyEligibilityRequired,
395        ));
396    }
397
398    let Some(order) = logical.order.as_ref() else {
399        return Ok(());
400    };
401    if order_prefix_aligned_with_group_fields(order, group.group_fields.as_slice()) {
402        return Ok(());
403    }
404
405    Err(PlanError::from(
406        GroupPlanError::OrderPrefixNotAlignedWithGroupKeys,
407    ))
408}
409
410// Validate grouped HAVING policy gates and grouped-symbol references.
411fn validate_grouped_having_policy(
412    group: &GroupSpec,
413    having: Option<&GroupHavingSpec>,
414) -> Result<(), PlanError> {
415    let Some(having) = having else {
416        return Ok(());
417    };
418
419    for (index, clause) in having.clauses().iter().enumerate() {
420        if !having_compare_op_supported(clause.op()) {
421            return Err(PlanError::from(
422                GroupPlanError::HavingUnsupportedCompareOp {
423                    index,
424                    op: format!("{:?}", clause.op()),
425                },
426            ));
427        }
428
429        match clause.symbol() {
430            GroupHavingSymbol::GroupField(field_slot) => {
431                if !group
432                    .group_fields
433                    .iter()
434                    .any(|group_field| group_field.index() == field_slot.index())
435                {
436                    return Err(PlanError::from(
437                        GroupPlanError::HavingNonGroupFieldReference {
438                            index,
439                            field: field_slot.field().to_string(),
440                        },
441                    ));
442                }
443            }
444            GroupHavingSymbol::AggregateIndex(aggregate_index) => {
445                if *aggregate_index >= group.aggregates.len() {
446                    return Err(PlanError::from(
447                        GroupPlanError::HavingAggregateIndexOutOfBounds {
448                            index,
449                            aggregate_index: *aggregate_index,
450                            aggregate_count: group.aggregates.len(),
451                        },
452                    ));
453                }
454            }
455        }
456    }
457
458    Ok(())
459}
460
461const fn having_compare_op_supported(op: CompareOp) -> bool {
462    matches!(
463        op,
464        CompareOp::Eq
465            | CompareOp::Ne
466            | CompareOp::Lt
467            | CompareOp::Lte
468            | CompareOp::Gt
469            | CompareOp::Gte
470    )
471}
472
473// Return true when ORDER BY starts with GROUP BY key fields in declaration order.
474fn order_prefix_aligned_with_group_fields(order: &OrderSpec, group_fields: &[FieldSlot]) -> bool {
475    if order.fields.len() < group_fields.len() {
476        return false;
477    }
478
479    group_fields
480        .iter()
481        .zip(order.fields.iter())
482        .all(|(group_field, (order_field, _))| order_field == group_field.field())
483}
484
485/// Validate one grouped declarative spec against schema-level field surface.
486pub(crate) fn validate_group_spec(
487    schema: &SchemaInfo,
488    model: &EntityModel,
489    group: &GroupSpec,
490) -> Result<(), PlanError> {
491    if group.group_fields.is_empty() {
492        return Err(PlanError::from(GroupPlanError::EmptyGroupFields));
493    }
494    if group.aggregates.is_empty() {
495        return Err(PlanError::from(GroupPlanError::EmptyAggregates));
496    }
497
498    let mut seen_group_slots = BTreeSet::<usize>::new();
499    for field_slot in &group.group_fields {
500        if model.fields.get(field_slot.index()).is_none() {
501            return Err(PlanError::from(GroupPlanError::UnknownGroupField {
502                field: field_slot.field().to_string(),
503            }));
504        }
505        if !seen_group_slots.insert(field_slot.index()) {
506            return Err(PlanError::from(GroupPlanError::DuplicateGroupField {
507                field: field_slot.field().to_string(),
508            }));
509        }
510    }
511
512    for (index, aggregate) in group.aggregates.iter().enumerate() {
513        if aggregate.distinct() && !aggregate.kind().supports_grouped_distinct_v1() {
514            return Err(PlanError::from(
515                GroupPlanError::DistinctAggregateKindUnsupported {
516                    index,
517                    kind: format!("{:?}", aggregate.kind()),
518                },
519            ));
520        }
521
522        let Some(target_field) = aggregate.target_field.as_ref() else {
523            continue;
524        };
525        if aggregate.distinct() {
526            return Err(PlanError::from(
527                GroupPlanError::DistinctAggregateFieldTargetUnsupported {
528                    index,
529                    kind: format!("{:?}", aggregate.kind()),
530                    field: target_field.clone(),
531                },
532            ));
533        }
534        if schema.field(target_field).is_none() {
535            return Err(PlanError::from(
536                GroupPlanError::UnknownAggregateTargetField {
537                    index,
538                    field: target_field.clone(),
539                },
540            ));
541        }
542        return Err(PlanError::from(
543            GroupPlanError::FieldTargetAggregatesUnsupported {
544                index,
545                kind: format!("{:?}", aggregate.kind()),
546                field: target_field.clone(),
547            },
548        ));
549    }
550
551    Ok(())
552}
553
554// Shared logical plan validation core owned by planner semantics.
555fn validate_plan_core<K, FOrder, FAccess>(
556    schema: &SchemaInfo,
557    model: &EntityModel,
558    logical: &ScalarPlan,
559    plan: &AccessPlannedQuery<K>,
560    validate_order_fn: FOrder,
561    validate_access_fn: FAccess,
562) -> Result<(), PlanError>
563where
564    FOrder: Fn(&SchemaInfo, &OrderSpec) -> Result<(), PlanError>,
565    FAccess: Fn(&SchemaInfo, &EntityModel, &AccessPlannedQuery<K>) -> Result<(), PlanError>,
566{
567    if let Some(predicate) = &logical.predicate {
568        validate(schema, predicate)?;
569    }
570
571    if let Some(order) = &logical.order {
572        validate_order_fn(schema, order)?;
573        validate_no_duplicate_non_pk_order_fields(model, order)?;
574        validate_primary_key_tie_break(model, order)?;
575    }
576
577    validate_access_fn(schema, model, plan)?;
578    validate_plan_shape(&plan.logical)?;
579
580    Ok(())
581}
582// ORDER validation ownership contract:
583// - This module owns ORDER semantic validation (field existence/orderability/tie-break).
584// - ORDER canonicalization (primary-key tie-break insertion) is performed at the
585//   intent boundary via `canonicalize_order_spec` before plan validation.
586// - Shape-policy checks (for example empty ORDER, pagination/order coupling) are owned here.
587// - Executor/runtime layers may defend execution preconditions only.
588
589/// Return true when an ORDER BY exists and contains at least one field.
590#[must_use]
591pub(crate) fn has_explicit_order(order: Option<&OrderSpec>) -> bool {
592    order.is_some_and(|order| !order.fields.is_empty())
593}
594
595/// Return true when an ORDER BY exists but is empty.
596#[must_use]
597pub(crate) fn has_empty_order(order: Option<&OrderSpec>) -> bool {
598    order.is_some_and(|order| order.fields.is_empty())
599}
600
601/// Validate order-shape rules shared across intent and logical plan boundaries.
602pub(crate) fn validate_order_shape(order: Option<&OrderSpec>) -> Result<(), PolicyPlanError> {
603    if has_empty_order(order) {
604        return Err(PolicyPlanError::EmptyOrderSpec);
605    }
606
607    Ok(())
608}
609
610/// Validate intent-level plan-shape rules derived from query mode + order.
611pub(crate) fn validate_intent_plan_shape(
612    mode: QueryMode,
613    order: Option<&OrderSpec>,
614) -> Result<(), PolicyPlanError> {
615    validate_order_shape(order)?;
616
617    let has_order = has_explicit_order(order);
618    if matches!(mode, QueryMode::Delete(spec) if spec.limit.is_some()) && !has_order {
619        return Err(PolicyPlanError::DeleteLimitRequiresOrder);
620    }
621
622    Ok(())
623}
624
625/// Validate cursor-pagination readiness for a load-spec + ordering pair.
626pub(crate) const fn validate_cursor_paging_requirements(
627    has_order: bool,
628    spec: LoadSpec,
629) -> Result<(), CursorPagingPolicyError> {
630    if !has_order {
631        return Err(CursorPagingPolicyError::CursorRequiresOrder);
632    }
633    if spec.limit.is_none() {
634        return Err(CursorPagingPolicyError::CursorRequiresLimit);
635    }
636
637    Ok(())
638}
639
640/// Validate cursor-order shape and return the logical order contract when present.
641pub(crate) const fn validate_cursor_order_plan_shape(
642    order: Option<&OrderSpec>,
643    require_explicit_order: bool,
644) -> Result<Option<&OrderSpec>, CursorOrderPlanShapeError> {
645    let Some(order) = order else {
646        if require_explicit_order {
647            return Err(CursorOrderPlanShapeError::MissingExplicitOrder);
648        }
649
650        return Ok(None);
651    };
652
653    if order.fields.is_empty() {
654        return Err(CursorOrderPlanShapeError::EmptyOrderSpec);
655    }
656
657    Ok(Some(order))
658}
659
660/// Resolve one grouped field into a stable field slot.
661pub(crate) fn resolve_group_field_slot(
662    model: &EntityModel,
663    field: &str,
664) -> Result<FieldSlot, PlanError> {
665    FieldSlot::resolve(model, field).ok_or_else(|| {
666        PlanError::from(GroupPlanError::UnknownGroupField {
667            field: field.to_string(),
668        })
669    })
670}
671
672/// Validate intent key-access policy before planning.
673pub(crate) const fn validate_intent_key_access_policy(
674    key_access_conflict: bool,
675    key_access_kind: Option<IntentKeyAccessKind>,
676    has_predicate: bool,
677) -> Result<(), IntentKeyAccessPolicyViolation> {
678    if key_access_conflict {
679        return Err(IntentKeyAccessPolicyViolation::KeyAccessConflict);
680    }
681
682    match key_access_kind {
683        Some(IntentKeyAccessKind::Many) if has_predicate => {
684            Err(IntentKeyAccessPolicyViolation::ByIdsWithPredicate)
685        }
686        Some(IntentKeyAccessKind::Only) if has_predicate => {
687            Err(IntentKeyAccessPolicyViolation::OnlyWithPredicate)
688        }
689        Some(
690            IntentKeyAccessKind::Single | IntentKeyAccessKind::Many | IntentKeyAccessKind::Only,
691        )
692        | None => Ok(()),
693    }
694}
695
696/// Validate grouped field-target terminal compatibility at intent boundaries.
697pub(crate) const fn validate_grouped_field_target_extrema_policy()
698-> Result<(), IntentTerminalPolicyViolation> {
699    Err(IntentTerminalPolicyViolation::GroupedFieldTargetExtremaUnsupported)
700}
701
702/// Validate fluent non-paged load entry policy.
703pub(crate) const fn validate_fluent_non_paged_mode(
704    has_cursor_token: bool,
705    has_grouping: bool,
706) -> Result<(), FluentLoadPolicyViolation> {
707    if has_cursor_token {
708        return Err(FluentLoadPolicyViolation::CursorRequiresPagedExecution);
709    }
710    if has_grouping {
711        return Err(FluentLoadPolicyViolation::GroupedRequiresExecuteGrouped);
712    }
713
714    Ok(())
715}
716
717/// Validate fluent paged load entry policy.
718pub(crate) fn validate_fluent_paged_mode(
719    has_grouping: bool,
720    has_explicit_order: bool,
721    spec: Option<LoadSpec>,
722) -> Result<(), FluentLoadPolicyViolation> {
723    if has_grouping {
724        return Err(FluentLoadPolicyViolation::GroupedRequiresExecuteGrouped);
725    }
726
727    let Some(spec) = spec else {
728        return Ok(());
729    };
730
731    validate_cursor_paging_requirements(has_explicit_order, spec).map_err(|err| match err {
732        CursorPagingPolicyError::CursorRequiresOrder => {
733            FluentLoadPolicyViolation::CursorRequiresOrder
734        }
735        CursorPagingPolicyError::CursorRequiresLimit => {
736            FluentLoadPolicyViolation::CursorRequiresLimit
737        }
738    })
739}
740
741/// Validate mode/order/pagination invariants for one logical plan.
742pub(crate) fn validate_plan_shape(plan: &LogicalPlan) -> Result<(), PolicyPlanError> {
743    let grouped = matches!(plan, LogicalPlan::Grouped(_));
744    let plan = match plan {
745        LogicalPlan::Scalar(plan) => plan,
746        LogicalPlan::Grouped(plan) => &plan.scalar,
747    };
748    validate_order_shape(plan.order.as_ref())?;
749
750    let has_order = has_explicit_order(plan.order.as_ref());
751    if plan.delete_limit.is_some() && !has_order {
752        return Err(PolicyPlanError::DeleteLimitRequiresOrder);
753    }
754
755    match plan.mode {
756        QueryMode::Delete(_) => {
757            if plan.page.is_some() {
758                return Err(PolicyPlanError::DeletePlanWithPagination);
759            }
760        }
761        QueryMode::Load(_) => {
762            if plan.delete_limit.is_some() {
763                return Err(PolicyPlanError::LoadPlanWithDeleteLimit);
764            }
765            // GROUP BY v1 uses canonical grouped key ordering when ORDER BY is
766            // omitted, so grouped pagination remains deterministic without an
767            // explicit sort clause.
768            if plan.page.is_some() && !has_order && !grouped {
769                return Err(PolicyPlanError::UnorderedPagination);
770            }
771        }
772    }
773
774    Ok(())
775}
776
777/// Validate ORDER BY fields against the schema.
778pub(crate) fn validate_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
779    for (field, _) in &order.fields {
780        let field_type = schema
781            .field(field)
782            .ok_or_else(|| OrderPlanError::UnknownField {
783                field: field.clone(),
784            })
785            .map_err(PlanError::from)?;
786
787        if !field_type.is_orderable() {
788            // CONTRACT: ORDER BY rejects non-queryable or unordered fields.
789            return Err(PlanError::from(OrderPlanError::UnorderableField {
790                field: field.clone(),
791            }));
792        }
793    }
794
795    Ok(())
796}
797
798/// Reject duplicate non-primary-key fields in ORDER BY.
799pub(crate) fn validate_no_duplicate_non_pk_order_fields(
800    model: &EntityModel,
801    order: &OrderSpec,
802) -> Result<(), PlanError> {
803    let mut seen = BTreeSet::new();
804    let pk_field = model.primary_key.name;
805
806    for (field, _) in &order.fields {
807        if field == pk_field {
808            continue;
809        }
810        if !seen.insert(field.as_str()) {
811            return Err(PlanError::from(OrderPlanError::DuplicateOrderField {
812                field: field.clone(),
813            }));
814        }
815    }
816
817    Ok(())
818}
819
820// Ordered plans must include exactly one terminal primary-key field so ordering is total and
821// deterministic across explain, fingerprint, and executor comparison paths.
822pub(crate) fn validate_primary_key_tie_break(
823    model: &EntityModel,
824    order: &OrderSpec,
825) -> Result<(), PlanError> {
826    if order.fields.is_empty() {
827        return Ok(());
828    }
829
830    let pk_field = model.primary_key.name;
831    let pk_count = order
832        .fields
833        .iter()
834        .filter(|(field, _)| field == pk_field)
835        .count();
836    let trailing_pk = order
837        .fields
838        .last()
839        .is_some_and(|(field, _)| field == pk_field);
840
841    if pk_count == 1 && trailing_pk {
842        Ok(())
843    } else {
844        Err(PlanError::from(OrderPlanError::MissingPrimaryKeyTieBreak {
845            field: pk_field.to_string(),
846        }))
847    }
848}