Skip to main content

icydb_core/db/query/plan/
validate.rs

1//! Executor-ready plan validation against a concrete entity schema.
2use super::{AccessPath, AccessPlan, LogicalPlan, OrderSpec};
3use crate::{
4    db::query::predicate::{
5        self, SchemaInfo,
6        validate::{FieldType, ScalarType},
7    },
8    error::{ErrorClass, ErrorOrigin, InternalError},
9    key::Key,
10    model::entity::EntityModel,
11    model::index::IndexModel,
12    traits::EntityKind,
13    value::Value,
14};
15use thiserror::Error as ThisError;
16
17///
18/// PlanError
19///
20/// Executor-visible validation failures for logical plans.
21///
22/// These errors indicate that a plan cannot be safely executed against the
23/// current schema or entity definition. They are *not* planner bugs.
24///
25
26#[derive(Debug, ThisError)]
27pub enum PlanError {
28    /// Predicate failed schema-level validation.
29    #[error("predicate validation failed: {0}")]
30    PredicateInvalid(#[from] predicate::ValidateError),
31
32    /// ORDER BY references an unknown field.
33    #[error("unknown order field '{field}'")]
34    UnknownOrderField { field: String },
35
36    /// ORDER BY references a field that cannot be ordered.
37    #[error("order field '{field}' is not orderable")]
38    UnorderableField { field: String },
39
40    /// Access plan references an index not declared on the entity.
41    #[error("index '{index}' not found on entity")]
42    IndexNotFound { index: IndexModel },
43
44    /// Index prefix exceeds the number of indexed fields.
45    #[error("index prefix length {prefix_len} exceeds index field count {field_len}")]
46    IndexPrefixTooLong { prefix_len: usize, field_len: usize },
47
48    /// Index prefix must include at least one value.
49    #[error("index prefix must include at least one value")]
50    IndexPrefixEmpty,
51
52    /// Index prefix literal does not match indexed field type.
53    #[error("index prefix value for field '{field}' is incompatible")]
54    IndexPrefixValueMismatch { field: String },
55
56    /// Primary key field exists but is not key-compatible.
57    #[error("primary key field '{field}' is not key-compatible")]
58    PrimaryKeyUnsupported { field: String },
59
60    /// Supplied key does not match the primary key type.
61    #[error("key '{key}' is incompatible with primary key '{field}'")]
62    PrimaryKeyMismatch { field: String, key: Key },
63
64    /// Key range has invalid ordering.
65    #[error("key range start is greater than end")]
66    InvalidKeyRange,
67
68    /// ORDER BY must specify at least one field.
69    #[error("order specification must include at least one field")]
70    EmptyOrderSpec,
71
72    /// Delete plans must not carry pagination.
73    #[error("delete plans must not include pagination")]
74    DeletePlanWithPagination,
75
76    /// Load plans must not carry delete limits.
77    #[error("load plans must not include delete limits")]
78    LoadPlanWithDeleteLimit,
79
80    /// Delete limits require an explicit ordering.
81    #[error("delete limit requires an explicit ordering")]
82    DeleteLimitRequiresOrder,
83}
84
/// Test-only entrypoint: validate `plan` against a caller-supplied schema surface.
///
/// Delegates straight to [`validate_logical_plan`] with the same arguments,
/// so the returned error (if any) is whatever that function reports.
#[cfg(test)]
pub(crate) fn validate_plan_with_schema_info(
    schema: &SchemaInfo,
    model: &EntityModel,
    plan: &LogicalPlan,
) -> Result<(), PlanError> {
    validate_logical_plan(schema, model, plan)
}
94
/// Test-only entrypoint: validate `plan` using only the runtime entity model.
///
/// The schema surface is derived from `model` itself rather than supplied by
/// the caller. A model that cannot produce a schema surfaces as
/// [`PlanError::PredicateInvalid`] through the `From` conversion.
#[cfg(test)]
pub(crate) fn validate_plan_with_model(
    plan: &LogicalPlan,
    model: &EntityModel,
) -> Result<(), PlanError> {
    SchemaInfo::from_entity_model(model)
        .map_err(PlanError::from)
        .and_then(|schema| validate_plan_with_schema_info(&schema, model, plan))
}
106
107/// Validate a logical plan against schema and plan-level invariants.
108pub(crate) fn validate_logical_plan(
109    schema: &SchemaInfo,
110    model: &EntityModel,
111    plan: &LogicalPlan,
112) -> Result<(), PlanError> {
113    if let Some(predicate) = &plan.predicate {
114        predicate::validate(schema, predicate)?;
115    }
116
117    if let Some(order) = &plan.order {
118        validate_order(schema, order)?;
119    }
120
121    validate_access_plan(schema, model, &plan.access)?;
122    validate_plan_semantics(plan)?;
123
124    Ok(())
125}
126
127/// Validate plan-level invariants not covered by schema checks.
128fn validate_plan_semantics(plan: &LogicalPlan) -> Result<(), PlanError> {
129    if let Some(order) = &plan.order
130        && order.fields.is_empty()
131    {
132        return Err(PlanError::EmptyOrderSpec);
133    }
134
135    if plan.mode.is_delete() {
136        if plan.page.is_some() {
137            return Err(PlanError::DeletePlanWithPagination);
138        }
139
140        if plan.delete_limit.is_some()
141            && plan
142                .order
143                .as_ref()
144                .is_none_or(|order| order.fields.is_empty())
145        {
146            return Err(PlanError::DeleteLimitRequiresOrder);
147        }
148    }
149
150    if plan.mode.is_load() && plan.delete_limit.is_some() {
151        return Err(PlanError::LoadPlanWithDeleteLimit);
152    }
153
154    Ok(())
155}
156
157/// Validate plans at executor boundaries and surface invariant violations.
158pub(crate) fn validate_executor_plan<E: EntityKind>(
159    plan: &LogicalPlan,
160) -> Result<(), InternalError> {
161    let schema = SchemaInfo::from_entity_model(E::MODEL).map_err(|err| {
162        InternalError::new(
163            ErrorClass::InvariantViolation,
164            ErrorOrigin::Query,
165            format!("entity schema invalid for {}: {err}", E::PATH),
166        )
167    })?;
168
169    if let Some(predicate) = &plan.predicate {
170        predicate::validate(&schema, predicate).map_err(|err| {
171            InternalError::new(
172                ErrorClass::InvariantViolation,
173                ErrorOrigin::Query,
174                err.to_string(),
175            )
176        })?;
177    }
178
179    if let Some(order) = &plan.order {
180        validate_executor_order(&schema, order).map_err(|err| {
181            InternalError::new(
182                ErrorClass::InvariantViolation,
183                ErrorOrigin::Query,
184                err.to_string(),
185            )
186        })?;
187    }
188
189    validate_access_plan(&schema, E::MODEL, &plan.access).map_err(|err| {
190        InternalError::new(
191            ErrorClass::InvariantViolation,
192            ErrorOrigin::Query,
193            err.to_string(),
194        )
195    })?;
196    validate_plan_semantics(plan).map_err(|err| {
197        InternalError::new(
198            ErrorClass::InvariantViolation,
199            ErrorOrigin::Query,
200            err.to_string(),
201        )
202    })?;
203
204    Ok(())
205}
206
207/// Validate ORDER BY fields against the schema.
208pub(crate) fn validate_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
209    for (field, _) in &order.fields {
210        let field_type = schema
211            .field(field)
212            .ok_or_else(|| PlanError::UnknownOrderField {
213                field: field.clone(),
214            })?;
215
216        if !field_type.is_orderable() {
217            // ORDER BY permits opaque / unsupported fields.
218            // Executor treats incomparable values as Ordering::Equal and preserves input order.
219            if matches!(field_type, FieldType::Unsupported) {
220                continue;
221            }
222
223            return Err(PlanError::UnorderableField {
224                field: field.clone(),
225            });
226        }
227    }
228
229    Ok(())
230}
231
232/// Validate ORDER BY fields for executor-only plans.
233///
234/// CONTRACT: executor ordering validation matches planner rules.
235fn validate_executor_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
236    validate_order(schema, order)
237}
238
239/// Validate executor-visible access paths.
240///
241/// This ensures keys, ranges, and index prefixes are schema-compatible.
242pub(crate) fn validate_access_plan(
243    schema: &SchemaInfo,
244    model: &EntityModel,
245    access: &AccessPlan,
246) -> Result<(), PlanError> {
247    match access {
248        AccessPlan::Path(path) => validate_access_path(schema, model, path),
249        AccessPlan::Union(children) | AccessPlan::Intersection(children) => {
250            for child in children {
251                validate_access_plan(schema, model, child)?;
252            }
253            Ok(())
254        }
255    }
256}
257
258fn validate_access_path(
259    schema: &SchemaInfo,
260    model: &EntityModel,
261    access: &AccessPath,
262) -> Result<(), PlanError> {
263    match access {
264        AccessPath::ByKey(key) => validate_pk_key(schema, model, key),
265        AccessPath::ByKeys(keys) => {
266            // Empty key lists are a valid no-op.
267            if keys.is_empty() {
268                return Ok(());
269            }
270            for key in keys {
271                validate_pk_key(schema, model, key)?;
272            }
273            Ok(())
274        }
275        AccessPath::KeyRange { start, end } => {
276            validate_pk_key(schema, model, start)?;
277            validate_pk_key(schema, model, end)?;
278            if start > end {
279                return Err(PlanError::InvalidKeyRange);
280            }
281            Ok(())
282        }
283        AccessPath::IndexPrefix { index, values } => {
284            validate_index_prefix(schema, model, index, values)
285        }
286        AccessPath::FullScan => Ok(()),
287    }
288}
289
290///
291/// TESTS
292///
293
#[cfg(test)]
mod tests {
    use super::{PlanError, validate_plan_with_model, validate_plan_with_schema_info};
    use crate::{
        db::query::{
            LoadSpec, QueryMode, ReadConsistency,
            plan::{
                AccessPath, AccessPlan, LogicalPlan, OrderDirection, OrderSpec,
                planner::PlannerEntity,
            },
            predicate::{SchemaInfo, ValidateError},
        },
        key::Key,
        model::{
            entity::EntityModel,
            field::{EntityFieldKind, EntityFieldModel},
            index::IndexModel,
        },
        traits::EntityKind,
        types::Ulid,
        value::Value,
    };

