Skip to main content

icydb_core/db/query/plan/
validate.rs

1//! Executor-ready plan validation against a concrete entity schema.
2
3use super::{AccessPath, AccessPlan, LogicalPlan, OrderSpec};
4use crate::{
5    db::query::predicate::{
6        self, SchemaInfo,
7        validate::{FieldType, ScalarType},
8    },
9    key::Key,
10    model::entity::EntityModel,
11    model::index::IndexModel,
12    value::Value,
13};
14use thiserror::Error as ThisError;
15
16/// Executor-visible validation failures for logical plans.
17///
18/// These errors indicate that a plan cannot be safely executed against the
19/// current schema or entity definition. They are *not* planner bugs.
20#[derive(Debug, ThisError)]
21pub enum PlanError {
22    /// Predicate failed schema-level validation.
23    #[error("predicate validation failed: {0}")]
24    PredicateInvalid(#[from] predicate::ValidateError),
25
26    /// ORDER BY references an unknown field.
27    #[error("unknown order field '{field}'")]
28    UnknownOrderField { field: String },
29
30    /// ORDER BY references a field that cannot be ordered.
31    #[error("order field '{field}' is not orderable")]
32    UnorderableField { field: String },
33
34    /// Access plan references an index not declared on the entity.
35    #[error("index '{index}' not found on entity")]
36    IndexNotFound { index: IndexModel },
37
38    /// Index prefix exceeds the number of indexed fields.
39    #[error("index prefix length {prefix_len} exceeds index field count {field_len}")]
40    IndexPrefixTooLong { prefix_len: usize, field_len: usize },
41
42    /// Index prefix literal does not match indexed field type.
43    #[error("index prefix value for field '{field}' is incompatible")]
44    IndexPrefixValueMismatch { field: String },
45
46    /// Primary key field exists but is not key-compatible.
47    #[error("primary key field '{field}' is not key-compatible")]
48    PrimaryKeyUnsupported { field: String },
49
50    /// Supplied key does not match the primary key type.
51    #[error("key '{key}' is incompatible with primary key '{field}'")]
52    PrimaryKeyMismatch { field: String, key: Key },
53
54    /// Key range has invalid ordering.
55    #[error("key range start is greater than end")]
56    InvalidKeyRange,
57}
58
/// Check `plan` against `model`, deriving the schema surface from the model.
///
/// This is the executor-safe entrypoint and must not consult global schema.
/// A failure while building the schema surface is reported as
/// [`PlanError::PredicateInvalid`]; otherwise the result is whatever
/// [`validate_plan_with_schema_info`] returns.
#[cfg(test)]
pub(crate) fn validate_plan_with_model(
    plan: &LogicalPlan,
    model: &EntityModel,
) -> Result<(), PlanError> {
    SchemaInfo::from_entity_model(model)
        .map_err(PlanError::from)
        .and_then(|schema_info| validate_plan_with_schema_info(&schema_info, model, plan))
}
70
/// Check `plan` against an already-built schema surface.
///
/// The checks run in a fixed order and stop at the first failure:
/// predicate (if any), ORDER BY (if any), then the access plan.
#[cfg(test)]
pub(crate) fn validate_plan_with_schema_info(
    schema: &SchemaInfo,
    model: &EntityModel,
    plan: &LogicalPlan,
) -> Result<(), PlanError> {
    // 1. Predicate, when the plan carries one.
    if let Some(pred) = plan.predicate.as_ref() {
        predicate::validate(schema, pred)?;
    }

    // 2. Ordering, when the plan carries one.
    if let Some(order_spec) = plan.order.as_ref() {
        validate_order(schema, order_spec)?;
    }

    // 3. Access plan; its outcome is the overall outcome.
    validate_access_plan(schema, model, &plan.access)
}
90
91impl LogicalPlan {}
92
93/// Validate ORDER BY fields against the schema.
94pub(crate) fn validate_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
95    for (field, _) in &order.fields {
96        let field_type = schema
97            .field(field)
98            .ok_or_else(|| PlanError::UnknownOrderField {
99                field: field.clone(),
100            })?;
101
102        if !field_type.is_orderable() {
103            return Err(PlanError::UnorderableField {
104                field: field.clone(),
105            });
106        }
107    }
108
109    Ok(())
110}
111
112/// Validate executor-visible access paths.
113///
114/// This ensures keys, ranges, and index prefixes are schema-compatible.
115pub(crate) fn validate_access_plan(
116    schema: &SchemaInfo,
117    model: &EntityModel,
118    access: &AccessPlan,
119) -> Result<(), PlanError> {
120    match access {
121        AccessPlan::Path(path) => validate_access_path(schema, model, path),
122        AccessPlan::Union(children) | AccessPlan::Intersection(children) => {
123            for child in children {
124                validate_access_plan(schema, model, child)?;
125            }
126            Ok(())
127        }
128    }
129}
130
131fn validate_access_path(
132    schema: &SchemaInfo,
133    model: &EntityModel,
134    access: &AccessPath,
135) -> Result<(), PlanError> {
136    match access {
137        AccessPath::ByKey(key) => validate_pk_key(schema, model, key),
138        AccessPath::ByKeys(keys) => {
139            for key in keys {
140                validate_pk_key(schema, model, key)?;
141            }
142            Ok(())
143        }
144        AccessPath::KeyRange { start, end } => {
145            validate_pk_key(schema, model, start)?;
146            validate_pk_key(schema, model, end)?;
147            if start > end {
148                return Err(PlanError::InvalidKeyRange);
149            }
150            Ok(())
151        }
152        AccessPath::IndexPrefix { index, values } => {
153            validate_index_prefix(schema, model, index, values)
154        }
155        AccessPath::FullScan => Ok(()),
156    }
157}
158
///
/// TESTS
///

