Skip to main content

icydb_core/db/query/plan/
validate.rs

1//! Executor-ready plan validation against a concrete entity schema.
2
3use super::{AccessPath, AccessPlan, LogicalPlan, OrderSpec};
4use crate::{
5    db::query::predicate::{
6        self, SchemaInfo,
7        validate::{FieldType, ScalarType},
8    },
9    key::Key,
10    model::entity::EntityModel,
11    model::index::IndexModel,
12    value::Value,
13};
14use thiserror::Error as ThisError;
15
16/// Executor-visible validation failures for logical plans.
17///
18/// These errors indicate that a plan cannot be safely executed against the
19/// current schema or entity definition. They are *not* planner bugs.
20#[derive(Debug, ThisError)]
21pub enum PlanError {
22    /// Predicate failed schema-level validation.
23    #[error("predicate validation failed: {0}")]
24    PredicateInvalid(#[from] predicate::ValidateError),
25
26    /// ORDER BY references an unknown field.
27    #[error("unknown order field '{field}'")]
28    UnknownOrderField { field: String },
29
30    /// ORDER BY references a field that cannot be ordered.
31    #[error("order field '{field}' is not orderable")]
32    UnorderableField { field: String },
33
34    /// Access plan references an index not declared on the entity.
35    #[error("index '{index}' not found on entity")]
36    IndexNotFound { index: IndexModel },
37
38    /// Index prefix exceeds the number of indexed fields.
39    #[error("index prefix length {prefix_len} exceeds index field count {field_len}")]
40    IndexPrefixTooLong { prefix_len: usize, field_len: usize },
41
42    /// Index prefix literal does not match indexed field type.
43    #[error("index prefix value for field '{field}' is incompatible")]
44    IndexPrefixValueMismatch { field: String },
45
46    /// Primary key field exists but is not key-compatible.
47    #[error("primary key field '{field}' is not key-compatible")]
48    PrimaryKeyUnsupported { field: String },
49
50    /// Supplied key does not match the primary key type.
51    #[error("key '{key}' is incompatible with primary key '{field}'")]
52    PrimaryKeyMismatch { field: String, key: Key },
53
54    /// Key range has invalid ordering.
55    #[error("key range start is greater than end")]
56    InvalidKeyRange,
57}
58
/// Validate `plan` using only the runtime entity `model`.
///
/// A `SchemaInfo` is derived from the model first; if that derivation fails,
/// the failure is reported as `PlanError::PredicateInvalid`. Otherwise the
/// plan is checked via `validate_plan_with_schema_info`.
///
/// Only compiled for test builds.
#[cfg(test)]
pub(crate) fn validate_plan_with_model(
    plan: &LogicalPlan,
    model: &EntityModel,
) -> Result<(), PlanError> {
    SchemaInfo::from_entity_model(model)
        .map_err(PlanError::from)
        .and_then(|schema| validate_plan_with_schema_info(&schema, model, plan))
}
70
/// Validate `plan` against an already-built `SchemaInfo`.
///
/// Checks run in a fixed sequence and stop at the first failure:
/// the predicate (when present), the ORDER BY spec (when present), and
/// finally the access plan.
///
/// Only compiled for test builds.
#[cfg(test)]
pub(crate) fn validate_plan_with_schema_info(
    schema: &SchemaInfo,
    model: &EntityModel,
    plan: &LogicalPlan,
) -> Result<(), PlanError> {
    let LogicalPlan {
        predicate: filter,
        order,
        access,
        ..
    } = plan;

    if let Some(filter) = filter.as_ref() {
        predicate::validate(schema, filter)?;
    }

    order
        .as_ref()
        .map_or(Ok(()), |spec| validate_order(schema, spec))?;

    validate_access_plan(schema, model, access)
}
90
// NOTE(review): this inherent impl defines no items in the code visible here;
// it looks like a leftover placeholder — confirm before removing it.
impl LogicalPlan {}
92
93/// Validate ORDER BY fields against the schema.
94pub(crate) fn validate_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
95    for (field, _) in &order.fields {
96        let field_type = schema
97            .field(field)
98            .ok_or_else(|| PlanError::UnknownOrderField {
99                field: field.clone(),
100            })?;
101
102        if !field_type.is_orderable() {
103            return Err(PlanError::UnorderableField {
104                field: field.clone(),
105            });
106        }
107    }
108
109    Ok(())
110}
111
112/// Validate executor-visible access paths.
113///
114/// This ensures keys, ranges, and index prefixes are schema-compatible.
115pub(crate) fn validate_access_plan(
116    schema: &SchemaInfo,
117    model: &EntityModel,
118    access: &AccessPlan,
119) -> Result<(), PlanError> {
120    match access {
121        AccessPlan::Path(path) => validate_access_path(schema, model, path),
122        AccessPlan::Union(children) | AccessPlan::Intersection(children) => {
123            for child in children {
124                validate_access_plan(schema, model, child)?;
125            }
126            Ok(())
127        }
128    }
129}
130
131fn validate_access_path(
132    schema: &SchemaInfo,
133    model: &EntityModel,
134    access: &AccessPath,
135) -> Result<(), PlanError> {
136    match access {
137        AccessPath::ByKey(key) => validate_pk_key(schema, model, key),
138        AccessPath::ByKeys(keys) => {
139            // Empty key lists are a valid no-op.
140            if keys.is_empty() {
141                return Ok(());
142            }
143            for key in keys {
144                validate_pk_key(schema, model, key)?;
145            }
146            Ok(())
147        }
148        AccessPath::KeyRange { start, end } => {
149            validate_pk_key(schema, model, start)?;
150            validate_pk_key(schema, model, end)?;
151            if start > end {
152                return Err(PlanError::InvalidKeyRange);
153            }
154            Ok(())
155        }
156        AccessPath::IndexPrefix { index, values } => {
157            validate_index_prefix(schema, model, index, values)
158        }
159        AccessPath::FullScan => Ok(()),
160    }
161}
162
163///
164/// TESTS
165///
166
#[cfg(test)]
mod tests {
    use super::{PlanError, validate_plan_with_model, validate_plan_with_schema_info};
    use crate::{
        db::query::{
            plan::{
                AccessPath, AccessPlan, LogicalPlan, OrderDirection, OrderSpec,
                planner::PlannerEntity,
            },
            predicate::{SchemaInfo, ValidateError},
        },
        key::Key,
        model::{
            entity::EntityModel,
            field::{EntityFieldKind, EntityFieldModel},
            index::IndexModel,
        },
        traits::EntityKind,
        types::Ulid,
        value::Value,
    };