    // ---- fixtures -------------------------------------------------------

    /// Shorthand for a field model.
    fn field(name: &'static str, kind: EntityFieldKind) -> EntityFieldModel {
        EntityFieldModel { name, kind }
    }

    /// Leak a field list so it can back the `'static` slices `EntityModel` needs.
    fn leak_fields(fields: Vec<EntityFieldModel>) -> &'static [EntityFieldModel] {
        Box::leak(fields.into_boxed_slice())
    }

    /// Build a test entity whose primary key is `fields[pk_index]`, with the
    /// given index list.
    fn model_with_indexes(
        fields: Vec<EntityFieldModel>,
        pk_index: usize,
        indexes: &'static [&'static IndexModel],
    ) -> EntityModel {
        let fields = leak_fields(fields);

        EntityModel {
            path: "test::Entity",
            entity_name: "TestEntity",
            primary_key: &fields[pk_index],
            fields,
            indexes,
        }
    }

    /// Build a test entity with no indexes.
    fn model_with_fields(fields: Vec<EntityFieldModel>, pk_index: usize) -> EntityModel {
        model_with_indexes(fields, pk_index, &[])
    }

    /// Build a load plan over a single access path, with no predicate,
    /// pagination, or delete limit.
    fn load_plan(access: AccessPath, order: Option<OrderSpec>) -> LogicalPlan {
        LogicalPlan {
            mode: QueryMode::Load(LoadSpec::new()),
            access: AccessPlan::Path(access),
            predicate: None,
            order,
            delete_limit: None,
            page: None,
            consistency: ReadConsistency::MissingOk,
        }
    }

    // ---- schema construction from a model -------------------------------

    #[test]
    fn model_rejects_missing_primary_key() {
        // The primary key points at a field that is not part of `fields`.
        let fields = leak_fields(vec![field("id", EntityFieldKind::Ulid)]);
        let missing_pk = Box::leak(Box::new(field("missing", EntityFieldKind::Ulid)));

        let model = EntityModel {
            path: "test::Entity",
            entity_name: "TestEntity",
            primary_key: missing_pk,
            fields,
            indexes: &[],
        };

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::InvalidPrimaryKey { .. })
        ));
    }

    #[test]
    fn model_rejects_duplicate_fields() {
        let model = model_with_fields(
            vec![
                field("dup", EntityFieldKind::Text),
                field("dup", EntityFieldKind::Text),
            ],
            0,
        );

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::DuplicateField { .. })
        ));
    }

    #[test]
    fn model_rejects_invalid_primary_key_type() {
        let model = model_with_fields(
            vec![field("pk", EntityFieldKind::List(&EntityFieldKind::Text))],
            0,
        );

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::InvalidPrimaryKeyType { .. })
        ));
    }

    #[test]
    fn model_rejects_index_unknown_field() {
        const INDEX_FIELDS: [&str; 1] = ["missing"];
        const INDEX_MODEL: IndexModel = IndexModel::new(
            "test::idx_missing",
            "test::IndexStore",
            &INDEX_FIELDS,
            false,
        );
        const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];

        let model = model_with_indexes(vec![field("id", EntityFieldKind::Ulid)], 0, &INDEXES);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::IndexFieldUnknown { .. })
        ));
    }

    #[test]
    fn model_rejects_index_unsupported_field() {
        const INDEX_FIELDS: [&str; 1] = ["broken"];
        const INDEX_MODEL: IndexModel =
            IndexModel::new("test::idx_broken", "test::IndexStore", &INDEX_FIELDS, false);
        const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];

        let model = model_with_indexes(
            vec![
                field("id", EntityFieldKind::Ulid),
                field("broken", EntityFieldKind::Unsupported),
            ],
            0,
            &INDEXES,
        );

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::IndexFieldUnsupported { .. })
        ));
    }

    #[test]
    fn model_rejects_duplicate_index_names() {
        const INDEX_FIELDS_A: [&str; 1] = ["id"];
        const INDEX_FIELDS_B: [&str; 1] = ["other"];
        const INDEX_A: IndexModel = IndexModel::new(
            "test::dup_index",
            "test::IndexStore",
            &INDEX_FIELDS_A,
            false,
        );
        const INDEX_B: IndexModel = IndexModel::new(
            "test::dup_index",
            "test::IndexStore",
            &INDEX_FIELDS_B,
            false,
        );
        const INDEXES: [&IndexModel; 2] = [&INDEX_A, &INDEX_B];

        let model = model_with_indexes(
            vec![
                field("id", EntityFieldKind::Ulid),
                field("other", EntityFieldKind::Text),
            ],
            0,
            &INDEXES,
        );

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::DuplicateIndexName { .. })
        ));
    }

    // ---- plan validation -------------------------------------------------

    #[test]
    fn plan_rejects_unorderable_field() {
        let model = model_with_fields(
            vec![
                field("id", EntityFieldKind::Ulid),
                field("tags", EntityFieldKind::List(&EntityFieldKind::Text)),
            ],
            0,
        );
        let schema = SchemaInfo::from_entity_model(&model).expect("valid model");

        let order = OrderSpec {
            fields: vec![("tags".to_string(), OrderDirection::Asc)],
        };
        let plan = load_plan(AccessPath::FullScan, Some(order));

        let err =
            validate_plan_with_schema_info(&schema, &model, &plan).expect_err("unorderable field");
        assert!(matches!(err, PlanError::UnorderableField { .. }));
    }

    #[test]
    fn plan_rejects_index_prefix_too_long() {
        let schema = SchemaInfo::from_entity_model(PlannerEntity::MODEL).expect("valid model");
        let plan = load_plan(
            AccessPath::IndexPrefix {
                index: *PlannerEntity::INDEXES[0],
                values: vec![
                    Value::Text("a".to_string()),
                    Value::Text("b".to_string()),
                    Value::Text("c".to_string()),
                ],
            },
            None,
        );

        let err = validate_plan_with_schema_info(&schema, PlannerEntity::MODEL, &plan)
            .expect_err("index prefix too long");
        assert!(matches!(err, PlanError::IndexPrefixTooLong { .. }));
    }

    #[test]
    fn plan_rejects_empty_index_prefix() {
        let schema = SchemaInfo::from_entity_model(PlannerEntity::MODEL).expect("valid model");
        let plan = load_plan(
            AccessPath::IndexPrefix {
                index: *PlannerEntity::INDEXES[0],
                values: vec![],
            },
            None,
        );

        let err = validate_plan_with_schema_info(&schema, PlannerEntity::MODEL, &plan)
            .expect_err("index prefix empty");
        assert!(matches!(err, PlanError::IndexPrefixEmpty));
    }

    #[test]
    fn plan_accepts_model_based_validation() {
        let model = model_with_fields(vec![field("id", EntityFieldKind::Ulid)], 0);
        let plan = load_plan(AccessPath::ByKey(Key::Ulid(Ulid::nil())), None);

        validate_plan_with_model(&plan, &model).expect("valid plan");
    }
}
567
568/// Validate that a key matches the entity's primary key type.
569fn validate_pk_key(schema: &SchemaInfo, model: &EntityModel, key: &Key) -> Result<(), PlanError> {
570    let field = model.primary_key.name;
571
572    let field_type = schema
573        .field(field)
574        .ok_or_else(|| PlanError::PrimaryKeyUnsupported {
575            field: field.to_string(),
576        })?;
577
578    let expected =
579        key_type_for_field(field_type).ok_or_else(|| PlanError::PrimaryKeyUnsupported {
580            field: field.to_string(),
581        })?;
582
583    if key_variant(key) != expected {
584        return Err(PlanError::PrimaryKeyMismatch {
585            field: field.to_string(),
586            key: *key,
587        });
588    }
589
590    Ok(())
591}
592
593/// Validate that an index prefix is valid for execution.
594fn validate_index_prefix(
595    schema: &SchemaInfo,
596    model: &EntityModel,
597    index: &IndexModel,
598    values: &[Value],
599) -> Result<(), PlanError> {
600    if !model.indexes.contains(&index) {
601        return Err(PlanError::IndexNotFound { index: *index });
602    }
603
604    if values.is_empty() {
605        return Err(PlanError::IndexPrefixEmpty);
606    }
607
608    if values.len() > index.fields.len() {
609        return Err(PlanError::IndexPrefixTooLong {
610            prefix_len: values.len(),
611            field_len: index.fields.len(),
612        });
613    }
614
615    for (field, value) in index.fields.iter().zip(values.iter()) {
616        let field_type =
617            schema
618                .field(field)
619                .ok_or_else(|| PlanError::IndexPrefixValueMismatch {
620                    field: field.to_string(),
621                })?;
622
623        if !predicate::validate::literal_matches_type(value, field_type) {
624            return Err(PlanError::IndexPrefixValueMismatch {
625                field: field.to_string(),
626            });
627        }
628    }
629
630    Ok(())
631}
632
/// Payload-free tag naming the kind of value a primary key can hold.
///
/// It is the common ground on which a concrete `Key` and a schema `FieldType`
/// are compared, so that neither type needs to know about the other.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum KeyVariant {
    /// Account-typed key.
    Account,
    /// Signed integer key.
    Int,
    /// Principal-typed key.
    Principal,
    /// Subaccount-typed key.
    Subaccount,
    /// Timestamp-typed key.
    Timestamp,
    /// Unsigned integer key.
    Uint,
    /// ULID-typed key.
    Ulid,
    /// Unit key (no payload).
    Unit,
}
647
648const fn key_variant(key: &Key) -> KeyVariant {
649    match key {
650        Key::Account(_) => KeyVariant::Account,
651        Key::Int(_) => KeyVariant::Int,
652        Key::Principal(_) => KeyVariant::Principal,
653        Key::Subaccount(_) => KeyVariant::Subaccount,
654        Key::Timestamp(_) => KeyVariant::Timestamp,
655        Key::Uint(_) => KeyVariant::Uint,
656        Key::Ulid(_) => KeyVariant::Ulid,
657        Key::Unit => KeyVariant::Unit,
658    }
659}
660
661/// Map scalar field types to compatible key variants.
662///
663/// Non-scalar and unsupported field types are intentionally excluded.
664const fn key_type_for_field(field_type: &FieldType) -> Option<KeyVariant> {
665    match field_type {
666        FieldType::Scalar(ScalarType::Account) => Some(KeyVariant::Account),
667        FieldType::Scalar(ScalarType::Int) => Some(KeyVariant::Int),
668        FieldType::Scalar(ScalarType::Principal) => Some(KeyVariant::Principal),
669        FieldType::Scalar(ScalarType::Subaccount) => Some(KeyVariant::Subaccount),
670        FieldType::Scalar(ScalarType::Timestamp) => Some(KeyVariant::Timestamp),
671        FieldType::Scalar(ScalarType::Uint) => Some(KeyVariant::Uint),
672        FieldType::Scalar(ScalarType::Ulid) => Some(KeyVariant::Ulid),
673        FieldType::Scalar(ScalarType::Unit) => Some(KeyVariant::Unit),
674        _ => None,
675    }
676}