#[cfg(test)]
mod tests {
    use super::{PlanError, validate_plan_with_model, validate_plan_with_schema_info};
    use crate::{
        db::query::{
            plan::{
                AccessPath, AccessPlan, LogicalPlan, OrderDirection, OrderSpec,
                planner::PlannerEntity,
            },
            predicate::{SchemaInfo, ValidateError},
        },
        key::Key,
        model::{
            entity::EntityModel,
            field::{EntityFieldKind, EntityFieldModel},
            index::IndexModel,
        },
        traits::EntityKind,
        types::Ulid,
        value::Value,
    };

    /// Shorthand for building a field model.
    fn field(name: &'static str, kind: EntityFieldKind) -> EntityFieldModel {
        EntityFieldModel { name, kind }
    }

    /// Leak a field list so it can back a `'static` entity model.
    fn leak_fields(fields: Vec<EntityFieldModel>) -> &'static [EntityFieldModel] {
        Box::leak(fields.into_boxed_slice())
    }

    /// Assemble the test entity model from already-`'static` parts.
    fn model_from_parts(
        primary_key: &'static EntityFieldModel,
        fields: &'static [EntityFieldModel],
        indexes: &'static [&'static IndexModel],
    ) -> EntityModel {
        EntityModel {
            path: "test::Entity",
            entity_name: "TestEntity",
            primary_key,
            fields,
            indexes,
        }
    }

    /// Build an index-less model whose primary key is `fields[pk_index]`.
    fn model_with_fields(fields: Vec<EntityFieldModel>, pk_index: usize) -> EntityModel {
        let leaked = leak_fields(fields);
        model_from_parts(&leaked[pk_index], leaked, &[])
    }

    /// Build a load plan with the given access path and optional ordering;
    /// every other plan component uses the same fixed value in all tests.
    fn load_plan(access: AccessPath, order: Option<OrderSpec>) -> LogicalPlan {
        LogicalPlan {
            mode: crate::db::query::QueryMode::Load(crate::db::query::LoadSpec::new()),
            access: AccessPlan::Path(access),
            predicate: None,
            order,
            delete_limit: None,
            page: None,
            projection: crate::db::query::plan::ProjectionSpec::All,
            consistency: crate::db::query::ReadConsistency::MissingOk,
        }
    }

    #[test]
    fn model_rejects_missing_primary_key() {
        // The primary key is a separate field that is not part of `fields`.
        let fields = leak_fields(vec![field("id", EntityFieldKind::Ulid)]);
        let missing_pk = Box::leak(Box::new(field("missing", EntityFieldKind::Ulid)));
        let model = model_from_parts(missing_pk, fields, &[]);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::InvalidPrimaryKey { .. })
        ));
    }

    #[test]
    fn model_rejects_duplicate_fields() {
        let duplicated = vec![
            field("dup", EntityFieldKind::Text),
            field("dup", EntityFieldKind::Text),
        ];
        let model = model_with_fields(duplicated, 0);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(result, Err(ValidateError::DuplicateField { .. })));
    }

    #[test]
    fn model_rejects_invalid_primary_key_type() {
        let list_pk = field("pk", EntityFieldKind::List(&EntityFieldKind::Text));
        let model = model_with_fields(vec![list_pk], 0);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::InvalidPrimaryKeyType { .. })
        ));
    }

    #[test]
    fn model_rejects_index_unknown_field() {
        const INDEX_FIELDS: [&str; 1] = ["missing"];
        const INDEX_MODEL: IndexModel = IndexModel::new(
            "test::idx_missing",
            "test::IndexStore",
            &INDEX_FIELDS,
            false,
        );
        const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];

        let fields = leak_fields(vec![field("id", EntityFieldKind::Ulid)]);
        let model = model_from_parts(&fields[0], fields, &INDEXES);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::IndexFieldUnknown { .. })
        ));
    }

    #[test]
    fn model_rejects_index_unsupported_field() {
        const INDEX_FIELDS: [&str; 1] = ["broken"];
        const INDEX_MODEL: IndexModel =
            IndexModel::new("test::idx_broken", "test::IndexStore", &INDEX_FIELDS, false);
        const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];

        let fields = leak_fields(vec![
            field("id", EntityFieldKind::Ulid),
            field("broken", EntityFieldKind::Unsupported),
        ]);
        let model = model_from_parts(&fields[0], fields, &INDEXES);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::IndexFieldUnsupported { .. })
        ));
    }

    #[test]
    fn model_rejects_duplicate_index_names() {
        const INDEX_FIELDS_A: [&str; 1] = ["id"];
        const INDEX_FIELDS_B: [&str; 1] = ["other"];
        const INDEX_A: IndexModel = IndexModel::new(
            "test::dup_index",
            "test::IndexStore",
            &INDEX_FIELDS_A,
            false,
        );
        const INDEX_B: IndexModel = IndexModel::new(
            "test::dup_index",
            "test::IndexStore",
            &INDEX_FIELDS_B,
            false,
        );
        const INDEXES: [&IndexModel; 2] = [&INDEX_A, &INDEX_B];

        let fields = leak_fields(vec![
            field("id", EntityFieldKind::Ulid),
            field("other", EntityFieldKind::Text),
        ]);
        let model = model_from_parts(&fields[0], fields, &INDEXES);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::DuplicateIndexName { .. })
        ));
    }

    #[test]
    fn plan_rejects_unorderable_field() {
        let model = model_with_fields(
            vec![
                field("id", EntityFieldKind::Ulid),
                field("tags", EntityFieldKind::List(&EntityFieldKind::Text)),
            ],
            0,
        );
        let schema = SchemaInfo::from_entity_model(&model).expect("valid model");

        // Order by the list-typed field.
        let order = OrderSpec {
            fields: vec![("tags".to_string(), OrderDirection::Asc)],
        };
        let plan = load_plan(AccessPath::FullScan, Some(order));

        let err =
            validate_plan_with_schema_info(&schema, &model, &plan).expect_err("unorderable field");
        assert!(matches!(err, PlanError::UnorderableField { .. }));
    }

    #[test]
    fn plan_rejects_index_prefix_too_long() {
        let schema = SchemaInfo::from_entity_model(PlannerEntity::MODEL).expect("valid model");

        // Three prefix values against the planner entity's first index.
        let values: Vec<Value> = ["a", "b", "c"]
            .iter()
            .map(|text| Value::Text((*text).to_string()))
            .collect();
        let plan = load_plan(
            AccessPath::IndexPrefix {
                index: *PlannerEntity::INDEXES[0],
                values,
            },
            None,
        );

        let err = validate_plan_with_schema_info(&schema, PlannerEntity::MODEL, &plan)
            .expect_err("index prefix too long");
        assert!(matches!(err, PlanError::IndexPrefixTooLong { .. }));
    }

    #[test]
    fn plan_accepts_model_based_validation() {
        let model = model_with_fields(vec![field("id", EntityFieldKind::Ulid)], 0);
        let plan = load_plan(AccessPath::ByKey(Key::Ulid(Ulid::nil())), None);

        validate_plan_with_model(&plan, &model).expect("valid plan");
    }
}
418
419/// Validate that a key matches the entity's primary key type.
420fn validate_pk_key(schema: &SchemaInfo, model: &EntityModel, key: &Key) -> Result<(), PlanError> {
421    let field = model.primary_key.name;
422
423    let field_type = schema
424        .field(field)
425        .ok_or_else(|| PlanError::PrimaryKeyUnsupported {
426            field: field.to_string(),
427        })?;
428
429    let expected =
430        key_type_for_field(field_type).ok_or_else(|| PlanError::PrimaryKeyUnsupported {
431            field: field.to_string(),
432        })?;
433
434    if key_variant(key) != expected {
435        return Err(PlanError::PrimaryKeyMismatch {
436            field: field.to_string(),
437            key: *key,
438        });
439    }
440
441    Ok(())
442}
443
444/// Validate that an index prefix is valid for execution.
445fn validate_index_prefix(
446    schema: &SchemaInfo,
447    model: &EntityModel,
448    index: &IndexModel,
449    values: &[Value],
450) -> Result<(), PlanError> {
451    if !model.indexes.contains(&index) {
452        return Err(PlanError::IndexNotFound { index: *index });
453    }
454
455    if values.len() > index.fields.len() {
456        return Err(PlanError::IndexPrefixTooLong {
457            prefix_len: values.len(),
458            field_len: index.fields.len(),
459        });
460    }
461
462    for (field, value) in index.fields.iter().zip(values.iter()) {
463        let field_type =
464            schema
465                .field(field)
466                .ok_or_else(|| PlanError::IndexPrefixValueMismatch {
467                    field: field.to_string(),
468                })?;
469
470        if !predicate::validate::literal_matches_type(value, field_type) {
471            return Err(PlanError::IndexPrefixValueMismatch {
472                field: field.to_string(),
473            });
474        }
475    }
476
477    Ok(())
478}
479
/// Payload-free tag naming the kind of a primary-key value.
///
/// Keys and field types are each mapped onto this tag so the two can be
/// compared without `Key` and `FieldType` referring to one another.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum KeyVariant {
    Account,
    Int,
    Principal,
    Subaccount,
    Timestamp,
    Uint,
    Ulid,
    Unit,
}
494
495const fn key_variant(key: &Key) -> KeyVariant {
496    match key {
497        Key::Account(_) => KeyVariant::Account,
498        Key::Int(_) => KeyVariant::Int,
499        Key::Principal(_) => KeyVariant::Principal,
500        Key::Subaccount(_) => KeyVariant::Subaccount,
501        Key::Timestamp(_) => KeyVariant::Timestamp,
502        Key::Uint(_) => KeyVariant::Uint,
503        Key::Ulid(_) => KeyVariant::Ulid,
504        Key::Unit => KeyVariant::Unit,
505    }
506}
507
508/// Map scalar field types to compatible key variants.
509///
510/// Non-scalar and unsupported field types are intentionally excluded.
511const fn key_type_for_field(field_type: &FieldType) -> Option<KeyVariant> {
512    match field_type {
513        FieldType::Scalar(ScalarType::Account) => Some(KeyVariant::Account),
514        FieldType::Scalar(ScalarType::Int) => Some(KeyVariant::Int),
515        FieldType::Scalar(ScalarType::Principal) => Some(KeyVariant::Principal),
516        FieldType::Scalar(ScalarType::Subaccount) => Some(KeyVariant::Subaccount),
517        FieldType::Scalar(ScalarType::Timestamp) => Some(KeyVariant::Timestamp),
518        FieldType::Scalar(ScalarType::Uint) => Some(KeyVariant::Uint),
519        FieldType::Scalar(ScalarType::Ulid) => Some(KeyVariant::Ulid),
520        FieldType::Scalar(ScalarType::Unit) => Some(KeyVariant::Unit),
521        _ => None,
522    }
523}