1use super::{AccessPath, AccessPlan, LogicalPlan, OrderSpec};
13use crate::{
14 db::query::predicate::{
15 self, SchemaInfo,
16 validate::{FieldType, ScalarType},
17 },
18 key::Key,
19 model::entity::EntityModel,
20 model::index::IndexModel,
21 value::Value,
22};
23use thiserror::Error as ThisError;
24
/// Errors reported while validating a logical plan against an entity's
/// schema and model.
#[derive(Debug, ThisError)]
pub enum PlanError {
    /// Wraps a [`predicate::ValidateError`]; produced via `?` when predicate
    /// or schema validation fails.
    #[error("predicate validation failed: {0}")]
    PredicateInvalid(#[from] predicate::ValidateError),

    /// An order field could not be found in the schema.
    #[error("unknown order field '{field}'")]
    UnknownOrderField { field: String },

    /// An order field exists in the schema but its type is not orderable.
    #[error("order field '{field}' is not orderable")]
    UnorderableField { field: String },

    /// The index named by an index-prefix access path is not among the
    /// entity model's indexes.
    #[error("index '{index}' not found on entity")]
    IndexNotFound { index: IndexModel },

    /// More prefix values were supplied than the index has fields.
    #[error("index prefix length {prefix_len} exceeds index field count {field_len}")]
    IndexPrefixTooLong { prefix_len: usize, field_len: usize },

    /// An index field is missing from the schema, or the prefix value given
    /// for it does not match the field's type.
    #[error("index prefix value for field '{field}' is incompatible")]
    IndexPrefixValueMismatch { field: String },

    /// The primary key field is missing from the schema, or its type has no
    /// corresponding key variant.
    #[error("primary key field '{field}' is not key-compatible")]
    PrimaryKeyUnsupported { field: String },

    /// The supplied key's variant differs from the one the primary key
    /// field's type requires.
    #[error("key '{key}' is incompatible with primary key '{field}'")]
    PrimaryKeyMismatch { field: String, key: Key },

    /// A key range whose start compares greater than its end.
    #[error("key range start is greater than end")]
    InvalidKeyRange,
}
67
/// Test-only convenience: derive the schema from `model`, then validate
/// `plan` against it.
///
/// # Errors
///
/// Returns a [`PlanError`] if the schema cannot be built from the model
/// (converted from the predicate validation error) or if plan validation
/// fails.
#[cfg(test)]
pub(crate) fn validate_plan_with_model(
    plan: &LogicalPlan,
    model: &EntityModel,
) -> Result<(), PlanError> {
    SchemaInfo::from_entity_model(model)
        .map_err(PlanError::from)
        .and_then(|schema_info| validate_plan_with_schema_info(&schema_info, model, plan))
}
79
/// Test-only: validate every part of `plan` against a pre-built schema.
///
/// Checks run in a fixed order and stop at the first failure: the predicate
/// (if any), the ordering (if any), then the access plan.
///
/// # Errors
///
/// Returns the first [`PlanError`] encountered.
#[cfg(test)]
pub(crate) fn validate_plan_with_schema_info(
    schema: &SchemaInfo,
    model: &EntityModel,
    plan: &LogicalPlan,
) -> Result<(), PlanError> {
    if let Some(pred) = plan.predicate.as_ref() {
        predicate::validate(schema, pred)?;
    }

    if let Some(order_spec) = plan.order.as_ref() {
        validate_order(schema, order_spec)?;
    }

    // The access plan is checked last; its result is the overall result.
    validate_access_plan(schema, model, &plan.access)
}
99
// NOTE(review): empty inherent impl — it defines no methods here. Presumably
// a leftover placeholder; confirm nothing relies on it before removing.
impl LogicalPlan {}
101
102pub(crate) fn validate_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
104 for (field, _) in &order.fields {
105 let field_type = schema
106 .field(field)
107 .ok_or_else(|| PlanError::UnknownOrderField {
108 field: field.clone(),
109 })?;
110
111 if !field_type.is_orderable() {
112 return Err(PlanError::UnorderableField {
113 field: field.clone(),
114 });
115 }
116 }
117
118 Ok(())
119}
120
121pub(crate) fn validate_access_plan(
125 schema: &SchemaInfo,
126 model: &EntityModel,
127 access: &AccessPlan,
128) -> Result<(), PlanError> {
129 match access {
130 AccessPlan::Path(path) => validate_access_path(schema, model, path),
131 AccessPlan::Union(children) | AccessPlan::Intersection(children) => {
132 for child in children {
133 validate_access_plan(schema, model, child)?;
134 }
135 Ok(())
136 }
137 }
138}
139
140fn validate_access_path(
141 schema: &SchemaInfo,
142 model: &EntityModel,
143 access: &AccessPath,
144) -> Result<(), PlanError> {
145 match access {
146 AccessPath::ByKey(key) => validate_pk_key(schema, model, key),
147 AccessPath::ByKeys(keys) => {
148 for key in keys {
149 validate_pk_key(schema, model, key)?;
150 }
151 Ok(())
152 }
153 AccessPath::KeyRange { start, end } => {
154 validate_pk_key(schema, model, start)?;
155 validate_pk_key(schema, model, end)?;
156 if start > end {
157 return Err(PlanError::InvalidKeyRange);
158 }
159 Ok(())
160 }
161 AccessPath::IndexPrefix { index, values } => {
162 validate_index_prefix(schema, model, index, values)
163 }
164 AccessPath::FullScan => Ok(()),
165 }
166}
167
#[cfg(test)]
mod tests {
    use super::{PlanError, validate_plan_with_model, validate_plan_with_schema_info};
    use crate::{
        db::query::{
            QueryMode, ReadConsistency,
            plan::{
                AccessPath, AccessPlan, LogicalPlan, OrderDirection, OrderSpec, ProjectionSpec,
                planner::PlannerEntity,
            },
            predicate::{SchemaInfo, ValidateError},
        },
        key::Key,
        model::{
            entity::EntityModel,
            field::{EntityFieldKind, EntityFieldModel},
            index::IndexModel,
        },
        traits::EntityKind,
        types::Ulid,
        value::Value,
    };

