1use super::{AccessPath, AccessPlan, LogicalPlan, OrderSpec};
3use crate::{
4 db::query::predicate::{
5 self, SchemaInfo,
6 validate::{FieldType, ScalarType},
7 },
8 error::{ErrorClass, ErrorOrigin, InternalError},
9 model::entity::EntityModel,
10 model::index::IndexModel,
11 traits::{EntityKind, FieldValue},
12 value::Value,
13};
14use thiserror::Error as ThisError;
15
16#[derive(Debug, ThisError)]
26pub enum PlanError {
27 #[error("predicate validation failed: {0}")]
28 PredicateInvalid(#[from] predicate::ValidateError),
29
30 #[error("unknown order field '{field}'")]
32 UnknownOrderField { field: String },
33
34 #[error("order field '{field}' is not orderable")]
36 UnorderableField { field: String },
37
38 #[error("index '{index}' not found on entity")]
40 IndexNotFound { index: IndexModel },
41
42 #[error("index prefix length {prefix_len} exceeds index field count {field_len}")]
44 IndexPrefixTooLong { prefix_len: usize, field_len: usize },
45
46 #[error("index prefix must include at least one value")]
48 IndexPrefixEmpty,
49
50 #[error("index prefix value for field '{field}' is incompatible")]
52 IndexPrefixValueMismatch { field: String },
53
54 #[error("primary key field '{field}' is not key-compatible")]
56 PrimaryKeyUnsupported { field: String },
57
58 #[error("key '{key:?}' is incompatible with primary key '{field}'")]
60 PrimaryKeyMismatch { field: String, key: Value },
61
62 #[error("key range start is greater than end")]
64 InvalidKeyRange,
65
66 #[error("order specification must include at least one field")]
68 EmptyOrderSpec,
69
70 #[error("delete plans must not include pagination")]
72 DeletePlanWithPagination,
73
74 #[error("load plans must not include delete limits")]
76 LoadPlanWithDeleteLimit,
77
78 #[error("delete limit requires an explicit ordering")]
80 DeleteLimitRequiresOrder,
81}
82
83#[cfg(test)]
85pub(crate) fn validate_plan_with_schema_info<K>(
86 schema: &SchemaInfo,
87 model: &EntityModel,
88 plan: &LogicalPlan<K>,
89) -> Result<(), PlanError>
90where
91 K: FieldValue + Ord,
92{
93 validate_logical_plan(schema, model, plan)
94}
95
96#[cfg(test)]
100#[expect(dead_code)]
101pub(crate) fn validate_plan_with_model<K>(
102 plan: &LogicalPlan<K>,
103 model: &EntityModel,
104) -> Result<(), PlanError>
105where
106 K: FieldValue + Ord,
107{
108 let schema = SchemaInfo::from_entity_model(model)?;
109 validate_plan_with_schema_info(&schema, model, plan)
110}
111
112#[cfg(test)]
114pub(crate) fn validate_logical_plan<K>(
115 schema: &SchemaInfo,
116 model: &EntityModel,
117 plan: &LogicalPlan<K>,
118) -> Result<(), PlanError>
119where
120 K: FieldValue + Ord,
121{
122 if let Some(predicate) = &plan.predicate {
123 predicate::validate(schema, predicate)?;
124 }
125
126 if let Some(order) = &plan.order {
127 validate_order(schema, order)?;
128 }
129
130 validate_access_plan(schema, model, &plan.access)?;
131 validate_plan_semantics(plan)?;
132
133 Ok(())
134}
135
136pub(crate) fn validate_logical_plan_model(
138 schema: &SchemaInfo,
139 model: &EntityModel,
140 plan: &LogicalPlan<Value>,
141) -> Result<(), PlanError> {
142 if let Some(predicate) = &plan.predicate {
143 predicate::validate(schema, predicate)?;
144 }
145
146 if let Some(order) = &plan.order {
147 validate_order(schema, order)?;
148 }
149
150 validate_access_plan_model(schema, model, &plan.access)?;
151 validate_plan_semantics(plan)?;
152
153 Ok(())
154}
155
156fn validate_plan_semantics<K>(plan: &LogicalPlan<K>) -> Result<(), PlanError> {
158 if let Some(order) = &plan.order
159 && order.fields.is_empty()
160 {
161 return Err(PlanError::EmptyOrderSpec);
162 }
163
164 if plan.mode.is_delete() {
165 if plan.page.is_some() {
166 return Err(PlanError::DeletePlanWithPagination);
167 }
168
169 if plan.delete_limit.is_some()
170 && plan
171 .order
172 .as_ref()
173 .is_none_or(|order| order.fields.is_empty())
174 {
175 return Err(PlanError::DeleteLimitRequiresOrder);
176 }
177 }
178
179 if plan.mode.is_load() && plan.delete_limit.is_some() {
180 return Err(PlanError::LoadPlanWithDeleteLimit);
181 }
182
183 Ok(())
184}
185
186pub(crate) fn validate_executor_plan<E: EntityKind>(
188 plan: &LogicalPlan<E::Id>,
189) -> Result<(), InternalError> {
190 let schema = SchemaInfo::from_entity_model(E::MODEL).map_err(|err| {
191 InternalError::new(
192 ErrorClass::InvariantViolation,
193 ErrorOrigin::Query,
194 format!("entity schema invalid for {}: {err}", E::PATH),
195 )
196 })?;
197
198 if let Some(predicate) = &plan.predicate {
199 predicate::validate(&schema, predicate).map_err(|err| {
200 InternalError::new(
201 ErrorClass::InvariantViolation,
202 ErrorOrigin::Query,
203 err.to_string(),
204 )
205 })?;
206 }
207
208 if let Some(order) = &plan.order {
209 validate_executor_order(&schema, order).map_err(|err| {
210 InternalError::new(
211 ErrorClass::InvariantViolation,
212 ErrorOrigin::Query,
213 err.to_string(),
214 )
215 })?;
216 }
217
218 validate_access_plan(&schema, E::MODEL, &plan.access).map_err(|err| {
219 InternalError::new(
220 ErrorClass::InvariantViolation,
221 ErrorOrigin::Query,
222 err.to_string(),
223 )
224 })?;
225
226 validate_plan_semantics(plan).map_err(|err| {
227 InternalError::new(
228 ErrorClass::InvariantViolation,
229 ErrorOrigin::Query,
230 err.to_string(),
231 )
232 })?;
233
234 Ok(())
235}
236
237pub(crate) fn validate_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
239 for (field, _) in &order.fields {
240 let field_type = schema
241 .field(field)
242 .ok_or_else(|| PlanError::UnknownOrderField {
243 field: field.clone(),
244 })?;
245
246 if !field_type.is_orderable() {
247 return Err(PlanError::UnorderableField {
249 field: field.clone(),
250 });
251 }
252 }
253
254 Ok(())
255}
256
257fn validate_executor_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
261 validate_order(schema, order)
262}
263
264pub(crate) fn validate_access_plan<K>(
268 schema: &SchemaInfo,
269 model: &EntityModel,
270 access: &AccessPlan<K>,
271) -> Result<(), PlanError>
272where
273 K: FieldValue + Ord,
274{
275 match access {
276 AccessPlan::Path(path) => validate_access_path(schema, model, path),
277 AccessPlan::Union(children) | AccessPlan::Intersection(children) => {
278 for child in children {
279 validate_access_plan(schema, model, child)?;
280 }
281 Ok(())
282 }
283 }
284}
285
286pub(crate) fn validate_access_plan_model(
288 schema: &SchemaInfo,
289 model: &EntityModel,
290 access: &AccessPlan<Value>,
291) -> Result<(), PlanError> {
292 match access {
293 AccessPlan::Path(path) => validate_access_path_model(schema, model, path),
294 AccessPlan::Union(children) | AccessPlan::Intersection(children) => {
295 for child in children {
296 validate_access_plan_model(schema, model, child)?;
297 }
298 Ok(())
299 }
300 }
301}
302
303fn validate_access_path<K>(
304 schema: &SchemaInfo,
305 model: &EntityModel,
306 access: &AccessPath<K>,
307) -> Result<(), PlanError>
308where
309 K: FieldValue + Ord,
310{
311 match access {
312 AccessPath::ByKey(key) => validate_pk_key(schema, model, key),
313 AccessPath::ByKeys(keys) => {
314 if keys.is_empty() {
316 return Ok(());
317 }
318 for key in keys {
319 validate_pk_key(schema, model, key)?;
320 }
321 Ok(())
322 }
323 AccessPath::KeyRange { start, end } => {
324 validate_pk_key(schema, model, start)?;
325 validate_pk_key(schema, model, end)?;
326 if start > end {
327 return Err(PlanError::InvalidKeyRange);
328 }
329 Ok(())
330 }
331 AccessPath::IndexPrefix { index, values } => {
332 validate_index_prefix(schema, model, index, values)
333 }
334 AccessPath::FullScan => Ok(()),
335 }
336}
337
338fn validate_access_path_model(
340 schema: &SchemaInfo,
341 model: &EntityModel,
342 access: &AccessPath<Value>,
343) -> Result<(), PlanError> {
344 match access {
345 AccessPath::ByKey(key) => validate_pk_value(schema, model, key),
346 AccessPath::ByKeys(keys) => {
347 if keys.is_empty() {
348 return Ok(());
349 }
350 for key in keys {
351 validate_pk_value(schema, model, key)?;
352 }
353 Ok(())
354 }
355 AccessPath::KeyRange { start, end } => {
356 validate_pk_value(schema, model, start)?;
357 validate_pk_value(schema, model, end)?;
358 let Some(ordering) = start.partial_cmp(end) else {
359 return Err(PlanError::InvalidKeyRange);
360 };
361 if ordering == std::cmp::Ordering::Greater {
362 return Err(PlanError::InvalidKeyRange);
363 }
364 Ok(())
365 }
366 AccessPath::IndexPrefix { index, values } => {
367 validate_index_prefix(schema, model, index, values)
368 }
369 AccessPath::FullScan => Ok(()),
370 }
371}
372
373fn validate_pk_key<K>(schema: &SchemaInfo, model: &EntityModel, key: &K) -> Result<(), PlanError>
375where
376 K: FieldValue,
377{
378 let field = model.primary_key.name;
379
380 let field_type = schema
381 .field(field)
382 .ok_or_else(|| PlanError::PrimaryKeyUnsupported {
383 field: field.to_string(),
384 })?;
385
386 if !is_key_compatible(field_type) {
387 return Err(PlanError::PrimaryKeyUnsupported {
388 field: field.to_string(),
389 });
390 }
391
392 let value = key.to_value();
393 if !predicate::validate::literal_matches_type(&value, field_type) {
394 return Err(PlanError::PrimaryKeyMismatch {
395 field: field.to_string(),
396 key: value,
397 });
398 }
399
400 Ok(())
401}
402
403fn validate_pk_value(
405 schema: &SchemaInfo,
406 model: &EntityModel,
407 key: &Value,
408) -> Result<(), PlanError> {
409 let field = model.primary_key.name;
410
411 let field_type = schema
412 .field(field)
413 .ok_or_else(|| PlanError::PrimaryKeyUnsupported {
414 field: field.to_string(),
415 })?;
416
417 if !is_key_compatible(field_type) {
418 return Err(PlanError::PrimaryKeyUnsupported {
419 field: field.to_string(),
420 });
421 }
422
423 if !predicate::validate::literal_matches_type(key, field_type) {
424 return Err(PlanError::PrimaryKeyMismatch {
425 field: field.to_string(),
426 key: key.clone(),
427 });
428 }
429
430 Ok(())
431}
432
433fn validate_index_prefix(
435 schema: &SchemaInfo,
436 model: &EntityModel,
437 index: &IndexModel,
438 values: &[Value],
439) -> Result<(), PlanError> {
440 if !model.indexes.contains(&index) {
441 return Err(PlanError::IndexNotFound { index: *index });
442 }
443
444 if values.is_empty() {
445 return Err(PlanError::IndexPrefixEmpty);
446 }
447
448 if values.len() > index.fields.len() {
449 return Err(PlanError::IndexPrefixTooLong {
450 prefix_len: values.len(),
451 field_len: index.fields.len(),
452 });
453 }
454
455 for (field, value) in index.fields.iter().zip(values.iter()) {
456 let field_type =
457 schema
458 .field(field)
459 .ok_or_else(|| PlanError::IndexPrefixValueMismatch {
460 field: field.to_string(),
461 })?;
462
463 if !predicate::validate::literal_matches_type(value, field_type) {
464 return Err(PlanError::IndexPrefixValueMismatch {
465 field: field.to_string(),
466 });
467 }
468 }
469
470 Ok(())
471}
472
473const fn is_key_compatible(field_type: &FieldType) -> bool {
477 matches!(
478 field_type,
479 FieldType::Scalar(
480 ScalarType::Account
481 | ScalarType::Int
482 | ScalarType::Principal
483 | ScalarType::Subaccount
484 | ScalarType::Timestamp
485 | ScalarType::Uint
486 | ScalarType::Ulid
487 | ScalarType::Unit
488 )
489 )
490}
491
492#[cfg(test)]
497mod tests {
498 use super::{PlanError, validate_logical_plan_model};
499 use crate::{
500 db::query::{
501 plan::{AccessPath, AccessPlan, LogicalPlan, OrderDirection, OrderSpec},
502 predicate::{SchemaInfo, ValidateError},
503 },
504 model::{
505 entity::EntityModel,
506 field::{EntityFieldKind, EntityFieldModel},
507 index::IndexModel,
508 },
509 types::Ulid,
510 value::Value,
511 };
512
513 fn field(name: &'static str, kind: EntityFieldKind) -> EntityFieldModel {
514 EntityFieldModel { name, kind }
515 }
516
517 fn model_with_fields(fields: Vec<EntityFieldModel>, pk_index: usize) -> EntityModel {
518 let fields: &'static [EntityFieldModel] = Box::leak(fields.into_boxed_slice());
519 let primary_key = &fields[pk_index];
520 let indexes: &'static [&'static IndexModel] = &[];
521
522 EntityModel {
523 path: "test::Entity",
524 entity_name: "TestEntity",
525 primary_key,
526 fields,
527 indexes,
528 }
529 }
530
531 const INDEX_FIELDS: [&str; 1] = ["tag"];
532 const INDEX_MODEL: IndexModel =
533 IndexModel::new("test::idx_tag", "test::IndexStore", &INDEX_FIELDS, false);
534 const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];
535
536 fn model_with_index() -> EntityModel {
537 let fields: &'static [EntityFieldModel] = Box::leak(
538 vec![
539 field("id", EntityFieldKind::Ulid),
540 field("tag", EntityFieldKind::Text),
541 ]
542 .into_boxed_slice(),
543 );
544
545 EntityModel {
546 path: "test::Entity",
547 entity_name: "TestEntity",
548 primary_key: &fields[0],
549 fields,
550 indexes: &INDEXES,
551 }
552 }
553
554 #[test]
555 fn model_rejects_missing_primary_key() {
556 let fields: &'static [EntityFieldModel] =
557 Box::leak(vec![field("id", EntityFieldKind::Ulid)].into_boxed_slice());
558 let missing_pk = Box::leak(Box::new(field("missing", EntityFieldKind::Ulid)));
559
560 let model = EntityModel {
561 path: "test::Entity",
562 entity_name: "TestEntity",
563 primary_key: missing_pk,
564 fields,
565 indexes: &[],
566 };
567
568 assert!(matches!(
569 SchemaInfo::from_entity_model(&model),
570 Err(ValidateError::InvalidPrimaryKey { .. })
571 ));
572 }
573
574 #[test]
575 fn model_rejects_duplicate_fields() {
576 let model = model_with_fields(
577 vec![
578 field("dup", EntityFieldKind::Text),
579 field("dup", EntityFieldKind::Text),
580 ],
581 0,
582 );
583
584 assert!(matches!(
585 SchemaInfo::from_entity_model(&model),
586 Err(ValidateError::DuplicateField { .. })
587 ));
588 }
589
590 #[test]
591 fn model_rejects_invalid_primary_key_type() {
592 let model = model_with_fields(
593 vec![field("pk", EntityFieldKind::List(&EntityFieldKind::Text))],
594 0,
595 );
596
597 assert!(matches!(
598 SchemaInfo::from_entity_model(&model),
599 Err(ValidateError::InvalidPrimaryKeyType { .. })
600 ));
601 }
602
603 #[test]
604 fn model_rejects_index_unknown_field() {
605 const INDEX_FIELDS: [&str; 1] = ["missing"];
606 const INDEX_MODEL: IndexModel = IndexModel::new(
607 "test::idx_missing",
608 "test::IndexStore",
609 &INDEX_FIELDS,
610 false,
611 );
612 const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];
613
614 let fields: &'static [EntityFieldModel] =
615 Box::leak(vec![field("id", EntityFieldKind::Ulid)].into_boxed_slice());
616 let model = EntityModel {
617 path: "test::Entity",
618 entity_name: "TestEntity",
619 primary_key: &fields[0],
620 fields,
621 indexes: &INDEXES,
622 };
623
624 assert!(matches!(
625 SchemaInfo::from_entity_model(&model),
626 Err(ValidateError::IndexFieldUnknown { .. })
627 ));
628 }
629
630 #[test]
631 fn model_rejects_index_unsupported_field() {
632 const INDEX_FIELDS: [&str; 1] = ["broken"];
633 const INDEX_MODEL: IndexModel =
634 IndexModel::new("test::idx_broken", "test::IndexStore", &INDEX_FIELDS, false);
635 const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];
636
637 let fields: &'static [EntityFieldModel] = Box::leak(
638 vec![
639 field("id", EntityFieldKind::Ulid),
640 field("broken", EntityFieldKind::Unsupported),
641 ]
642 .into_boxed_slice(),
643 );
644 let model = EntityModel {
645 path: "test::Entity",
646 entity_name: "TestEntity",
647 primary_key: &fields[0],
648 fields,
649 indexes: &INDEXES,
650 };
651
652 assert!(matches!(
653 SchemaInfo::from_entity_model(&model),
654 Err(ValidateError::IndexFieldUnsupported { .. })
655 ));
656 }
657
658 #[test]
659 fn model_rejects_duplicate_index_names() {
660 const INDEX_FIELDS_A: [&str; 1] = ["id"];
661 const INDEX_FIELDS_B: [&str; 1] = ["other"];
662 const INDEX_A: IndexModel = IndexModel::new(
663 "test::dup_index",
664 "test::IndexStore",
665 &INDEX_FIELDS_A,
666 false,
667 );
668 const INDEX_B: IndexModel = IndexModel::new(
669 "test::dup_index",
670 "test::IndexStore",
671 &INDEX_FIELDS_B,
672 false,
673 );
674 const INDEXES: [&IndexModel; 2] = [&INDEX_A, &INDEX_B];
675
676 let fields: &'static [EntityFieldModel] = Box::leak(
677 vec![
678 field("id", EntityFieldKind::Ulid),
679 field("other", EntityFieldKind::Text),
680 ]
681 .into_boxed_slice(),
682 );
683 let model = EntityModel {
684 path: "test::Entity",
685 entity_name: "TestEntity",
686 primary_key: &fields[0],
687 fields,
688 indexes: &INDEXES,
689 };
690
691 assert!(matches!(
692 SchemaInfo::from_entity_model(&model),
693 Err(ValidateError::DuplicateIndexName { .. })
694 ));
695 }
696
697 #[test]
698 fn plan_rejects_unorderable_field() {
699 let model = model_with_fields(
700 vec![
701 field("id", EntityFieldKind::Ulid),
702 field("tags", EntityFieldKind::List(&EntityFieldKind::Text)),
703 ],
704 0,
705 );
706
707 let schema = SchemaInfo::from_entity_model(&model).expect("valid model");
708 let plan: LogicalPlan<Value> = LogicalPlan {
709 mode: crate::db::query::QueryMode::Load(crate::db::query::LoadSpec::new()),
710 access: AccessPlan::Path(AccessPath::FullScan),
711 predicate: None,
712 order: Some(OrderSpec {
713 fields: vec![("tags".to_string(), OrderDirection::Asc)],
714 }),
715 delete_limit: None,
716 page: None,
717 consistency: crate::db::query::ReadConsistency::MissingOk,
718 };
719
720 let err =
721 validate_logical_plan_model(&schema, &model, &plan).expect_err("unorderable field");
722 assert!(matches!(err, PlanError::UnorderableField { .. }));
723 }
724
725 #[test]
726 fn plan_rejects_index_prefix_too_long() {
727 let model = model_with_index();
728 let schema = SchemaInfo::from_entity_model(&model).expect("valid model");
729 let plan: LogicalPlan<Value> = LogicalPlan {
730 mode: crate::db::query::QueryMode::Load(crate::db::query::LoadSpec::new()),
731 access: AccessPlan::Path(AccessPath::IndexPrefix {
732 index: INDEX_MODEL,
733 values: vec![Value::Text("a".to_string()), Value::Text("b".to_string())],
734 }),
735 predicate: None,
736 order: None,
737 delete_limit: None,
738 page: None,
739 consistency: crate::db::query::ReadConsistency::MissingOk,
740 };
741
742 let err =
743 validate_logical_plan_model(&schema, &model, &plan).expect_err("index prefix too long");
744 assert!(matches!(err, PlanError::IndexPrefixTooLong { .. }));
745 }
746
747 #[test]
748 fn plan_rejects_empty_index_prefix() {
749 let model = model_with_index();
750 let schema = SchemaInfo::from_entity_model(&model).expect("valid model");
751 let plan: LogicalPlan<Value> = LogicalPlan {
752 mode: crate::db::query::QueryMode::Load(crate::db::query::LoadSpec::new()),
753 access: AccessPlan::Path(AccessPath::IndexPrefix {
754 index: INDEX_MODEL,
755 values: vec![],
756 }),
757 predicate: None,
758 order: None,
759 delete_limit: None,
760 page: None,
761 consistency: crate::db::query::ReadConsistency::MissingOk,
762 };
763
764 let err =
765 validate_logical_plan_model(&schema, &model, &plan).expect_err("index prefix empty");
766 assert!(matches!(err, PlanError::IndexPrefixEmpty));
767 }
768
769 #[test]
770 fn plan_accepts_model_based_validation() {
771 let model = model_with_fields(vec![field("id", EntityFieldKind::Ulid)], 0);
772 let schema = SchemaInfo::from_entity_model(&model).expect("valid model");
773 let plan: LogicalPlan<Value> = LogicalPlan {
774 mode: crate::db::query::QueryMode::Load(crate::db::query::LoadSpec::new()),
775 access: AccessPlan::Path(AccessPath::ByKey(Value::Ulid(Ulid::nil()))),
776 predicate: None,
777 order: None,
778 delete_limit: None,
779 page: None,
780 consistency: crate::db::query::ReadConsistency::MissingOk,
781 };
782
783 validate_logical_plan_model(&schema, &model, &plan).expect("valid plan");
784 }
785}