    /// Shorthand constructor for a field model.
    fn field(name: &'static str, kind: EntityFieldKind) -> EntityFieldModel {
        EntityFieldModel { name, kind }
    }

    /// Leak a field list so it can back the `'static` borrows in `EntityModel`.
    fn leak_fields(fields: Vec<EntityFieldModel>) -> &'static [EntityFieldModel] {
        Box::leak(fields.into_boxed_slice())
    }

    /// Assemble the test entity from its parts, using fixed path and name.
    fn test_entity(
        fields: &'static [EntityFieldModel],
        primary_key: &'static EntityFieldModel,
        indexes: &'static [&'static IndexModel],
    ) -> EntityModel {
        EntityModel {
            path: "test::Entity",
            entity_name: "TestEntity",
            primary_key,
            fields,
            indexes,
        }
    }

    /// Index-free test entity whose primary key is `fields[pk_index]`.
    fn model_with_fields(fields: Vec<EntityFieldModel>, pk_index: usize) -> EntityModel {
        let leaked = leak_fields(fields);
        test_entity(leaked, &leaked[pk_index], &[])
    }

    /// Load-mode plan over a single access path, with an optional ORDER BY.
    fn load_plan(access: AccessPath, order: Option<OrderSpec>) -> LogicalPlan {
        LogicalPlan {
            mode: crate::db::query::QueryMode::Load(crate::db::query::LoadSpec::new()),
            access: AccessPlan::Path(access),
            predicate: None,
            order,
            delete_limit: None,
            page: None,
            projection: crate::db::query::plan::ProjectionSpec::All,
            consistency: crate::db::query::ReadConsistency::MissingOk,
        }
    }

    #[test]
    fn model_rejects_missing_primary_key() {
        let fields = leak_fields(vec![field("id", EntityFieldKind::Ulid)]);
        // The primary key deliberately points at a field outside `fields`.
        let missing_pk: &'static EntityFieldModel =
            Box::leak(Box::new(field("missing", EntityFieldKind::Ulid)));
        let model = test_entity(fields, missing_pk, &[]);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::InvalidPrimaryKey { .. })
        ));
    }

    #[test]
    fn model_rejects_duplicate_fields() {
        let duplicated = vec![
            field("dup", EntityFieldKind::Text),
            field("dup", EntityFieldKind::Text),
        ];
        let model = model_with_fields(duplicated, 0);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(result, Err(ValidateError::DuplicateField { .. })));
    }

    #[test]
    fn model_rejects_invalid_primary_key_type() {
        let list_pk = field("pk", EntityFieldKind::List(&EntityFieldKind::Text));
        let model = model_with_fields(vec![list_pk], 0);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::InvalidPrimaryKeyType { .. })
        ));
    }

    #[test]
    fn model_rejects_index_unknown_field() {
        const INDEX_FIELDS: [&str; 1] = ["missing"];
        const INDEX_MODEL: IndexModel = IndexModel::new(
            "test::idx_missing",
            "test::IndexStore",
            &INDEX_FIELDS,
            false,
        );
        const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];

        let fields = leak_fields(vec![field("id", EntityFieldKind::Ulid)]);
        let model = test_entity(fields, &fields[0], &INDEXES);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::IndexFieldUnknown { .. })
        ));
    }

    #[test]
    fn model_rejects_index_unsupported_field() {
        const INDEX_FIELDS: [&str; 1] = ["broken"];
        const INDEX_MODEL: IndexModel =
            IndexModel::new("test::idx_broken", "test::IndexStore", &INDEX_FIELDS, false);
        const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];

        let fields = leak_fields(vec![
            field("id", EntityFieldKind::Ulid),
            field("broken", EntityFieldKind::Unsupported),
        ]);
        let model = test_entity(fields, &fields[0], &INDEXES);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::IndexFieldUnsupported { .. })
        ));
    }

    #[test]
    fn model_rejects_duplicate_index_names() {
        const INDEX_FIELDS_A: [&str; 1] = ["id"];
        const INDEX_FIELDS_B: [&str; 1] = ["other"];
        const INDEX_A: IndexModel = IndexModel::new(
            "test::dup_index",
            "test::IndexStore",
            &INDEX_FIELDS_A,
            false,
        );
        const INDEX_B: IndexModel = IndexModel::new(
            "test::dup_index",
            "test::IndexStore",
            &INDEX_FIELDS_B,
            false,
        );
        const INDEXES: [&IndexModel; 2] = [&INDEX_A, &INDEX_B];

        let fields = leak_fields(vec![
            field("id", EntityFieldKind::Ulid),
            field("other", EntityFieldKind::Text),
        ]);
        let model = test_entity(fields, &fields[0], &INDEXES);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::DuplicateIndexName { .. })
        ));
    }

    #[test]
    fn plan_rejects_unorderable_field() {
        let model = model_with_fields(
            vec![
                field("id", EntityFieldKind::Ulid),
                field("tags", EntityFieldKind::List(&EntityFieldKind::Text)),
            ],
            0,
        );
        let schema = SchemaInfo::from_entity_model(&model).expect("valid model");

        let by_tags = OrderSpec {
            fields: vec![("tags".to_string(), OrderDirection::Asc)],
        };
        let plan = load_plan(AccessPath::FullScan, Some(by_tags));

        let err =
            validate_plan_with_schema_info(&schema, &model, &plan).expect_err("unorderable field");
        assert!(matches!(err, PlanError::UnorderableField { .. }));
    }

    #[test]
    fn plan_rejects_index_prefix_too_long() {
        let schema = SchemaInfo::from_entity_model(PlannerEntity::MODEL).expect("valid model");

        let oversized_prefix = AccessPath::IndexPrefix {
            index: *PlannerEntity::INDEXES[0],
            values: vec![
                Value::Text("a".to_string()),
                Value::Text("b".to_string()),
                Value::Text("c".to_string()),
            ],
        };
        let plan = load_plan(oversized_prefix, None);

        let err = validate_plan_with_schema_info(&schema, PlannerEntity::MODEL, &plan)
            .expect_err("index prefix too long");
        assert!(matches!(err, PlanError::IndexPrefixTooLong { .. }));
    }

    #[test]
    fn plan_accepts_model_based_validation() {
        let model = model_with_fields(vec![field("id", EntityFieldKind::Ulid)], 0);
        let plan = load_plan(AccessPath::ByKey(Key::Ulid(Ulid::nil())), None);

        validate_plan_with_model(&plan, &model).expect("valid plan");
    }
}
422
423/// Validate that a key matches the entity's primary key type.
424fn validate_pk_key(schema: &SchemaInfo, model: &EntityModel, key: &Key) -> Result<(), PlanError> {
425    let field = model.primary_key.name;
426
427    let field_type = schema
428        .field(field)
429        .ok_or_else(|| PlanError::PrimaryKeyUnsupported {
430            field: field.to_string(),
431        })?;
432
433    let expected =
434        key_type_for_field(field_type).ok_or_else(|| PlanError::PrimaryKeyUnsupported {
435            field: field.to_string(),
436        })?;
437
438    if key_variant(key) != expected {
439        return Err(PlanError::PrimaryKeyMismatch {
440            field: field.to_string(),
441            key: *key,
442        });
443    }
444
445    Ok(())
446}
447
448/// Validate that an index prefix is valid for execution.
449fn validate_index_prefix(
450    schema: &SchemaInfo,
451    model: &EntityModel,
452    index: &IndexModel,
453    values: &[Value],
454) -> Result<(), PlanError> {
455    if !model.indexes.contains(&index) {
456        return Err(PlanError::IndexNotFound { index: *index });
457    }
458
459    if values.len() > index.fields.len() {
460        return Err(PlanError::IndexPrefixTooLong {
461            prefix_len: values.len(),
462            field_len: index.fields.len(),
463        });
464    }
465
466    for (field, value) in index.fields.iter().zip(values.iter()) {
467        let field_type =
468            schema
469                .field(field)
470                .ok_or_else(|| PlanError::IndexPrefixValueMismatch {
471                    field: field.to_string(),
472                })?;
473
474        if !predicate::validate::literal_matches_type(value, field_type) {
475            return Err(PlanError::IndexPrefixValueMismatch {
476                field: field.to_string(),
477            });
478        }
479    }
480
481    Ok(())
482}
483
/// Payload-free tag naming which kind of `Key` a value is.
///
/// Lets a `Key` and a `FieldType` be compared by kind without either type
/// referring to the other.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum KeyVariant {
    Account,
    Int,
    Principal,
    Subaccount,
    Timestamp,
    Uint,
    Ulid,
    Unit,
}
498
499const fn key_variant(key: &Key) -> KeyVariant {
500    match key {
501        Key::Account(_) => KeyVariant::Account,
502        Key::Int(_) => KeyVariant::Int,
503        Key::Principal(_) => KeyVariant::Principal,
504        Key::Subaccount(_) => KeyVariant::Subaccount,
505        Key::Timestamp(_) => KeyVariant::Timestamp,
506        Key::Uint(_) => KeyVariant::Uint,
507        Key::Ulid(_) => KeyVariant::Ulid,
508        Key::Unit => KeyVariant::Unit,
509    }
510}
511
512/// Map scalar field types to compatible key variants.
513///
514/// Non-scalar and unsupported field types are intentionally excluded.
515const fn key_type_for_field(field_type: &FieldType) -> Option<KeyVariant> {
516    match field_type {
517        FieldType::Scalar(ScalarType::Account) => Some(KeyVariant::Account),
518        FieldType::Scalar(ScalarType::Int) => Some(KeyVariant::Int),
519        FieldType::Scalar(ScalarType::Principal) => Some(KeyVariant::Principal),
520        FieldType::Scalar(ScalarType::Subaccount) => Some(KeyVariant::Subaccount),
521        FieldType::Scalar(ScalarType::Timestamp) => Some(KeyVariant::Timestamp),
522        FieldType::Scalar(ScalarType::Uint) => Some(KeyVariant::Uint),
523        FieldType::Scalar(ScalarType::Ulid) => Some(KeyVariant::Ulid),
524        FieldType::Scalar(ScalarType::Unit) => Some(KeyVariant::Unit),
525        _ => None,
526    }
527}