1use super::{AccessPath, AccessPlan, LogicalPlan, OrderSpec};
4use crate::{
5 db::query::predicate::{
6 self, SchemaInfo,
7 validate::{FieldType, ScalarType},
8 },
9 key::Key,
10 model::entity::EntityModel,
11 model::index::IndexModel,
12 value::Value,
13};
14use thiserror::Error as ThisError;
15
16#[derive(Debug, ThisError)]
21pub enum PlanError {
22 #[error("predicate validation failed: {0}")]
24 PredicateInvalid(#[from] predicate::ValidateError),
25
26 #[error("unknown order field '{field}'")]
28 UnknownOrderField { field: String },
29
30 #[error("order field '{field}' is not orderable")]
32 UnorderableField { field: String },
33
34 #[error("index '{index}' not found on entity")]
36 IndexNotFound { index: IndexModel },
37
38 #[error("index prefix length {prefix_len} exceeds index field count {field_len}")]
40 IndexPrefixTooLong { prefix_len: usize, field_len: usize },
41
42 #[error("index prefix value for field '{field}' is incompatible")]
44 IndexPrefixValueMismatch { field: String },
45
46 #[error("primary key field '{field}' is not key-compatible")]
48 PrimaryKeyUnsupported { field: String },
49
50 #[error("key '{key}' is incompatible with primary key '{field}'")]
52 PrimaryKeyMismatch { field: String, key: Key },
53
54 #[error("key range start is greater than end")]
56 InvalidKeyRange,
57}
58
59#[cfg(test)]
63pub(crate) fn validate_plan_with_model(
64 plan: &LogicalPlan,
65 model: &EntityModel,
66) -> Result<(), PlanError> {
67 let schema = SchemaInfo::from_entity_model(model)?;
68 validate_plan_with_schema_info(&schema, model, plan)
69}
70
71#[cfg(test)]
73pub(crate) fn validate_plan_with_schema_info(
74 schema: &SchemaInfo,
75 model: &EntityModel,
76 plan: &LogicalPlan,
77) -> Result<(), PlanError> {
78 if let Some(predicate) = &plan.predicate {
79 predicate::validate(schema, predicate)?;
80 }
81
82 if let Some(order) = &plan.order {
83 validate_order(schema, order)?;
84 }
85
86 validate_access_plan(schema, model, &plan.access)?;
87
88 Ok(())
89}
90
91impl LogicalPlan {}
92
93pub(crate) fn validate_order(schema: &SchemaInfo, order: &OrderSpec) -> Result<(), PlanError> {
95 for (field, _) in &order.fields {
96 let field_type = schema
97 .field(field)
98 .ok_or_else(|| PlanError::UnknownOrderField {
99 field: field.clone(),
100 })?;
101
102 if !field_type.is_orderable() {
103 return Err(PlanError::UnorderableField {
104 field: field.clone(),
105 });
106 }
107 }
108
109 Ok(())
110}
111
112pub(crate) fn validate_access_plan(
116 schema: &SchemaInfo,
117 model: &EntityModel,
118 access: &AccessPlan,
119) -> Result<(), PlanError> {
120 match access {
121 AccessPlan::Path(path) => validate_access_path(schema, model, path),
122 AccessPlan::Union(children) | AccessPlan::Intersection(children) => {
123 for child in children {
124 validate_access_plan(schema, model, child)?;
125 }
126 Ok(())
127 }
128 }
129}
130
131fn validate_access_path(
132 schema: &SchemaInfo,
133 model: &EntityModel,
134 access: &AccessPath,
135) -> Result<(), PlanError> {
136 match access {
137 AccessPath::ByKey(key) => validate_pk_key(schema, model, key),
138 AccessPath::ByKeys(keys) => {
139 if keys.is_empty() {
141 return Ok(());
142 }
143 for key in keys {
144 validate_pk_key(schema, model, key)?;
145 }
146 Ok(())
147 }
148 AccessPath::KeyRange { start, end } => {
149 validate_pk_key(schema, model, start)?;
150 validate_pk_key(schema, model, end)?;
151 if start > end {
152 return Err(PlanError::InvalidKeyRange);
153 }
154 Ok(())
155 }
156 AccessPath::IndexPrefix { index, values } => {
157 validate_index_prefix(schema, model, index, values)
158 }
159 AccessPath::FullScan => Ok(()),
160 }
161}
162
163#[cfg(test)]
168mod tests {
169 use super::{PlanError, validate_plan_with_model, validate_plan_with_schema_info};
170 use crate::{
171 db::query::{
172 plan::{
173 AccessPath, AccessPlan, LogicalPlan, OrderDirection, OrderSpec,
174 planner::PlannerEntity,
175 },
176 predicate::{SchemaInfo, ValidateError},
177 },
178 key::Key,
179 model::{
180 entity::EntityModel,
181 field::{EntityFieldKind, EntityFieldModel},
182 index::IndexModel,
183 },
184 traits::EntityKind,
185 types::Ulid,
186 value::Value,
187 };
188
189 fn field(name: &'static str, kind: EntityFieldKind) -> EntityFieldModel {
190 EntityFieldModel { name, kind }
191 }
192
193 fn model_with_fields(fields: Vec<EntityFieldModel>, pk_index: usize) -> EntityModel {
194 let fields: &'static [EntityFieldModel] = Box::leak(fields.into_boxed_slice());
195 let primary_key = &fields[pk_index];
196 let indexes: &'static [&'static IndexModel] = &[];
197
198 EntityModel {
199 path: "test::Entity",
200 entity_name: "TestEntity",
201 primary_key,
202 fields,
203 indexes,
204 }
205 }
206
207 #[test]
208 fn model_rejects_missing_primary_key() {
209 let fields: &'static [EntityFieldModel] =
210 Box::leak(vec![field("id", EntityFieldKind::Ulid)].into_boxed_slice());
211 let missing_pk = Box::leak(Box::new(field("missing", EntityFieldKind::Ulid)));
212
213 let model = EntityModel {
214 path: "test::Entity",
215 entity_name: "TestEntity",
216 primary_key: missing_pk,
217 fields,
218 indexes: &[],
219 };
220
221 assert!(matches!(
222 SchemaInfo::from_entity_model(&model),
223 Err(ValidateError::InvalidPrimaryKey { .. })
224 ));
225 }
226
227 #[test]
228 fn model_rejects_duplicate_fields() {
229 let model = model_with_fields(
230 vec![
231 field("dup", EntityFieldKind::Text),
232 field("dup", EntityFieldKind::Text),
233 ],
234 0,
235 );
236
237 assert!(matches!(
238 SchemaInfo::from_entity_model(&model),
239 Err(ValidateError::DuplicateField { .. })
240 ));
241 }
242
243 #[test]
244 fn model_rejects_invalid_primary_key_type() {
245 let model = model_with_fields(
246 vec![field("pk", EntityFieldKind::List(&EntityFieldKind::Text))],
247 0,
248 );
249
250 assert!(matches!(
251 SchemaInfo::from_entity_model(&model),
252 Err(ValidateError::InvalidPrimaryKeyType { .. })
253 ));
254 }
255
256 #[test]
257 fn model_rejects_index_unknown_field() {
258 const INDEX_FIELDS: [&str; 1] = ["missing"];
259 const INDEX_MODEL: IndexModel = IndexModel::new(
260 "test::idx_missing",
261 "test::IndexStore",
262 &INDEX_FIELDS,
263 false,
264 );
265 const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];
266
267 let fields: &'static [EntityFieldModel] =
268 Box::leak(vec![field("id", EntityFieldKind::Ulid)].into_boxed_slice());
269 let model = EntityModel {
270 path: "test::Entity",
271 entity_name: "TestEntity",
272 primary_key: &fields[0],
273 fields,
274 indexes: &INDEXES,
275 };
276
277 assert!(matches!(
278 SchemaInfo::from_entity_model(&model),
279 Err(ValidateError::IndexFieldUnknown { .. })
280 ));
281 }
282
283 #[test]
284 fn model_rejects_index_unsupported_field() {
285 const INDEX_FIELDS: [&str; 1] = ["broken"];
286 const INDEX_MODEL: IndexModel =
287 IndexModel::new("test::idx_broken", "test::IndexStore", &INDEX_FIELDS, false);
288 const INDEXES: [&IndexModel; 1] = [&INDEX_MODEL];
289
290 let fields: &'static [EntityFieldModel] = Box::leak(
291 vec![
292 field("id", EntityFieldKind::Ulid),
293 field("broken", EntityFieldKind::Unsupported),
294 ]
295 .into_boxed_slice(),
296 );
297 let model = EntityModel {
298 path: "test::Entity",
299 entity_name: "TestEntity",
300 primary_key: &fields[0],
301 fields,
302 indexes: &INDEXES,
303 };
304
305 assert!(matches!(
306 SchemaInfo::from_entity_model(&model),
307 Err(ValidateError::IndexFieldUnsupported { .. })
308 ));
309 }
310
311 #[test]
312 fn model_rejects_duplicate_index_names() {
313 const INDEX_FIELDS_A: [&str; 1] = ["id"];
314 const INDEX_FIELDS_B: [&str; 1] = ["other"];
315 const INDEX_A: IndexModel = IndexModel::new(
316 "test::dup_index",
317 "test::IndexStore",
318 &INDEX_FIELDS_A,
319 false,
320 );
321 const INDEX_B: IndexModel = IndexModel::new(
322 "test::dup_index",
323 "test::IndexStore",
324 &INDEX_FIELDS_B,
325 false,
326 );
327 const INDEXES: [&IndexModel; 2] = [&INDEX_A, &INDEX_B];
328
329 let fields: &'static [EntityFieldModel] = Box::leak(
330 vec![
331 field("id", EntityFieldKind::Ulid),
332 field("other", EntityFieldKind::Text),
333 ]
334 .into_boxed_slice(),
335 );
336 let model = EntityModel {
337 path: "test::Entity",
338 entity_name: "TestEntity",
339 primary_key: &fields[0],
340 fields,
341 indexes: &INDEXES,
342 };
343
344 assert!(matches!(
345 SchemaInfo::from_entity_model(&model),
346 Err(ValidateError::DuplicateIndexName { .. })
347 ));
348 }
349
350 #[test]
351 fn plan_rejects_unorderable_field() {
352 let model = model_with_fields(
353 vec![
354 field("id", EntityFieldKind::Ulid),
355 field("tags", EntityFieldKind::List(&EntityFieldKind::Text)),
356 ],
357 0,
358 );
359
360 let schema = SchemaInfo::from_entity_model(&model).expect("valid model");
361 let plan = LogicalPlan {
362 mode: crate::db::query::QueryMode::Load(crate::db::query::LoadSpec::new()),
363 access: AccessPlan::Path(AccessPath::FullScan),
364 predicate: None,
365 order: Some(OrderSpec {
366 fields: vec![("tags".to_string(), OrderDirection::Asc)],
367 }),
368 delete_limit: None,
369 page: None,
370 projection: crate::db::query::plan::ProjectionSpec::All,
371 consistency: crate::db::query::ReadConsistency::MissingOk,
372 };
373
374 let err =
375 validate_plan_with_schema_info(&schema, &model, &plan).expect_err("unorderable field");
376 assert!(matches!(err, PlanError::UnorderableField { .. }));
377 }
378
379 #[test]
380 fn plan_rejects_index_prefix_too_long() {
381 let schema = SchemaInfo::from_entity_model(PlannerEntity::MODEL).expect("valid model");
382 let plan = LogicalPlan {
383 mode: crate::db::query::QueryMode::Load(crate::db::query::LoadSpec::new()),
384 access: AccessPlan::Path(AccessPath::IndexPrefix {
385 index: *PlannerEntity::INDEXES[0],
386 values: vec![
387 Value::Text("a".to_string()),
388 Value::Text("b".to_string()),
389 Value::Text("c".to_string()),
390 ],
391 }),
392 predicate: None,
393 order: None,
394 delete_limit: None,
395 page: None,
396 projection: crate::db::query::plan::ProjectionSpec::All,
397 consistency: crate::db::query::ReadConsistency::MissingOk,
398 };
399
400 let err = validate_plan_with_schema_info(&schema, PlannerEntity::MODEL, &plan)
401 .expect_err("index prefix too long");
402 assert!(matches!(err, PlanError::IndexPrefixTooLong { .. }));
403 }
404
405 #[test]
406 fn plan_accepts_model_based_validation() {
407 let model = model_with_fields(vec![field("id", EntityFieldKind::Ulid)], 0);
408 let plan = LogicalPlan {
409 mode: crate::db::query::QueryMode::Load(crate::db::query::LoadSpec::new()),
410 access: AccessPlan::Path(AccessPath::ByKey(Key::Ulid(Ulid::nil()))),
411 predicate: None,
412 order: None,
413 delete_limit: None,
414 page: None,
415 projection: crate::db::query::plan::ProjectionSpec::All,
416 consistency: crate::db::query::ReadConsistency::MissingOk,
417 };
418
419 validate_plan_with_model(&plan, &model).expect("valid plan");
420 }
421}
422
423fn validate_pk_key(schema: &SchemaInfo, model: &EntityModel, key: &Key) -> Result<(), PlanError> {
425 let field = model.primary_key.name;
426
427 let field_type = schema
428 .field(field)
429 .ok_or_else(|| PlanError::PrimaryKeyUnsupported {
430 field: field.to_string(),
431 })?;
432
433 let expected =
434 key_type_for_field(field_type).ok_or_else(|| PlanError::PrimaryKeyUnsupported {
435 field: field.to_string(),
436 })?;
437
438 if key_variant(key) != expected {
439 return Err(PlanError::PrimaryKeyMismatch {
440 field: field.to_string(),
441 key: *key,
442 });
443 }
444
445 Ok(())
446}
447
448fn validate_index_prefix(
450 schema: &SchemaInfo,
451 model: &EntityModel,
452 index: &IndexModel,
453 values: &[Value],
454) -> Result<(), PlanError> {
455 if !model.indexes.contains(&index) {
456 return Err(PlanError::IndexNotFound { index: *index });
457 }
458
459 if values.len() > index.fields.len() {
460 return Err(PlanError::IndexPrefixTooLong {
461 prefix_len: values.len(),
462 field_len: index.fields.len(),
463 });
464 }
465
466 for (field, value) in index.fields.iter().zip(values.iter()) {
467 let field_type =
468 schema
469 .field(field)
470 .ok_or_else(|| PlanError::IndexPrefixValueMismatch {
471 field: field.to_string(),
472 })?;
473
474 if !predicate::validate::literal_matches_type(value, field_type) {
475 return Err(PlanError::IndexPrefixValueMismatch {
476 field: field.to_string(),
477 });
478 }
479 }
480
481 Ok(())
482}
483
484#[derive(Clone, Copy, Debug, Eq, PartialEq)]
488enum KeyVariant {
489 Account,
490 Int,
491 Principal,
492 Subaccount,
493 Timestamp,
494 Uint,
495 Ulid,
496 Unit,
497}
498
499const fn key_variant(key: &Key) -> KeyVariant {
500 match key {
501 Key::Account(_) => KeyVariant::Account,
502 Key::Int(_) => KeyVariant::Int,
503 Key::Principal(_) => KeyVariant::Principal,
504 Key::Subaccount(_) => KeyVariant::Subaccount,
505 Key::Timestamp(_) => KeyVariant::Timestamp,
506 Key::Uint(_) => KeyVariant::Uint,
507 Key::Ulid(_) => KeyVariant::Ulid,
508 Key::Unit => KeyVariant::Unit,
509 }
510}
511
512const fn key_type_for_field(field_type: &FieldType) -> Option<KeyVariant> {
516 match field_type {
517 FieldType::Scalar(ScalarType::Account) => Some(KeyVariant::Account),
518 FieldType::Scalar(ScalarType::Int) => Some(KeyVariant::Int),
519 FieldType::Scalar(ScalarType::Principal) => Some(KeyVariant::Principal),
520 FieldType::Scalar(ScalarType::Subaccount) => Some(KeyVariant::Subaccount),
521 FieldType::Scalar(ScalarType::Timestamp) => Some(KeyVariant::Timestamp),
522 FieldType::Scalar(ScalarType::Uint) => Some(KeyVariant::Uint),
523 FieldType::Scalar(ScalarType::Ulid) => Some(KeyVariant::Ulid),
524 FieldType::Scalar(ScalarType::Unit) => Some(KeyVariant::Unit),
525 _ => None,
526 }
527}