    // ---- fixtures -------------------------------------------------------

    /// Build a field model with the given name and kind.
    fn field(name: &'static str, kind: EntityFieldKind) -> EntityFieldModel {
        EntityFieldModel { name, kind }
    }

    /// Leak a field list so it can be referenced with a `'static` lifetime.
    fn leak_fields(fields: Vec<EntityFieldModel>) -> &'static [EntityFieldModel] {
        Box::leak(fields.into_boxed_slice())
    }

    /// Assemble the test entity model from already-`'static` parts.
    fn entity_model(
        fields: &'static [EntityFieldModel],
        primary_key: &'static EntityFieldModel,
        indexes: &'static [&'static IndexModel],
    ) -> EntityModel {
        EntityModel {
            path: "test::Entity",
            entity_name: "TestEntity",
            primary_key,
            fields,
            indexes,
        }
    }

    /// Index-free model whose primary key is `fields[pk_index]`.
    fn model_with_fields(fields: Vec<EntityFieldModel>, pk_index: usize) -> EntityModel {
        let leaked = leak_fields(fields);
        entity_model(leaked, &leaked[pk_index], &[])
    }

    /// A `Load` plan over a single access path, with an optional ordering and
    /// every other knob left at the value the tests below share.
    fn load_plan(access: AccessPath, order: Option<OrderSpec>) -> LogicalPlan {
        LogicalPlan {
            mode: QueryMode::Load,
            access: AccessPlan::Path(access),
            predicate: None,
            order,
            delete_limit: None,
            page: None,
            projection: ProjectionSpec::All,
            consistency: ReadConsistency::MissingOk,
        }
    }

    // ---- schema construction from a model -------------------------------

    #[test]
    fn model_rejects_missing_primary_key() {
        let fields = leak_fields(vec![field("id", EntityFieldKind::Ulid)]);
        // The primary key is deliberately not one of the entity's fields.
        let missing_pk: &'static EntityFieldModel =
            Box::leak(Box::new(field("missing", EntityFieldKind::Ulid)));

        let model = entity_model(fields, missing_pk, &[]);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::InvalidPrimaryKey { .. })
        ));
    }

    #[test]
    fn model_rejects_duplicate_fields() {
        let duplicated = vec![
            field("dup", EntityFieldKind::Text),
            field("dup", EntityFieldKind::Text),
        ];
        let model = model_with_fields(duplicated, 0);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(result, Err(ValidateError::DuplicateField { .. })));
    }

    #[test]
    fn model_rejects_invalid_primary_key_type() {
        let list_pk = field("pk", EntityFieldKind::List(&EntityFieldKind::Text));
        let model = model_with_fields(vec![list_pk], 0);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::InvalidPrimaryKeyType { .. })
        ));
    }

    #[test]
    fn model_rejects_index_unknown_field() {
        const INDEX_FIELDS: [&str; 1] = ["missing"];
        const INDEX_MODEL: IndexModel = IndexModel::new(
            "test::idx_missing",
            "test::IndexStore",
            &INDEX_FIELDS,
            false,
        );
        const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];

        let fields = leak_fields(vec![field("id", EntityFieldKind::Ulid)]);
        let model = entity_model(fields, &fields[0], &INDEXES);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::IndexFieldUnknown { .. })
        ));
    }

    #[test]
    fn model_rejects_index_unsupported_field() {
        const INDEX_FIELDS: [&str; 1] = ["broken"];
        const INDEX_MODEL: IndexModel =
            IndexModel::new("test::idx_broken", "test::IndexStore", &INDEX_FIELDS, false);
        const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];

        let fields = leak_fields(vec![
            field("id", EntityFieldKind::Ulid),
            field("broken", EntityFieldKind::Unsupported),
        ]);
        let model = entity_model(fields, &fields[0], &INDEXES);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::IndexFieldUnsupported { .. })
        ));
    }

    #[test]
    fn model_rejects_duplicate_index_names() {
        const INDEX_FIELDS_A: [&str; 1] = ["id"];
        const INDEX_FIELDS_B: [&str; 1] = ["other"];
        const INDEX_A: IndexModel = IndexModel::new(
            "test::dup_index",
            "test::IndexStore",
            &INDEX_FIELDS_A,
            false,
        );
        const INDEX_B: IndexModel = IndexModel::new(
            "test::dup_index",
            "test::IndexStore",
            &INDEX_FIELDS_B,
            false,
        );
        const INDEXES: [&IndexModel; 2] = [&INDEX_A, &INDEX_B];

        let fields = leak_fields(vec![
            field("id", EntityFieldKind::Ulid),
            field("other", EntityFieldKind::Text),
        ]);
        let model = entity_model(fields, &fields[0], &INDEXES);

        let result = SchemaInfo::from_entity_model(&model);
        assert!(matches!(
            result,
            Err(ValidateError::DuplicateIndexName { .. })
        ));
    }

    // ---- plan validation --------------------------------------------------

    #[test]
    fn plan_rejects_unorderable_field() {
        let model = model_with_fields(
            vec![
                field("id", EntityFieldKind::Ulid),
                field("tags", EntityFieldKind::List(&EntityFieldKind::Text)),
            ],
            0,
        );
        let schema = SchemaInfo::from_entity_model(&model).expect("valid model");

        let order_by_tags = OrderSpec {
            fields: vec![("tags".to_string(), OrderDirection::Asc)],
        };
        let plan = load_plan(AccessPath::FullScan, Some(order_by_tags));

        let err =
            validate_plan_with_schema_info(&schema, &model, &plan).expect_err("unorderable field");
        assert!(matches!(err, PlanError::UnorderableField { .. }));
    }

    #[test]
    fn plan_rejects_index_prefix_too_long() {
        let schema = SchemaInfo::from_entity_model(PlannerEntity::MODEL).expect("valid model");

        let oversized_prefix = AccessPath::IndexPrefix {
            index: *PlannerEntity::INDEXES[0],
            values: vec![
                Value::Text("a".to_string()),
                Value::Text("b".to_string()),
                Value::Text("c".to_string()),
            ],
        };
        let plan = load_plan(oversized_prefix, None);

        let err = validate_plan_with_schema_info(&schema, PlannerEntity::MODEL, &plan)
            .expect_err("index prefix too long");
        assert!(matches!(err, PlanError::IndexPrefixTooLong { .. }));
    }

    #[test]
    fn plan_accepts_model_based_validation() {
        let model = model_with_fields(vec![field("id", EntityFieldKind::Ulid)], 0);
        let plan = load_plan(AccessPath::ByKey(Key::Ulid(Ulid::nil())), None);

        validate_plan_with_model(&plan, &model).expect("valid plan");
    }
}
427
428fn validate_pk_key(schema: &SchemaInfo, model: &EntityModel, key: &Key) -> Result<(), PlanError> {
430 let field = model.primary_key.name;
431
432 let field_type = schema
433 .field(field)
434 .ok_or_else(|| PlanError::PrimaryKeyUnsupported {
435 field: field.to_string(),
436 })?;
437
438 let expected =
439 key_type_for_field(field_type).ok_or_else(|| PlanError::PrimaryKeyUnsupported {
440 field: field.to_string(),
441 })?;
442
443 if key_variant(key) != expected {
444 return Err(PlanError::PrimaryKeyMismatch {
445 field: field.to_string(),
446 key: *key,
447 });
448 }
449
450 Ok(())
451}
452
453fn validate_index_prefix(
455 schema: &SchemaInfo,
456 model: &EntityModel,
457 index: &IndexModel,
458 values: &[Value],
459) -> Result<(), PlanError> {
460 if !model.indexes.contains(&index) {
461 return Err(PlanError::IndexNotFound { index: *index });
462 }
463
464 if values.len() > index.fields.len() {
465 return Err(PlanError::IndexPrefixTooLong {
466 prefix_len: values.len(),
467 field_len: index.fields.len(),
468 });
469 }
470
471 for (field, value) in index.fields.iter().zip(values.iter()) {
472 let field_type =
473 schema
474 .field(field)
475 .ok_or_else(|| PlanError::IndexPrefixValueMismatch {
476 field: field.to_string(),
477 })?;
478
479 if !predicate::validate::literal_matches_type(value, field_type) {
480 return Err(PlanError::IndexPrefixValueMismatch {
481 field: field.to_string(),
482 });
483 }
484 }
485
486 Ok(())
487}
488
/// Payload-free tag naming a `Key` variant, so that a key's variant can be
/// compared with the variant a primary key field's type requires.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum KeyVariant {
    Account,
    Int,
    Principal,
    Subaccount,
    Timestamp,
    Uint,
    Ulid,
    Unit,
}
503
504const fn key_variant(key: &Key) -> KeyVariant {
505 match key {
506 Key::Account(_) => KeyVariant::Account,
507 Key::Int(_) => KeyVariant::Int,
508 Key::Principal(_) => KeyVariant::Principal,
509 Key::Subaccount(_) => KeyVariant::Subaccount,
510 Key::Timestamp(_) => KeyVariant::Timestamp,
511 Key::Uint(_) => KeyVariant::Uint,
512 Key::Ulid(_) => KeyVariant::Ulid,
513 Key::Unit => KeyVariant::Unit,
514 }
515}
516
517const fn key_type_for_field(field_type: &FieldType) -> Option<KeyVariant> {
521 match field_type {
522 FieldType::Scalar(ScalarType::Account) => Some(KeyVariant::Account),
523 FieldType::Scalar(ScalarType::Int) => Some(KeyVariant::Int),
524 FieldType::Scalar(ScalarType::Principal) => Some(KeyVariant::Principal),
525 FieldType::Scalar(ScalarType::Subaccount) => Some(KeyVariant::Subaccount),
526 FieldType::Scalar(ScalarType::Timestamp) => Some(KeyVariant::Timestamp),
527 FieldType::Scalar(ScalarType::Uint) => Some(KeyVariant::Uint),
528 FieldType::Scalar(ScalarType::Ulid) => Some(KeyVariant::Ulid),
529 FieldType::Scalar(ScalarType::Unit) => Some(KeyVariant::Unit),
530 _ => None,
531 }
532}