Skip to main content

icydb_core/db/query/plan/
validate.rs

1//! Executor-facing validation for logical plans.
2//!
3//! This module validates that a `LogicalPlan` is *safe for execution* against a
4//! concrete entity schema. It must not:
5//!   - infer planner semantics
6//!   - assert planner invariants
7//!   - normalize or rewrite plans
8//!
9//! Planner correctness is enforced elsewhere. This layer exists solely to
10//! protect the executor from malformed or schema-incompatible plans.
11
12use super::{AccessPath, AccessPlan, LogicalPlan, OrderSpec};
13use crate::{
14    db::query::predicate::{
15        self, SchemaInfo,
16        validate::{FieldType, ScalarType},
17    },
18    key::Key,
19    model::entity::EntityModel,
20    model::index::IndexModel,
21    value::Value,
22};
23use thiserror::Error as ThisError;
24
25/// Executor-visible validation failures for logical plans.
26///
27/// These errors indicate that a plan cannot be safely executed against the
28/// current schema or entity definition. They are *not* planner bugs.
29#[derive(Debug, ThisError)]
30pub enum PlanError {
31    /// Predicate failed schema-level validation.
32    #[error("predicate validation failed: {0}")]
33    PredicateInvalid(#[from] predicate::ValidateError),
34
35    /// ORDER BY references an unknown field.
36    #[error("unknown order field '{field}'")]
37    UnknownOrderField { field: String },
38
39    /// ORDER BY references a field that cannot be ordered.
40    #[error("order field '{field}' is not orderable")]
41    UnorderableField { field: String },
42
43    /// Access plan references an index not declared on the entity.
44    #[error("index '{index}' not found on entity")]
45    IndexNotFound { index: IndexModel },
46
47    /// Index prefix exceeds the number of indexed fields.
48    #[error("index prefix length {prefix_len} exceeds index field count {field_len}")]
49    IndexPrefixTooLong { prefix_len: usize, field_len: usize },
50
51    /// Index prefix literal does not match indexed field type.
52    #[error("index prefix value for field '{field}' is incompatible")]
53    IndexPrefixValueMismatch { field: String },
54
55    /// Primary key field exists but is not key-compatible.
56    #[error("primary key field '{field}' is not key-compatible")]
57    PrimaryKeyUnsupported { field: String },
58
59    /// Supplied key does not match the primary key type.
60    #[error("key '{key}' is incompatible with primary key '{field}'")]
61    PrimaryKeyMismatch { field: String, key: Key },
62
63    /// Key range has invalid ordering.
64    #[error("key range start is greater than end")]
65    InvalidKeyRange,
66}
67
/// Validate `plan` against `model`, deriving the schema surface from the model.
///
/// The schema is built with `SchemaInfo::from_entity_model`; a failure there
/// is converted into `PlanError::PredicateInvalid`. Otherwise validation is
/// delegated to `validate_plan_with_schema_info`.
///
/// Compiled only for test builds.
#[cfg(test)]
pub(crate) fn validate_plan_with_model(
    plan: &LogicalPlan,
    model: &EntityModel,
) -> Result<(), PlanError> {
    SchemaInfo::from_entity_model(model)
        .map_err(PlanError::from)
        .and_then(|schema| validate_plan_with_schema_info(&schema, model, plan))
}
79
/// Validate `plan` against an already-built schema surface.
///
/// Checks run in a fixed order and stop at the first failure:
/// 1. the predicate, when present;
/// 2. the ORDER BY specification, when present;
/// 3. the access plan.
///
/// Compiled only for test builds.
#[cfg(test)]
pub(crate) fn validate_plan_with_schema_info(
    schema: &SchemaInfo,
    model: &EntityModel,
    plan: &LogicalPlan,
) -> Result<(), PlanError> {
    // Predicate errors are converted into `PlanError::PredicateInvalid` by `?`.
    if let Some(pred) = plan.predicate.as_ref() {
        predicate::validate(schema, pred)?;
    }

    plan.order
        .as_ref()
        .map_or(Ok(()), |order_spec| validate_order(schema, order_spec))?;

    validate_access_plan(schema, model, &plan.access)
}
99
// NOTE(review): this impl block declares no items, so it has no effect as
// written. Confirm whether it is a placeholder or can be removed.
impl LogicalPlan {}
101
102/// Validate ORDER BY fields against the schema.
103pub(crate) fn validate_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
104    for (field, _) in &order.fields {
105        let field_type = schema
106            .field(field)
107            .ok_or_else(|| PlanError::UnknownOrderField {
108                field: field.clone(),
109            })?;
110
111        if !field_type.is_orderable() {
112            return Err(PlanError::UnorderableField {
113                field: field.clone(),
114            });
115        }
116    }
117
118    Ok(())
119}
120
121/// Validate executor-visible access paths.
122///
123/// This ensures keys, ranges, and index prefixes are schema-compatible.
124pub(crate) fn validate_access_plan(
125    schema: &SchemaInfo,
126    model: &EntityModel,
127    access: &AccessPlan,
128) -> Result<(), PlanError> {
129    match access {
130        AccessPlan::Path(path) => validate_access_path(schema, model, path),
131        AccessPlan::Union(children) | AccessPlan::Intersection(children) => {
132            for child in children {
133                validate_access_plan(schema, model, child)?;
134            }
135            Ok(())
136        }
137    }
138}
139
140fn validate_access_path(
141    schema: &SchemaInfo,
142    model: &EntityModel,
143    access: &AccessPath,
144) -> Result<(), PlanError> {
145    match access {
146        AccessPath::ByKey(key) => validate_pk_key(schema, model, key),
147        AccessPath::ByKeys(keys) => {
148            for key in keys {
149                validate_pk_key(schema, model, key)?;
150            }
151            Ok(())
152        }
153        AccessPath::KeyRange { start, end } => {
154            validate_pk_key(schema, model, start)?;
155            validate_pk_key(schema, model, end)?;
156            if start > end {
157                return Err(PlanError::InvalidKeyRange);
158            }
159            Ok(())
160        }
161        AccessPath::IndexPrefix { index, values } => {
162            validate_index_prefix(schema, model, index, values)
163        }
164        AccessPath::FullScan => Ok(()),
165    }
166}
167
///
/// TESTS
///

