Skip to main content

icydb_core/db/query/plan/
validate.rs

1//! Executor-ready plan validation against a concrete entity schema.
2use super::{AccessPath, AccessPlan, LogicalPlan, OrderSpec};
3use crate::{
4    db::query::predicate::{
5        self, SchemaInfo,
6        validate::{FieldType, ScalarType},
7    },
8    error::{ErrorClass, ErrorOrigin, InternalError},
9    key::Key,
10    model::entity::EntityModel,
11    model::index::IndexModel,
12    traits::EntityKind,
13    value::Value,
14};
15use thiserror::Error as ThisError;
16
17///
18/// PlanError
19///
20/// Executor-visible validation failures for logical plans.
21///
22/// These errors indicate that a plan cannot be safely executed against the
23/// current schema or entity definition. They are *not* planner bugs.
24///
25
26#[derive(Debug, ThisError)]
27pub enum PlanError {
28    /// Predicate failed schema-level validation.
29    #[error("predicate validation failed: {0}")]
30    PredicateInvalid(#[from] predicate::ValidateError),
31
32    /// ORDER BY references an unknown field.
33    #[error("unknown order field '{field}'")]
34    UnknownOrderField { field: String },
35
36    /// ORDER BY references a field that cannot be ordered.
37    #[error("order field '{field}' is not orderable")]
38    UnorderableField { field: String },
39
40    /// Access plan references an index not declared on the entity.
41    #[error("index '{index}' not found on entity")]
42    IndexNotFound { index: IndexModel },
43
44    /// Index prefix exceeds the number of indexed fields.
45    #[error("index prefix length {prefix_len} exceeds index field count {field_len}")]
46    IndexPrefixTooLong { prefix_len: usize, field_len: usize },
47
48    /// Index prefix must include at least one value.
49    #[error("index prefix must include at least one value")]
50    IndexPrefixEmpty,
51
52    /// Index prefix literal does not match indexed field type.
53    #[error("index prefix value for field '{field}' is incompatible")]
54    IndexPrefixValueMismatch { field: String },
55
56    /// Primary key field exists but is not key-compatible.
57    #[error("primary key field '{field}' is not key-compatible")]
58    PrimaryKeyUnsupported { field: String },
59
60    /// Supplied key does not match the primary key type.
61    #[error("key '{key}' is incompatible with primary key '{field}'")]
62    PrimaryKeyMismatch { field: String, key: Key },
63
64    /// Key range has invalid ordering.
65    #[error("key range start is greater than end")]
66    InvalidKeyRange,
67
68    /// ORDER BY must specify at least one field.
69    #[error("order specification must include at least one field")]
70    EmptyOrderSpec,
71
72    /// Delete plans must not carry pagination.
73    #[error("delete plans must not include pagination")]
74    DeletePlanWithPagination,
75
76    /// Load plans must not carry delete limits.
77    #[error("load plans must not include delete limits")]
78    LoadPlanWithDeleteLimit,
79
80    /// Delete limits require an explicit ordering.
81    #[error("delete limit requires an explicit ordering")]
82    DeleteLimitRequiresOrder,
83}
84
/// Test-only entrypoint: validate `plan` against a caller-supplied `SchemaInfo`.
///
/// Delegates directly to `validate_logical_plan`, so the result is exactly
/// what that function reports for the same arguments.
#[cfg(test)]
pub(crate) fn validate_plan_with_schema_info(
    schema: &SchemaInfo,
    model: &EntityModel,
    plan: &LogicalPlan,
) -> Result<(), PlanError> {
    validate_logical_plan(schema, model, plan)
}
94
/// Test-only entrypoint: validate `plan` using a schema derived from `model`.
///
/// The schema surface is built from the supplied runtime entity model only;
/// no global schema is consulted. A model that cannot produce a `SchemaInfo`
/// is reported as `PlanError::PredicateInvalid`.
#[cfg(test)]
pub(crate) fn validate_plan_with_model(
    plan: &LogicalPlan,
    model: &EntityModel,
) -> Result<(), PlanError> {
    SchemaInfo::from_entity_model(model)
        .map_err(PlanError::from)
        .and_then(|derived| validate_plan_with_schema_info(&derived, model, plan))
}
106
107/// Validate a logical plan against schema and plan-level invariants.
108pub(crate) fn validate_logical_plan(
109    schema: &SchemaInfo,
110    model: &EntityModel,
111    plan: &LogicalPlan,
112) -> Result<(), PlanError> {
113    if let Some(predicate) = &plan.predicate {
114        predicate::validate(schema, predicate)?;
115    }
116
117    if let Some(order) = &plan.order {
118        validate_order(schema, order)?;
119    }
120
121    validate_access_plan(schema, model, &plan.access)?;
122    validate_plan_semantics(plan)?;
123
124    Ok(())
125}
126
127/// Validate plan-level invariants not covered by schema checks.
128fn validate_plan_semantics(plan: &LogicalPlan) -> Result<(), PlanError> {
129    if let Some(order) = &plan.order
130        && order.fields.is_empty()
131    {
132        return Err(PlanError::EmptyOrderSpec);
133    }
134
135    if plan.mode.is_delete() {
136        if plan.page.is_some() {
137            return Err(PlanError::DeletePlanWithPagination);
138        }
139
140        if plan.delete_limit.is_some()
141            && plan
142                .order
143                .as_ref()
144                .is_none_or(|order| order.fields.is_empty())
145        {
146            return Err(PlanError::DeleteLimitRequiresOrder);
147        }
148    }
149
150    if plan.mode.is_load() && plan.delete_limit.is_some() {
151        return Err(PlanError::LoadPlanWithDeleteLimit);
152    }
153
154    Ok(())
155}
156
157/// Validate plans at executor boundaries and surface invariant violations.
158pub(crate) fn validate_executor_plan<E: EntityKind>(
159    plan: &LogicalPlan,
160) -> Result<(), InternalError> {
161    let schema = SchemaInfo::from_entity_model(E::MODEL).map_err(|err| {
162        InternalError::new(
163            ErrorClass::InvariantViolation,
164            ErrorOrigin::Query,
165            format!("entity schema invalid for {}: {err}", E::PATH),
166        )
167    })?;
168
169    if let Some(predicate) = &plan.predicate {
170        predicate::validate(&schema, predicate).map_err(|err| {
171            InternalError::new(
172                ErrorClass::InvariantViolation,
173                ErrorOrigin::Query,
174                err.to_string(),
175            )
176        })?;
177    }
178
179    if let Some(order) = &plan.order {
180        validate_executor_order(&schema, order).map_err(|err| {
181            InternalError::new(
182                ErrorClass::InvariantViolation,
183                ErrorOrigin::Query,
184                err.to_string(),
185            )
186        })?;
187    }
188
189    validate_access_plan(&schema, E::MODEL, &plan.access).map_err(|err| {
190        InternalError::new(
191            ErrorClass::InvariantViolation,
192            ErrorOrigin::Query,
193            err.to_string(),
194        )
195    })?;
196    validate_plan_semantics(plan).map_err(|err| {
197        InternalError::new(
198            ErrorClass::InvariantViolation,
199            ErrorOrigin::Query,
200            err.to_string(),
201        )
202    })?;
203
204    Ok(())
205}
206
207/// Validate ORDER BY fields against the schema.
208pub(crate) fn validate_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
209    for (field, _) in &order.fields {
210        let field_type = schema
211            .field(field)
212            .ok_or_else(|| PlanError::UnknownOrderField {
213                field: field.clone(),
214            })?;
215
216        if !field_type.is_orderable() {
217            // CONTRACT: ORDER BY rejects unsupported or unordered fields.
218            return Err(PlanError::UnorderableField {
219                field: field.clone(),
220            });
221        }
222    }
223
224    Ok(())
225}
226
227/// Validate ORDER BY fields for executor-only plans.
228///
229/// CONTRACT: executor ordering validation matches planner rules.
230fn validate_executor_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
231    validate_order(schema, order)
232}
233
234/// Validate executor-visible access paths.
235///
236/// This ensures keys, ranges, and index prefixes are schema-compatible.
237pub(crate) fn validate_access_plan(
238    schema: &SchemaInfo,
239    model: &EntityModel,
240    access: &AccessPlan,
241) -> Result<(), PlanError> {
242    match access {
243        AccessPlan::Path(path) => validate_access_path(schema, model, path),
244        AccessPlan::Union(children) | AccessPlan::Intersection(children) => {
245            for child in children {
246                validate_access_plan(schema, model, child)?;
247            }
248            Ok(())
249        }
250    }
251}
252
253fn validate_access_path(
254    schema: &SchemaInfo,
255    model: &EntityModel,
256    access: &AccessPath,
257) -> Result<(), PlanError> {
258    match access {
259        AccessPath::ByKey(key) => validate_pk_key(schema, model, key),
260        AccessPath::ByKeys(keys) => {
261            // Empty key lists are a valid no-op.
262            if keys.is_empty() {
263                return Ok(());
264            }
265            for key in keys {
266                validate_pk_key(schema, model, key)?;
267            }
268            Ok(())
269        }
270        AccessPath::KeyRange { start, end } => {
271            validate_pk_key(schema, model, start)?;
272            validate_pk_key(schema, model, end)?;
273            if start > end {
274                return Err(PlanError::InvalidKeyRange);
275            }
276            Ok(())
277        }
278        AccessPath::IndexPrefix { index, values } => {
279            validate_index_prefix(schema, model, index, values)
280        }
281        AccessPath::FullScan => Ok(()),
282    }
283}
284
285///
286/// TESTS
287///
288
#[cfg(test)]
mod tests {
    use super::{PlanError, validate_plan_with_model, validate_plan_with_schema_info};
    use crate::{
        db::query::{
            LoadSpec, QueryMode, ReadConsistency,
            plan::{
                AccessPath, AccessPlan, LogicalPlan, OrderDirection, OrderSpec,
                planner::PlannerEntity,
            },
            predicate::{SchemaInfo, ValidateError},
        },
        key::Key,
        model::{
            entity::EntityModel,
            field::{EntityFieldKind, EntityFieldModel},
            index::IndexModel,
        },
        traits::EntityKind,
        types::Ulid,
        value::Value,
    };

