Skip to main content

icydb_core/db/query/plan/
validate.rs

1//! Executor-ready plan validation against a concrete entity schema.
2use super::{AccessPath, AccessPlan, LogicalPlan, OrderSpec};
3use crate::{
4    db::query::predicate::{
5        self, SchemaInfo,
6        validate::{FieldType, ScalarType},
7    },
8    error::{ErrorClass, ErrorOrigin, InternalError},
9    model::entity::EntityModel,
10    model::index::IndexModel,
11    traits::{EntityKind, FieldValue},
12    value::Value,
13};
14use thiserror::Error as ThisError;
15
16///
17/// PlanError
18///
19/// Executor-visible validation failures for logical plans.
20///
21/// These errors indicate that a plan cannot be safely executed against the
22/// current schema or entity definition. They are *not* planner bugs.
23///
24
25#[derive(Debug, ThisError)]
26pub enum PlanError {
27    #[error("predicate validation failed: {0}")]
28    PredicateInvalid(#[from] predicate::ValidateError),
29
30    /// ORDER BY references an unknown field.
31    #[error("unknown order field '{field}'")]
32    UnknownOrderField { field: String },
33
34    /// ORDER BY references a field that cannot be ordered.
35    #[error("order field '{field}' is not orderable")]
36    UnorderableField { field: String },
37
38    /// Access plan references an index not declared on the entity.
39    #[error("index '{index}' not found on entity")]
40    IndexNotFound { index: IndexModel },
41
42    /// Index prefix exceeds the number of indexed fields.
43    #[error("index prefix length {prefix_len} exceeds index field count {field_len}")]
44    IndexPrefixTooLong { prefix_len: usize, field_len: usize },
45
46    /// Index prefix must include at least one value.
47    #[error("index prefix must include at least one value")]
48    IndexPrefixEmpty,
49
50    /// Index prefix literal does not match indexed field type.
51    #[error("index prefix value for field '{field}' is incompatible")]
52    IndexPrefixValueMismatch { field: String },
53
54    /// Primary key field exists but is not key-compatible.
55    #[error("primary key field '{field}' is not key-compatible")]
56    PrimaryKeyUnsupported { field: String },
57
58    /// Supplied key does not match the primary key type.
59    #[error("key '{key:?}' is incompatible with primary key '{field}'")]
60    PrimaryKeyMismatch { field: String, key: Value },
61
62    /// Key range has invalid ordering.
63    #[error("key range start is greater than end")]
64    InvalidKeyRange,
65
66    /// ORDER BY must specify at least one field.
67    #[error("order specification must include at least one field")]
68    EmptyOrderSpec,
69
70    /// Delete plans must not carry pagination.
71    #[error("delete plans must not include pagination")]
72    DeletePlanWithPagination,
73
74    /// Load plans must not carry delete limits.
75    #[error("load plans must not include delete limits")]
76    LoadPlanWithDeleteLimit,
77
78    /// Delete limits require an explicit ordering.
79    #[error("delete limit requires an explicit ordering")]
80    DeleteLimitRequiresOrder,
81}
82
83/// Validate a logical plan using a prebuilt schema surface.
84#[cfg(test)]
85pub(crate) fn validate_plan_with_schema_info<K>(
86    schema: &SchemaInfo,
87    model: &EntityModel,
88    plan: &LogicalPlan<K>,
89) -> Result<(), PlanError>
90where
91    K: FieldValue + Ord,
92{
93    validate_logical_plan(schema, model, plan)
94}
95
96/// Validate a logical plan against the runtime entity model.
97///
98/// This is the executor-safe entrypoint and must not consult global schema.
99#[cfg(test)]
100#[expect(dead_code)]
101pub(crate) fn validate_plan_with_model<K>(
102    plan: &LogicalPlan<K>,
103    model: &EntityModel,
104) -> Result<(), PlanError>
105where
106    K: FieldValue + Ord,
107{
108    let schema = SchemaInfo::from_entity_model(model)?;
109    validate_plan_with_schema_info(&schema, model, plan)
110}
111
112/// Validate a logical plan against schema and plan-level invariants.
113#[cfg(test)]
114pub(crate) fn validate_logical_plan<K>(
115    schema: &SchemaInfo,
116    model: &EntityModel,
117    plan: &LogicalPlan<K>,
118) -> Result<(), PlanError>
119where
120    K: FieldValue + Ord,
121{
122    if let Some(predicate) = &plan.predicate {
123        predicate::validate(schema, predicate)?;
124    }
125
126    if let Some(order) = &plan.order {
127        validate_order(schema, order)?;
128    }
129
130    validate_access_plan(schema, model, &plan.access)?;
131    validate_plan_semantics(plan)?;
132
133    Ok(())
134}
135
136/// Validate a logical plan with model-level key values.
137pub(crate) fn validate_logical_plan_model(
138    schema: &SchemaInfo,
139    model: &EntityModel,
140    plan: &LogicalPlan<Value>,
141) -> Result<(), PlanError> {
142    if let Some(predicate) = &plan.predicate {
143        predicate::validate(schema, predicate)?;
144    }
145
146    if let Some(order) = &plan.order {
147        validate_order(schema, order)?;
148    }
149
150    validate_access_plan_model(schema, model, &plan.access)?;
151    validate_plan_semantics(plan)?;
152
153    Ok(())
154}
155
156/// Validate plan-level invariants not covered by schema checks.
157fn validate_plan_semantics<K>(plan: &LogicalPlan<K>) -> Result<(), PlanError> {
158    if let Some(order) = &plan.order
159        && order.fields.is_empty()
160    {
161        return Err(PlanError::EmptyOrderSpec);
162    }
163
164    if plan.mode.is_delete() {
165        if plan.page.is_some() {
166            return Err(PlanError::DeletePlanWithPagination);
167        }
168
169        if plan.delete_limit.is_some()
170            && plan
171                .order
172                .as_ref()
173                .is_none_or(|order| order.fields.is_empty())
174        {
175            return Err(PlanError::DeleteLimitRequiresOrder);
176        }
177    }
178
179    if plan.mode.is_load() && plan.delete_limit.is_some() {
180        return Err(PlanError::LoadPlanWithDeleteLimit);
181    }
182
183    Ok(())
184}
185
186/// Validate plans at executor boundaries and surface invariant violations.
187pub(crate) fn validate_executor_plan<E: EntityKind>(
188    plan: &LogicalPlan<E::Id>,
189) -> Result<(), InternalError> {
190    let schema = SchemaInfo::from_entity_model(E::MODEL).map_err(|err| {
191        InternalError::new(
192            ErrorClass::InvariantViolation,
193            ErrorOrigin::Query,
194            format!("entity schema invalid for {}: {err}", E::PATH),
195        )
196    })?;
197
198    if let Some(predicate) = &plan.predicate {
199        predicate::validate(&schema, predicate).map_err(|err| {
200            InternalError::new(
201                ErrorClass::InvariantViolation,
202                ErrorOrigin::Query,
203                err.to_string(),
204            )
205        })?;
206    }
207
208    if let Some(order) = &plan.order {
209        validate_executor_order(&schema, order).map_err(|err| {
210            InternalError::new(
211                ErrorClass::InvariantViolation,
212                ErrorOrigin::Query,
213                err.to_string(),
214            )
215        })?;
216    }
217
218    validate_access_plan(&schema, E::MODEL, &plan.access).map_err(|err| {
219        InternalError::new(
220            ErrorClass::InvariantViolation,
221            ErrorOrigin::Query,
222            err.to_string(),
223        )
224    })?;
225
226    validate_plan_semantics(plan).map_err(|err| {
227        InternalError::new(
228            ErrorClass::InvariantViolation,
229            ErrorOrigin::Query,
230            err.to_string(),
231        )
232    })?;
233
234    Ok(())
235}
236
237/// Validate ORDER BY fields against the schema.
238pub(crate) fn validate_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
239    for (field, _) in &order.fields {
240        let field_type = schema
241            .field(field)
242            .ok_or_else(|| PlanError::UnknownOrderField {
243                field: field.clone(),
244            })?;
245
246        if !field_type.is_orderable() {
247            // CONTRACT: ORDER BY rejects unsupported or unordered fields.
248            return Err(PlanError::UnorderableField {
249                field: field.clone(),
250            });
251        }
252    }
253
254    Ok(())
255}
256
257/// Validate ORDER BY fields for executor-only plans.
258///
259/// CONTRACT: executor ordering validation matches planner rules.
260fn validate_executor_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
261    validate_order(schema, order)
262}
263
264/// Validate executor-visible access paths.
265///
266/// This ensures keys, ranges, and index prefixes are schema-compatible.
267pub(crate) fn validate_access_plan<K>(
268    schema: &SchemaInfo,
269    model: &EntityModel,
270    access: &AccessPlan<K>,
271) -> Result<(), PlanError>
272where
273    K: FieldValue + Ord,
274{
275    match access {
276        AccessPlan::Path(path) => validate_access_path(schema, model, path),
277        AccessPlan::Union(children) | AccessPlan::Intersection(children) => {
278            for child in children {
279                validate_access_plan(schema, model, child)?;
280            }
281            Ok(())
282        }
283    }
284}
285
286/// Validate access paths that carry model-level key values.
287pub(crate) fn validate_access_plan_model(
288    schema: &SchemaInfo,
289    model: &EntityModel,
290    access: &AccessPlan<Value>,
291) -> Result<(), PlanError> {
292    match access {
293        AccessPlan::Path(path) => validate_access_path_model(schema, model, path),
294        AccessPlan::Union(children) | AccessPlan::Intersection(children) => {
295            for child in children {
296                validate_access_plan_model(schema, model, child)?;
297            }
298            Ok(())
299        }
300    }
301}
302
303fn validate_access_path<K>(
304    schema: &SchemaInfo,
305    model: &EntityModel,
306    access: &AccessPath<K>,
307) -> Result<(), PlanError>
308where
309    K: FieldValue + Ord,
310{
311    match access {
312        AccessPath::ByKey(key) => validate_pk_key(schema, model, key),
313        AccessPath::ByKeys(keys) => {
314            // Empty key lists are a valid no-op.
315            if keys.is_empty() {
316                return Ok(());
317            }
318            for key in keys {
319                validate_pk_key(schema, model, key)?;
320            }
321            Ok(())
322        }
323        AccessPath::KeyRange { start, end } => {
324            validate_pk_key(schema, model, start)?;
325            validate_pk_key(schema, model, end)?;
326            if start > end {
327                return Err(PlanError::InvalidKeyRange);
328            }
329            Ok(())
330        }
331        AccessPath::IndexPrefix { index, values } => {
332            validate_index_prefix(schema, model, index, values)
333        }
334        AccessPath::FullScan => Ok(()),
335    }
336}
337
338// Validate executor-visible access paths that carry model-level key values.
339fn validate_access_path_model(
340    schema: &SchemaInfo,
341    model: &EntityModel,
342    access: &AccessPath<Value>,
343) -> Result<(), PlanError> {
344    match access {
345        AccessPath::ByKey(key) => validate_pk_value(schema, model, key),
346        AccessPath::ByKeys(keys) => {
347            if keys.is_empty() {
348                return Ok(());
349            }
350            for key in keys {
351                validate_pk_value(schema, model, key)?;
352            }
353            Ok(())
354        }
355        AccessPath::KeyRange { start, end } => {
356            validate_pk_value(schema, model, start)?;
357            validate_pk_value(schema, model, end)?;
358            let Some(ordering) = start.partial_cmp(end) else {
359                return Err(PlanError::InvalidKeyRange);
360            };
361            if ordering == std::cmp::Ordering::Greater {
362                return Err(PlanError::InvalidKeyRange);
363            }
364            Ok(())
365        }
366        AccessPath::IndexPrefix { index, values } => {
367            validate_index_prefix(schema, model, index, values)
368        }
369        AccessPath::FullScan => Ok(()),
370    }
371}
372
373/// Validate that a key matches the entity's primary key type.
374fn validate_pk_key<K>(schema: &SchemaInfo, model: &EntityModel, key: &K) -> Result<(), PlanError>
375where
376    K: FieldValue,
377{
378    let field = model.primary_key.name;
379
380    let field_type = schema
381        .field(field)
382        .ok_or_else(|| PlanError::PrimaryKeyUnsupported {
383            field: field.to_string(),
384        })?;
385
386    if !is_key_compatible(field_type) {
387        return Err(PlanError::PrimaryKeyUnsupported {
388            field: field.to_string(),
389        });
390    }
391
392    let value = key.to_value();
393    if !predicate::validate::literal_matches_type(&value, field_type) {
394        return Err(PlanError::PrimaryKeyMismatch {
395            field: field.to_string(),
396            key: value,
397        });
398    }
399
400    Ok(())
401}
402
403// Validate that a model-level key value matches the entity's primary key type.
404fn validate_pk_value(
405    schema: &SchemaInfo,
406    model: &EntityModel,
407    key: &Value,
408) -> Result<(), PlanError> {
409    let field = model.primary_key.name;
410
411    let field_type = schema
412        .field(field)
413        .ok_or_else(|| PlanError::PrimaryKeyUnsupported {
414            field: field.to_string(),
415        })?;
416
417    if !is_key_compatible(field_type) {
418        return Err(PlanError::PrimaryKeyUnsupported {
419            field: field.to_string(),
420        });
421    }
422
423    if !predicate::validate::literal_matches_type(key, field_type) {
424        return Err(PlanError::PrimaryKeyMismatch {
425            field: field.to_string(),
426            key: key.clone(),
427        });
428    }
429
430    Ok(())
431}
432
433/// Validate that an index prefix is valid for execution.
434fn validate_index_prefix(
435    schema: &SchemaInfo,
436    model: &EntityModel,
437    index: &IndexModel,
438    values: &[Value],
439) -> Result<(), PlanError> {
440    if !model.indexes.contains(&index) {
441        return Err(PlanError::IndexNotFound { index: *index });
442    }
443
444    if values.is_empty() {
445        return Err(PlanError::IndexPrefixEmpty);
446    }
447
448    if values.len() > index.fields.len() {
449        return Err(PlanError::IndexPrefixTooLong {
450            prefix_len: values.len(),
451            field_len: index.fields.len(),
452        });
453    }
454
455    for (field, value) in index.fields.iter().zip(values.iter()) {
456        let field_type =
457            schema
458                .field(field)
459                .ok_or_else(|| PlanError::IndexPrefixValueMismatch {
460                    field: field.to_string(),
461                })?;
462
463        if !predicate::validate::literal_matches_type(value, field_type) {
464            return Err(PlanError::IndexPrefixValueMismatch {
465                field: field.to_string(),
466            });
467        }
468    }
469
470    Ok(())
471}
472
473/// Map scalar field types to compatible key variants.
474///
475/// Non-scalar and unsupported field types are intentionally excluded.
476const fn is_key_compatible(field_type: &FieldType) -> bool {
477    matches!(
478        field_type,
479        FieldType::Scalar(
480            ScalarType::Account
481                | ScalarType::Int
482                | ScalarType::Principal
483                | ScalarType::Subaccount
484                | ScalarType::Timestamp
485                | ScalarType::Uint
486                | ScalarType::Ulid
487                | ScalarType::Unit
488        )
489    )
490}
491
492/*
493///
494/// TESTS
495///
496
497#[cfg(test)]
498mod tests {
499    use super::{PlanError, validate_plan_with_model, validate_plan_with_schema_info};
500    use crate::{
501        db::{
502            query::{
503                plan::{AccessPath, AccessPlan, LogicalPlan, OrderDirection, OrderSpec},
504                predicate::{SchemaInfo, ValidateError},
505            },
506            store::StorageKey,
507        },
508        model::{
509            entity::EntityModel,
510            field::{EntityFieldKind, EntityFieldModel},
511            index::IndexModel,
512        },
513        types::{Ref, Ulid},
514        value::Value,
515    };
516
517    fn field(name: &'static str, kind: EntityFieldKind) -> EntityFieldModel {
518        EntityFieldModel { name, kind }
519    }
520
521    fn model_with_fields(fields: Vec<EntityFieldModel>, pk_index: usize) -> EntityModel {
522        let fields: &'static [EntityFieldModel] = Box::leak(fields.into_boxed_slice());
523        let primary_key = &fields[pk_index];
524        let indexes: &'static [&'static IndexModel] = &[];
525
526        EntityModel {
527            path: "test::Entity",
528            entity_name: "TestEntity",
529            primary_key,
530            fields,
531            indexes,
532        }
533    }
534
535    #[test]
536    fn model_rejects_missing_primary_key() {
537        let fields: &'static [EntityFieldModel] =
538            Box::leak(vec![field("id", EntityFieldKind::Ulid)].into_boxed_slice());
539        let missing_pk = Box::leak(Box::new(field("missing", EntityFieldKind::Ulid)));
540
541        let model = EntityModel {
542            path: "test::Entity",
543            entity_name: "TestEntity",
544            primary_key: missing_pk,
545            fields,
546            indexes: &[],
547        };
548
549        assert!(matches!(
550            SchemaInfo::from_entity_model(&model),
551            Err(ValidateError::InvalidPrimaryKey { .. })
552        ));
553    }
554
555    #[test]
556    fn model_rejects_duplicate_fields() {
557        let model = model_with_fields(
558            vec![
559                field("dup", EntityFieldKind::Text),
560                field("dup", EntityFieldKind::Text),
561            ],
562            0,
563        );
564
565        assert!(matches!(
566            SchemaInfo::from_entity_model(&model),
567            Err(ValidateError::DuplicateField { .. })
568        ));
569    }
570
571    #[test]
572    fn model_rejects_invalid_primary_key_type() {
573        let model = model_with_fields(
574            vec![field("pk", EntityFieldKind::List(&EntityFieldKind::Text))],
575            0,
576        );
577
578        assert!(matches!(
579            SchemaInfo::from_entity_model(&model),
580            Err(ValidateError::InvalidPrimaryKeyType { .. })
581        ));
582    }
583
584    #[test]
585    fn model_rejects_index_unknown_field() {
586        const INDEX_FIELDS: [&str; 1] = ["missing"];
587        const INDEX_MODEL: IndexModel = IndexModel::new(
588            "test::idx_missing",
589            "test::IndexStore",
590            &INDEX_FIELDS,
591            false,
592        );
593        const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];
594
595        let fields: &'static [EntityFieldModel] =
596            Box::leak(vec![field("id", EntityFieldKind::Ulid)].into_boxed_slice());
597        let model = EntityModel {
598            path: "test::Entity",
599            entity_name: "TestEntity",
600            primary_key: &fields[0],
601            fields,
602            indexes: &INDEXES,
603        };
604
605        assert!(matches!(
606            SchemaInfo::from_entity_model(&model),
607            Err(ValidateError::IndexFieldUnknown { .. })
608        ));
609    }
610
611    #[test]
612    fn model_rejects_index_unsupported_field() {
613        const INDEX_FIELDS: [&str; 1] = ["broken"];
614        const INDEX_MODEL: IndexModel =
615            IndexModel::new("test::idx_broken", "test::IndexStore", &INDEX_FIELDS, false);
616        const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];
617
618        let fields: &'static [EntityFieldModel] = Box::leak(
619            vec![
620                field("id", EntityFieldKind::Ulid),
621                field("broken", EntityFieldKind::Unsupported),
622            ]
623            .into_boxed_slice(),
624        );
625        let model = EntityModel {
626            path: "test::Entity",
627            entity_name: "TestEntity",
628            primary_key: &fields[0],
629            fields,
630            indexes: &INDEXES,
631        };
632
633        assert!(matches!(
634            SchemaInfo::from_entity_model(&model),
635            Err(ValidateError::IndexFieldUnsupported { .. })
636        ));
637    }
638
639    #[test]
640    fn model_rejects_duplicate_index_names() {
641        const INDEX_FIELDS_A: [&str; 1] = ["id"];
642        const INDEX_FIELDS_B: [&str; 1] = ["other"];
643        const INDEX_A: IndexModel = IndexModel::new(
644            "test::dup_index",
645            "test::IndexStore",
646            &INDEX_FIELDS_A,
647            false,
648        );
649        const INDEX_B: IndexModel = IndexModel::new(
650            "test::dup_index",
651            "test::IndexStore",
652            &INDEX_FIELDS_B,
653            false,
654        );
655        const INDEXES: [&IndexModel; 2] = [&INDEX_A, &INDEX_B];
656
657        let fields: &'static [EntityFieldModel] = Box::leak(
658            vec![
659                field("id", EntityFieldKind::Ulid),
660                field("other", EntityFieldKind::Text),
661            ]
662            .into_boxed_slice(),
663        );
664        let model = EntityModel {
665            path: "test::Entity",
666            entity_name: "TestEntity",
667            primary_key: &fields[0],
668            fields,
669            indexes: &INDEXES,
670        };
671
672        assert!(matches!(
673            SchemaInfo::from_entity_model(&model),
674            Err(ValidateError::DuplicateIndexName { .. })
675        ));
676    }
677
678    #[test]
679    fn plan_rejects_unorderable_field() {
680        let model = model_with_fields(
681            vec![
682                field("id", EntityFieldKind::Ulid),
683                field("tags", EntityFieldKind::List(&EntityFieldKind::Text)),
684            ],
685            0,
686        );
687
688        let schema = SchemaInfo::from_entity_model(&model).expect("valid model");
689        let plan: LogicalPlan<Ref<PlannerEntity>> = LogicalPlan {
690            mode: crate::db::query::QueryMode::Load(crate::db::query::LoadSpec::new()),
691            access: AccessPlan::Path(AccessPath::FullScan),
692            predicate: None,
693            order: Some(OrderSpec {
694                fields: vec![("tags".to_string(), OrderDirection::Asc)],
695            }),
696            delete_limit: None,
697            page: None,
698            consistency: crate::db::query::ReadConsistency::MissingOk,
699        };
700
701        let err =
702            validate_plan_with_schema_info(&schema, &model, &plan).expect_err("unorderable field");
703        assert!(matches!(err, PlanError::UnorderableField { .. }));
704    }
705
706    #[test]
707    fn plan_rejects_index_prefix_too_long() {
708        let schema = SchemaInfo::from_entity_model(PlannerEntity::MODEL).expect("valid model");
709        let plan: LogicalPlan<Ref<PlannerEntity>> = LogicalPlan {
710            mode: crate::db::query::QueryMode::Load(crate::db::query::LoadSpec::new()),
711            access: AccessPlan::Path(AccessPath::IndexPrefix {
712                index: *PlannerEntity::INDEXES[0],
713                values: vec![
714                    Value::Text("a".to_string()),
715                    Value::Text("b".to_string()),
716                    Value::Text("c".to_string()),
717                ],
718            }),
719            predicate: None,
720            order: None,
721            delete_limit: None,
722            page: None,
723            consistency: crate::db::query::ReadConsistency::MissingOk,
724        };
725
726        let err = validate_plan_with_schema_info(&schema, PlannerEntity::MODEL, &plan)
727            .expect_err("index prefix too long");
728        assert!(matches!(err, PlanError::IndexPrefixTooLong { .. }));
729    }
730
731    #[test]
732    fn plan_rejects_empty_index_prefix() {
733        let schema = SchemaInfo::from_entity_model(PlannerEntity::MODEL).expect("valid model");
734        let plan: LogicalPlan<Ref<PlannerEntity>> = LogicalPlan {
735            mode: crate::db::query::QueryMode::Load(crate::db::query::LoadSpec::new()),
736            access: AccessPlan::Path(AccessPath::IndexPrefix {
737                index: *PlannerEntity::INDEXES[0],
738                values: vec![],
739            }),
740            predicate: None,
741            order: None,
742            delete_limit: None,
743            page: None,
744            consistency: crate::db::query::ReadConsistency::MissingOk,
745        };
746
747        let err = validate_plan_with_schema_info(&schema, PlannerEntity::MODEL, &plan)
748            .expect_err("index prefix empty");
749        assert!(matches!(err, PlanError::IndexPrefixEmpty));
750    }
751
752    #[test]
753    fn plan_accepts_model_based_validation() {
754        let model = model_with_fields(vec![field("id", EntityFieldKind::Ulid)], 0);
755        let plan: LogicalPlan<Ref<PlannerEntity>> = LogicalPlan {
756            mode: crate::db::query::QueryMode::Load(crate::db::query::LoadSpec::new()),
757            access: AccessPlan::Path(AccessPath::ByKey(Ref::new(Ulid::nil()))),
758            predicate: None,
759            order: None,
760            delete_limit: None,
761            page: None,
762            consistency: crate::db::query::ReadConsistency::MissingOk,
763        };
764
765        validate_plan_with_model(&plan, &model).expect("valid plan");
766    }
767}
768*/