#[cfg(test)]
mod tests {
    use super::{PlanError, validate_plan_with_model, validate_plan_with_schema_info};
    use crate::{
        db::query::{
            plan::{
                AccessPath, AccessPlan, LogicalPlan, OrderDirection, OrderSpec,
                planner::PlannerEntity,
            },
            predicate::{SchemaInfo, ValidateError},
        },
        key::Key,
        model::{
            entity::EntityModel,
            field::{EntityFieldKind, EntityFieldModel},
            index::IndexModel,
        },
        traits::EntityKind,
        types::Ulid,
        value::Value,
    };

    // ------------------------------------------------------------------
    // Fixtures
    // ------------------------------------------------------------------

    /// Build a field model from a name and a kind.
    fn field(name: &'static str, kind: EntityFieldKind) -> EntityFieldModel {
        EntityFieldModel { name, kind }
    }

    /// Leak a field list so it can be referenced with a `'static` lifetime.
    fn leak_fields(fields: Vec<EntityFieldModel>) -> &'static [EntityFieldModel] {
        Box::leak(fields.into_boxed_slice())
    }

    /// Assemble the test entity model from its already-`'static` parts.
    fn test_model(
        fields: &'static [EntityFieldModel],
        primary_key: &'static EntityFieldModel,
        indexes: &'static [&'static IndexModel],
    ) -> EntityModel {
        EntityModel {
            path: "test::Entity",
            entity_name: "TestEntity",
            primary_key,
            fields,
            indexes,
        }
    }

    /// Build an index-less model whose primary key is `fields[pk_index]`.
    fn model_with_fields(fields: Vec<EntityFieldModel>, pk_index: usize) -> EntityModel {
        let leaked = leak_fields(fields);
        test_model(leaked, &leaked[pk_index], &[])
    }

    /// Build a load-mode plan with the given access path and optional ordering.
    ///
    /// All other plan fields take the same fixed values in every test.
    fn load_plan(access: AccessPath, order: Option<OrderSpec>) -> LogicalPlan {
        LogicalPlan {
            mode: crate::db::query::QueryMode::Load,
            access: AccessPlan::Path(access),
            predicate: None,
            order,
            delete_limit: None,
            page: None,
            projection: crate::db::query::plan::ProjectionSpec::All,
            consistency: crate::db::query::ReadConsistency::MissingOk,
        }
    }

    // ------------------------------------------------------------------
    // Schema construction from an entity model
    // ------------------------------------------------------------------

    #[test]
    fn model_rejects_missing_primary_key() {
        let declared = leak_fields(vec![field("id", EntityFieldKind::Ulid)]);
        // The primary key points at a field that is not part of `declared`.
        let undeclared_pk: &'static EntityFieldModel =
            Box::leak(Box::new(field("missing", EntityFieldKind::Ulid)));

        let model = test_model(declared, undeclared_pk, &[]);

        let outcome = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            outcome,
            Err(ValidateError::InvalidPrimaryKey { .. })
        ));
    }

    #[test]
    fn model_rejects_duplicate_fields() {
        let duplicated = vec![
            field("dup", EntityFieldKind::Text),
            field("dup", EntityFieldKind::Text),
        ];
        let model = model_with_fields(duplicated, 0);

        let outcome = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            outcome,
            Err(ValidateError::DuplicateField { .. })
        ));
    }

    #[test]
    fn model_rejects_invalid_primary_key_type() {
        let list_pk = field("pk", EntityFieldKind::List(&EntityFieldKind::Text));
        let model = model_with_fields(vec![list_pk], 0);

        let outcome = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            outcome,
            Err(ValidateError::InvalidPrimaryKeyType { .. })
        ));
    }

    #[test]
    fn model_rejects_index_unknown_field() {
        const INDEX_FIELDS: [&str; 1] = ["missing"];
        const INDEX_MODEL: IndexModel = IndexModel::new(
            "test::idx_missing",
            "test::IndexStore",
            &INDEX_FIELDS,
            false,
        );
        const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];

        let declared = leak_fields(vec![field("id", EntityFieldKind::Ulid)]);
        let model = test_model(declared, &declared[0], &INDEXES);

        let outcome = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            outcome,
            Err(ValidateError::IndexFieldUnknown { .. })
        ));
    }

    #[test]
    fn model_rejects_index_unsupported_field() {
        const INDEX_FIELDS: [&str; 1] = ["broken"];
        const INDEX_MODEL: IndexModel =
            IndexModel::new("test::idx_broken", "test::IndexStore", &INDEX_FIELDS, false);
        const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];

        let declared = leak_fields(vec![
            field("id", EntityFieldKind::Ulid),
            field("broken", EntityFieldKind::Unsupported),
        ]);
        let model = test_model(declared, &declared[0], &INDEXES);

        let outcome = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            outcome,
            Err(ValidateError::IndexFieldUnsupported { .. })
        ));
    }

    #[test]
    fn model_rejects_duplicate_index_names() {
        const INDEX_FIELDS_A: [&str; 1] = ["id"];
        const INDEX_FIELDS_B: [&str; 1] = ["other"];
        const INDEX_A: IndexModel = IndexModel::new(
            "test::dup_index",
            "test::IndexStore",
            &INDEX_FIELDS_A,
            false,
        );
        const INDEX_B: IndexModel = IndexModel::new(
            "test::dup_index",
            "test::IndexStore",
            &INDEX_FIELDS_B,
            false,
        );
        const INDEXES: [&IndexModel; 2] = [&INDEX_A, &INDEX_B];

        let declared = leak_fields(vec![
            field("id", EntityFieldKind::Ulid),
            field("other", EntityFieldKind::Text),
        ]);
        let model = test_model(declared, &declared[0], &INDEXES);

        let outcome = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            outcome,
            Err(ValidateError::DuplicateIndexName { .. })
        ));
    }

    // ------------------------------------------------------------------
    // Plan validation
    // ------------------------------------------------------------------

    #[test]
    fn plan_rejects_unorderable_field() {
        let model = model_with_fields(
            vec![
                field("id", EntityFieldKind::Ulid),
                field("tags", EntityFieldKind::List(&EntityFieldKind::Text)),
            ],
            0,
        );
        let schema = SchemaInfo::from_entity_model(&model).expect("valid model");

        // Ordering by the list-typed field must be refused.
        let order_by_tags = OrderSpec {
            fields: vec![("tags".to_string(), OrderDirection::Asc)],
        };
        let plan = load_plan(AccessPath::FullScan, Some(order_by_tags));

        let err =
            validate_plan_with_schema_info(&schema, &model, &plan).expect_err("unorderable field");
        assert!(matches!(err, PlanError::UnorderableField { .. }));
    }

    #[test]
    fn plan_rejects_index_prefix_too_long() {
        let schema = SchemaInfo::from_entity_model(PlannerEntity::MODEL).expect("valid model");

        let oversized_prefix = AccessPath::IndexPrefix {
            index: *PlannerEntity::INDEXES[0],
            values: vec![
                Value::Text("a".to_string()),
                Value::Text("b".to_string()),
                Value::Text("c".to_string()),
            ],
        };
        let plan = load_plan(oversized_prefix, None);

        let err = validate_plan_with_schema_info(&schema, PlannerEntity::MODEL, &plan)
            .expect_err("index prefix too long");
        assert!(matches!(err, PlanError::IndexPrefixTooLong { .. }));
    }

    #[test]
    fn plan_accepts_model_based_validation() {
        let model = model_with_fields(vec![field("id", EntityFieldKind::Ulid)], 0);
        let plan = load_plan(AccessPath::ByKey(Key::Ulid(Ulid::nil())), None);

        validate_plan_with_model(&plan, &model).expect("valid plan");
    }
}
427
428/// Validate that a key matches the entity's primary key type.
429fn validate_pk_key(schema: &SchemaInfo, model: &EntityModel, key: &Key) -> Result<(), PlanError> {
430    let field = model.primary_key.name;
431
432    let field_type = schema
433        .field(field)
434        .ok_or_else(|| PlanError::PrimaryKeyUnsupported {
435            field: field.to_string(),
436        })?;
437
438    let expected =
439        key_type_for_field(field_type).ok_or_else(|| PlanError::PrimaryKeyUnsupported {
440            field: field.to_string(),
441        })?;
442
443    if key_variant(key) != expected {
444        return Err(PlanError::PrimaryKeyMismatch {
445            field: field.to_string(),
446            key: *key,
447        });
448    }
449
450    Ok(())
451}
452
453/// Validate that an index prefix is valid for execution.
454fn validate_index_prefix(
455    schema: &SchemaInfo,
456    model: &EntityModel,
457    index: &IndexModel,
458    values: &[Value],
459) -> Result<(), PlanError> {
460    if !model.indexes.contains(&index) {
461        return Err(PlanError::IndexNotFound { index: *index });
462    }
463
464    if values.len() > index.fields.len() {
465        return Err(PlanError::IndexPrefixTooLong {
466            prefix_len: values.len(),
467            field_len: index.fields.len(),
468        });
469    }
470
471    for (field, value) in index.fields.iter().zip(values.iter()) {
472        let field_type =
473            schema
474                .field(field)
475                .ok_or_else(|| PlanError::IndexPrefixValueMismatch {
476                    field: field.to_string(),
477                })?;
478
479        if !predicate::validate::literal_matches_type(value, field_type) {
480            return Err(PlanError::IndexPrefixValueMismatch {
481                field: field.to_string(),
482            });
483        }
484    }
485
486    Ok(())
487}
488
/// Payload-free tag naming the kind of a primary key value.
///
/// `Key` values and schema `FieldType`s are each mapped onto this enum so
/// the two can be compared without depending on one another directly.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum KeyVariant {
    /// Corresponds to `Key::Account` / `ScalarType::Account`.
    Account,
    /// Corresponds to `Key::Int` / `ScalarType::Int`.
    Int,
    /// Corresponds to `Key::Principal` / `ScalarType::Principal`.
    Principal,
    /// Corresponds to `Key::Subaccount` / `ScalarType::Subaccount`.
    Subaccount,
    /// Corresponds to `Key::Timestamp` / `ScalarType::Timestamp`.
    Timestamp,
    /// Corresponds to `Key::Uint` / `ScalarType::Uint`.
    Uint,
    /// Corresponds to `Key::Ulid` / `ScalarType::Ulid`.
    Ulid,
    /// Corresponds to `Key::Unit` / `ScalarType::Unit`.
    Unit,
}
503
504const fn key_variant(key: &Key) -> KeyVariant {
505    match key {
506        Key::Account(_) => KeyVariant::Account,
507        Key::Int(_) => KeyVariant::Int,
508        Key::Principal(_) => KeyVariant::Principal,
509        Key::Subaccount(_) => KeyVariant::Subaccount,
510        Key::Timestamp(_) => KeyVariant::Timestamp,
511        Key::Uint(_) => KeyVariant::Uint,
512        Key::Ulid(_) => KeyVariant::Ulid,
513        Key::Unit => KeyVariant::Unit,
514    }
515}
516
517/// Map scalar field types to compatible key variants.
518///
519/// Non-scalar and unsupported field types are intentionally excluded.
520const fn key_type_for_field(field_type: &FieldType) -> Option<KeyVariant> {
521    match field_type {
522        FieldType::Scalar(ScalarType::Account) => Some(KeyVariant::Account),
523        FieldType::Scalar(ScalarType::Int) => Some(KeyVariant::Int),
524        FieldType::Scalar(ScalarType::Principal) => Some(KeyVariant::Principal),
525        FieldType::Scalar(ScalarType::Subaccount) => Some(KeyVariant::Subaccount),
526        FieldType::Scalar(ScalarType::Timestamp) => Some(KeyVariant::Timestamp),
527        FieldType::Scalar(ScalarType::Uint) => Some(KeyVariant::Uint),
528        FieldType::Scalar(ScalarType::Ulid) => Some(KeyVariant::Ulid),
529        FieldType::Scalar(ScalarType::Unit) => Some(KeyVariant::Unit),
530        _ => None,
531    }
532}