    // ---- fixtures -------------------------------------------------------

    /// Shorthand for a field model.
    fn field(name: &'static str, kind: EntityFieldKind) -> EntityFieldModel {
        EntityFieldModel { name, kind }
    }

    /// Leak a field list so it can back a `'static` entity model.
    fn leak_fields(fields: Vec<EntityFieldModel>) -> &'static [EntityFieldModel] {
        Box::leak(fields.into_boxed_slice())
    }

    /// Assemble the test entity model from already-`'static` parts.
    fn assemble(
        fields: &'static [EntityFieldModel],
        primary_key: &'static EntityFieldModel,
        indexes: &'static [&'static IndexModel],
    ) -> EntityModel {
        EntityModel {
            path: "test::Entity",
            entity_name: "TestEntity",
            primary_key,
            fields,
            indexes,
        }
    }

    /// Index-free model whose primary key is `fields[pk_index]`.
    fn model_with_fields(fields: Vec<EntityFieldModel>, pk_index: usize) -> EntityModel {
        let fields = leak_fields(fields);

        assemble(fields, &fields[pk_index], &[])
    }

    /// Load-mode plan over a single access path with no predicate, delete
    /// limit, or pagination.
    fn load_plan(access: AccessPath, order: Option<OrderSpec>) -> LogicalPlan {
        LogicalPlan {
            mode: QueryMode::Load(LoadSpec::new()),
            access: AccessPlan::Path(access),
            predicate: None,
            order,
            delete_limit: None,
            page: None,
            consistency: ReadConsistency::MissingOk,
        }
    }

    // ---- model / schema construction ------------------------------------

    #[test]
    fn model_rejects_missing_primary_key() {
        // The primary key is a field model that is not part of `fields`.
        let fields = leak_fields(vec![field("id", EntityFieldKind::Ulid)]);
        let missing_pk = Box::leak(Box::new(field("missing", EntityFieldKind::Ulid)));
        let model = assemble(fields, missing_pk, &[]);

        assert!(matches!(
            SchemaInfo::from_entity_model(&model),
            Err(ValidateError::InvalidPrimaryKey { .. })
        ));
    }

    #[test]
    fn model_rejects_duplicate_fields() {
        let duplicated = vec![
            field("dup", EntityFieldKind::Text),
            field("dup", EntityFieldKind::Text),
        ];
        let model = model_with_fields(duplicated, 0);

        assert!(matches!(
            SchemaInfo::from_entity_model(&model),
            Err(ValidateError::DuplicateField { .. })
        ));
    }

    #[test]
    fn model_rejects_invalid_primary_key_type() {
        let list_pk = field("pk", EntityFieldKind::List(&EntityFieldKind::Text));
        let model = model_with_fields(vec![list_pk], 0);

        assert!(matches!(
            SchemaInfo::from_entity_model(&model),
            Err(ValidateError::InvalidPrimaryKeyType { .. })
        ));
    }

    #[test]
    fn model_rejects_index_unknown_field() {
        const INDEX_FIELDS: [&str; 1] = ["missing"];
        const INDEX_MODEL: IndexModel = IndexModel::new(
            "test::idx_missing",
            "test::IndexStore",
            &INDEX_FIELDS,
            false,
        );
        const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];

        let fields = leak_fields(vec![field("id", EntityFieldKind::Ulid)]);
        let model = assemble(fields, &fields[0], &INDEXES);

        assert!(matches!(
            SchemaInfo::from_entity_model(&model),
            Err(ValidateError::IndexFieldUnknown { .. })
        ));
    }

    #[test]
    fn model_rejects_index_unsupported_field() {
        const INDEX_FIELDS: [&str; 1] = ["broken"];
        const INDEX_MODEL: IndexModel =
            IndexModel::new("test::idx_broken", "test::IndexStore", &INDEX_FIELDS, false);
        const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];

        let fields = leak_fields(vec![
            field("id", EntityFieldKind::Ulid),
            field("broken", EntityFieldKind::Unsupported),
        ]);
        let model = assemble(fields, &fields[0], &INDEXES);

        assert!(matches!(
            SchemaInfo::from_entity_model(&model),
            Err(ValidateError::IndexFieldUnsupported { .. })
        ));
    }

    #[test]
    fn model_rejects_duplicate_index_names() {
        const INDEX_FIELDS_A: [&str; 1] = ["id"];
        const INDEX_FIELDS_B: [&str; 1] = ["other"];
        const INDEX_A: IndexModel = IndexModel::new(
            "test::dup_index",
            "test::IndexStore",
            &INDEX_FIELDS_A,
            false,
        );
        const INDEX_B: IndexModel = IndexModel::new(
            "test::dup_index",
            "test::IndexStore",
            &INDEX_FIELDS_B,
            false,
        );
        const INDEXES: [&IndexModel; 2] = [&INDEX_A, &INDEX_B];

        let fields = leak_fields(vec![
            field("id", EntityFieldKind::Ulid),
            field("other", EntityFieldKind::Text),
        ]);
        let model = assemble(fields, &fields[0], &INDEXES);

        assert!(matches!(
            SchemaInfo::from_entity_model(&model),
            Err(ValidateError::DuplicateIndexName { .. })
        ));
    }

    // ---- plan validation ------------------------------------------------

    #[test]
    fn plan_rejects_unorderable_field() {
        let model = model_with_fields(
            vec![
                field("id", EntityFieldKind::Ulid),
                field("tags", EntityFieldKind::List(&EntityFieldKind::Text)),
            ],
            0,
        );
        let schema = SchemaInfo::from_entity_model(&model).expect("valid model");

        let by_tags = OrderSpec {
            fields: vec![("tags".to_string(), OrderDirection::Asc)],
        };
        let plan = load_plan(AccessPath::FullScan, Some(by_tags));

        let err =
            validate_plan_with_schema_info(&schema, &model, &plan).expect_err("unorderable field");
        assert!(matches!(err, PlanError::UnorderableField { .. }));
    }

    #[test]
    fn plan_rejects_index_prefix_too_long() {
        let schema = SchemaInfo::from_entity_model(PlannerEntity::MODEL).expect("valid model");

        let prefix = AccessPath::IndexPrefix {
            index: *PlannerEntity::INDEXES[0],
            values: vec![
                Value::Text("a".to_string()),
                Value::Text("b".to_string()),
                Value::Text("c".to_string()),
            ],
        };
        let plan = load_plan(prefix, None);

        let err = validate_plan_with_schema_info(&schema, PlannerEntity::MODEL, &plan)
            .expect_err("index prefix too long");
        assert!(matches!(err, PlanError::IndexPrefixTooLong { .. }));
    }

    #[test]
    fn plan_rejects_empty_index_prefix() {
        let schema = SchemaInfo::from_entity_model(PlannerEntity::MODEL).expect("valid model");

        let prefix = AccessPath::IndexPrefix {
            index: *PlannerEntity::INDEXES[0],
            values: vec![],
        };
        let plan = load_plan(prefix, None);

        let err = validate_plan_with_schema_info(&schema, PlannerEntity::MODEL, &plan)
            .expect_err("index prefix empty");
        assert!(matches!(err, PlanError::IndexPrefixEmpty));
    }

    #[test]
    fn plan_accepts_model_based_validation() {
        let model = model_with_fields(vec![field("id", EntityFieldKind::Ulid)], 0);
        let plan = load_plan(AccessPath::ByKey(Key::Ulid(Ulid::nil())), None);

        validate_plan_with_model(&plan, &model).expect("valid plan");
    }
}
562
563/// Validate that a key matches the entity's primary key type.
564fn validate_pk_key(schema: &SchemaInfo, model: &EntityModel, key: &Key) -> Result<(), PlanError> {
565    let field = model.primary_key.name;
566
567    let field_type = schema
568        .field(field)
569        .ok_or_else(|| PlanError::PrimaryKeyUnsupported {
570            field: field.to_string(),
571        })?;
572
573    let expected =
574        key_type_for_field(field_type).ok_or_else(|| PlanError::PrimaryKeyUnsupported {
575            field: field.to_string(),
576        })?;
577
578    if key_variant(key) != expected {
579        return Err(PlanError::PrimaryKeyMismatch {
580            field: field.to_string(),
581            key: *key,
582        });
583    }
584
585    Ok(())
586}
587
588/// Validate that an index prefix is valid for execution.
589fn validate_index_prefix(
590    schema: &SchemaInfo,
591    model: &EntityModel,
592    index: &IndexModel,
593    values: &[Value],
594) -> Result<(), PlanError> {
595    if !model.indexes.contains(&index) {
596        return Err(PlanError::IndexNotFound { index: *index });
597    }
598
599    if values.is_empty() {
600        return Err(PlanError::IndexPrefixEmpty);
601    }
602
603    if values.len() > index.fields.len() {
604        return Err(PlanError::IndexPrefixTooLong {
605            prefix_len: values.len(),
606            field_len: index.fields.len(),
607        });
608    }
609
610    for (field, value) in index.fields.iter().zip(values.iter()) {
611        let field_type =
612            schema
613                .field(field)
614                .ok_or_else(|| PlanError::IndexPrefixValueMismatch {
615                    field: field.to_string(),
616                })?;
617
618        if !predicate::validate::literal_matches_type(value, field_type) {
619            return Err(PlanError::IndexPrefixValueMismatch {
620                field: field.to_string(),
621            });
622        }
623    }
624
625    Ok(())
626}
627
/// Payload-free tag naming the kind of a primary key.
///
/// Both `Key` values and schema `FieldType`s are mapped onto this enum so the
/// two can be compared without coupling `Key` to `FieldType` directly.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum KeyVariant {
    Account,
    Int,
    Principal,
    Subaccount,
    Timestamp,
    Uint,
    Ulid,
    Unit,
}
642
643const fn key_variant(key: &Key) -> KeyVariant {
644    match key {
645        Key::Account(_) => KeyVariant::Account,
646        Key::Int(_) => KeyVariant::Int,
647        Key::Principal(_) => KeyVariant::Principal,
648        Key::Subaccount(_) => KeyVariant::Subaccount,
649        Key::Timestamp(_) => KeyVariant::Timestamp,
650        Key::Uint(_) => KeyVariant::Uint,
651        Key::Ulid(_) => KeyVariant::Ulid,
652        Key::Unit => KeyVariant::Unit,
653    }
654}
655
656/// Map scalar field types to compatible key variants.
657///
658/// Non-scalar and unsupported field types are intentionally excluded.
659const fn key_type_for_field(field_type: &FieldType) -> Option<KeyVariant> {
660    match field_type {
661        FieldType::Scalar(ScalarType::Account) => Some(KeyVariant::Account),
662        FieldType::Scalar(ScalarType::Int) => Some(KeyVariant::Int),
663        FieldType::Scalar(ScalarType::Principal) => Some(KeyVariant::Principal),
664        FieldType::Scalar(ScalarType::Subaccount) => Some(KeyVariant::Subaccount),
665        FieldType::Scalar(ScalarType::Timestamp) => Some(KeyVariant::Timestamp),
666        FieldType::Scalar(ScalarType::Uint) => Some(KeyVariant::Uint),
667        FieldType::Scalar(ScalarType::Ulid) => Some(KeyVariant::Ulid),
668        FieldType::Scalar(ScalarType::Unit) => Some(KeyVariant::Unit),
669        _ => None,
670    }
671}