1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5pub type DeleteSpec = crate::db::query::plan::DeleteSpec;
7pub type LoadSpec = crate::db::query::plan::LoadSpec;
8pub type QueryMode = crate::db::query::plan::QueryMode;
9
10use crate::{
11 db::{
12 access::{AccessPath, AccessPlan, AccessPlanError},
13 contracts::{Predicate, ReadConsistency, SchemaInfo, ValidateError},
14 executor::ExecutablePlan,
15 policy,
16 query::{
17 explain::ExplainPlan,
18 expr::{FilterExpr, SortExpr, SortLowerError},
19 plan::{
20 AccessPlannedQuery, DeleteLimitSpec, LogicalPlan, OrderDirection, OrderSpec,
21 PageSpec, PlanError, PlannerError, canonical, plan_access,
22 },
23 plan_validate::validate_query_semantics,
24 predicate::{
25 lower_to_execution_model, normalize, normalize_enum_literals,
26 validate::reject_unsupported_query_features,
27 },
28 },
29 response::ResponseError,
30 },
31 error::InternalError,
32 model::entity::EntityModel,
33 traits::{EntityKind, FieldValue, SingletonEntity},
34 value::Value,
35};
36use std::marker::PhantomData;
37use thiserror::Error as ThisError;
38
/// Primary-key selection recorded on a query intent.
///
/// `K` is whatever key representation the intent is generic over; it is turned
/// into `Value`-based access paths by `access_plan_from_keys_value`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum KeyAccess<K> {
    /// Exactly one key.
    Single(K),
    /// A list of keys, stored exactly as supplied by the caller.
    Many(Vec<K>),
}
49
/// Which builder entry point produced a key access.
///
/// Kept separately from [`KeyAccess`] because `by_id` and `only` both store a
/// `KeyAccess::Single` payload but are validated differently.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum KeyAccessKind {
    /// Set through `by_id`.
    Single,
    /// Set through `by_ids`.
    Many,
    /// Set through `only`.
    Only,
}
61
62#[derive(Clone, Debug, Eq, PartialEq)]
68pub(crate) struct KeyAccessState<K> {
69 pub kind: KeyAccessKind,
70 pub access: KeyAccess<K>,
71}
72
73pub(crate) fn access_plan_from_keys_value<K>(access: &KeyAccess<K>) -> AccessPlan<Value>
75where
76 K: FieldValue,
77{
78 match access {
79 KeyAccess::Single(key) => AccessPlan::path(AccessPath::ByKey(key.to_value())),
80 KeyAccess::Many(keys) => {
81 let mut values: Vec<Value> = keys.iter().map(FieldValue::to_value).collect();
82 canonical::canonicalize_key_values(&mut values);
83 if let Some(first) = values.first()
84 && values.len() == 1
85 {
86 return AccessPlan::path(AccessPath::ByKey(first.clone()));
87 }
88
89 AccessPlan::path(AccessPath::ByKeys(values))
90 }
91 }
92}
93
94pub(crate) fn access_plan_to_entity_keys<E: EntityKind>(
96 model: &EntityModel,
97 access: AccessPlan<Value>,
98) -> Result<AccessPlan<E::Key>, PlanError> {
99 access.into_executable::<E>(model)
100}
101
102pub(crate) fn coerce_entity_key<E: EntityKind>(
104 model: &EntityModel,
105 key: &Value,
106) -> Result<E::Key, PlanError> {
107 E::Key::from_value(key).ok_or_else(|| {
108 PlanError::from(AccessPlanError::PrimaryKeyMismatch {
109 field: model.primary_key.name.to_string(),
110 key: key.clone(),
111 })
112 })
113}
114
115impl AccessPlan<Value> {
116 pub(crate) fn into_executable<E: EntityKind>(
118 self,
119 model: &EntityModel,
120 ) -> Result<AccessPlan<E::Key>, PlanError> {
121 match self {
122 Self::Path(path) => Ok(AccessPlan::path(path.into_executable::<E>(model)?)),
123 Self::Union(children) => {
124 let mut out = Vec::with_capacity(children.len());
125 for child in children {
126 out.push(child.into_executable::<E>(model)?);
127 }
128
129 Ok(AccessPlan::Union(out))
130 }
131 Self::Intersection(children) => {
132 let mut out = Vec::with_capacity(children.len());
133 for child in children {
134 out.push(child.into_executable::<E>(model)?);
135 }
136
137 Ok(AccessPlan::Intersection(out))
138 }
139 }
140 }
141}
142
143impl AccessPath<Value> {
144 pub(crate) fn into_executable<E: EntityKind>(
146 self,
147 model: &EntityModel,
148 ) -> Result<AccessPath<E::Key>, PlanError> {
149 match self {
150 Self::ByKey(key) => Ok(AccessPath::ByKey(coerce_entity_key::<E>(model, &key)?)),
151 Self::ByKeys(keys) => {
152 let mut out = Vec::with_capacity(keys.len());
153 for key in keys {
154 out.push(coerce_entity_key::<E>(model, &key)?);
155 }
156
157 Ok(AccessPath::ByKeys(out))
158 }
159 Self::KeyRange { start, end } => Ok(AccessPath::KeyRange {
160 start: coerce_entity_key::<E>(model, &start)?,
161 end: coerce_entity_key::<E>(model, &end)?,
162 }),
163 Self::IndexPrefix { index, values } => Ok(AccessPath::IndexPrefix { index, values }),
164 Self::IndexRange { spec } => Ok(AccessPath::IndexRange { spec }),
165 Self::FullScan => Ok(AccessPath::FullScan),
166 }
167 }
168}
169
170#[derive(Debug)]
178pub(crate) struct QueryModel<'m, K> {
179 model: &'m EntityModel,
180 mode: QueryMode,
181 predicate: Option<Predicate>,
182 key_access: Option<KeyAccessState<K>>,
183 key_access_conflict: bool,
184 order: Option<OrderSpec>,
185 distinct: bool,
186 consistency: ReadConsistency,
187}
188
189impl<'m, K: FieldValue> QueryModel<'m, K> {
190 #[must_use]
191 pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
192 Self {
193 model,
194 mode: QueryMode::Load(LoadSpec::new()),
195 predicate: None,
196 key_access: None,
197 key_access_conflict: false,
198 order: None,
199 distinct: false,
200 consistency,
201 }
202 }
203
204 #[must_use]
206 pub(crate) const fn mode(&self) -> QueryMode {
207 self.mode
208 }
209
210 #[must_use]
211 fn has_explicit_order(&self) -> bool {
212 policy::has_explicit_order(self.order.as_ref())
213 }
214
215 #[must_use]
216 const fn load_spec(&self) -> Option<LoadSpec> {
217 match self.mode {
218 QueryMode::Load(spec) => Some(spec),
219 QueryMode::Delete(_) => None,
220 }
221 }
222
223 #[must_use]
225 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
226 self.predicate = match self.predicate.take() {
227 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
228 None => Some(predicate),
229 };
230 self
231 }
232
233 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
235 let schema = SchemaInfo::from_entity_model(self.model)?;
236 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
237
238 Ok(self.filter(predicate))
239 }
240
241 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
243 let schema = SchemaInfo::from_entity_model(self.model)?;
244 let order = match expr.lower_with(&schema) {
245 Ok(order) => order,
246 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
247 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
248 };
249
250 policy::validate_order_shape(Some(&order))
251 .map_err(IntentError::from)
252 .map_err(QueryError::from)?;
253
254 Ok(self.order_spec(order))
255 }
256
257 #[must_use]
259 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
260 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
261 self
262 }
263
264 #[must_use]
266 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
267 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
268 self
269 }
270
271 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
273 self.order = Some(order);
274 self
275 }
276
277 #[must_use]
279 pub(crate) const fn distinct(mut self) -> Self {
280 self.distinct = true;
281 self
282 }
283
284 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
286 if let Some(existing) = &self.key_access
287 && existing.kind != kind
288 {
289 self.key_access_conflict = true;
290 }
291
292 self.key_access = Some(KeyAccessState { kind, access });
293
294 self
295 }
296
297 pub(crate) fn by_id(self, id: K) -> Self {
299 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
300 }
301
302 pub(crate) fn by_ids<I>(self, ids: I) -> Self
304 where
305 I: IntoIterator<Item = K>,
306 {
307 self.set_key_access(
308 KeyAccessKind::Many,
309 KeyAccess::Many(ids.into_iter().collect()),
310 )
311 }
312
313 pub(crate) fn only(self, id: K) -> Self {
315 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
316 }
317
318 #[must_use]
320 pub(crate) const fn delete(mut self) -> Self {
321 if self.mode.is_load() {
322 self.mode = QueryMode::Delete(DeleteSpec::new());
323 }
324 self
325 }
326
327 #[must_use]
331 pub(crate) const fn limit(mut self, limit: u32) -> Self {
332 match self.mode {
333 QueryMode::Load(mut spec) => {
334 spec.limit = Some(limit);
335 self.mode = QueryMode::Load(spec);
336 }
337 QueryMode::Delete(mut spec) => {
338 spec.limit = Some(limit);
339 self.mode = QueryMode::Delete(spec);
340 }
341 }
342 self
343 }
344
345 #[must_use]
347 pub(crate) const fn offset(mut self, offset: u32) -> Self {
348 if let QueryMode::Load(mut spec) = self.mode {
349 spec.offset = offset;
350 self.mode = QueryMode::Load(spec);
351 }
352 self
353 }
354
355 fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
357 let schema_info = SchemaInfo::from_entity_model(self.model)?;
359 self.validate_intent()?;
360
361 let normalized_predicate = self
363 .predicate
364 .as_ref()
365 .map(|predicate| {
366 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
367 let predicate = normalize_enum_literals(&schema_info, predicate)?;
368 Ok::<Predicate, ValidateError>(normalize(&predicate))
369 })
370 .transpose()?;
371 let access_plan_value = match &self.key_access {
372 Some(state) => access_plan_from_keys_value(&state.access),
373 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
374 };
375
376 let logical = LogicalPlan {
378 mode: self.mode,
379 predicate: normalized_predicate.map(lower_to_execution_model),
380 order: canonicalize_order_spec(self.model, self.order.clone()),
383 distinct: self.distinct,
384 delete_limit: match self.mode {
385 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
386 QueryMode::Load(_) => None,
387 },
388 page: match self.mode {
389 QueryMode::Load(spec) => {
390 if spec.limit.is_some() || spec.offset > 0 {
391 Some(PageSpec {
392 limit: spec.limit,
393 offset: spec.offset,
394 })
395 } else {
396 None
397 }
398 }
399 QueryMode::Delete(_) => None,
400 },
401 consistency: self.consistency,
402 };
403 let plan = AccessPlannedQuery::from_parts(logical, access_plan_value);
404
405 validate_query_semantics(&schema_info, self.model, &plan)?;
406
407 Ok(plan)
408 }
409
410 fn validate_intent(&self) -> Result<(), IntentError> {
412 if self.key_access_conflict {
413 return Err(IntentError::KeyAccessConflict);
414 }
415
416 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
417 .map_err(IntentError::from)?;
418
419 if let Some(state) = &self.key_access {
420 match state.kind {
421 KeyAccessKind::Many if self.predicate.is_some() => {
422 return Err(IntentError::ByIdsWithPredicate);
423 }
424 KeyAccessKind::Only if self.predicate.is_some() => {
425 return Err(IntentError::OnlyWithPredicate);
426 }
427 _ => {
428 }
430 }
431 }
432
433 Ok(())
434 }
435}
436
437#[derive(Debug)]
455pub struct PlannedQuery<E: EntityKind> {
456 plan: AccessPlannedQuery<E::Key>,
457 _marker: PhantomData<E>,
458}
459
460impl<E: EntityKind> PlannedQuery<E> {
461 #[must_use]
462 pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
463 Self {
464 plan,
465 _marker: PhantomData,
466 }
467 }
468
469 #[must_use]
470 pub fn explain(&self) -> ExplainPlan {
471 self.plan.explain_with_model(E::MODEL)
472 }
473
474 #[must_use]
475 pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
476 self.plan
477 }
478}
479
480impl<E: EntityKind> From<PlannedQuery<E>> for ExecutablePlan<E> {
481 fn from(value: PlannedQuery<E>) -> Self {
482 Self::new(value.into_inner())
483 }
484}
485
486#[derive(Debug)]
487pub struct Query<E: EntityKind> {
488 intent: QueryModel<'static, E::Key>,
489 _marker: PhantomData<E>,
490}
491
492impl<E: EntityKind> Query<E> {
493 #[must_use]
497 pub const fn new(consistency: ReadConsistency) -> Self {
498 Self {
499 intent: QueryModel::new(E::MODEL, consistency),
500 _marker: PhantomData,
501 }
502 }
503
504 #[must_use]
506 pub const fn mode(&self) -> QueryMode {
507 self.intent.mode()
508 }
509
510 #[must_use]
511 pub(crate) fn has_explicit_order(&self) -> bool {
512 self.intent.has_explicit_order()
513 }
514
515 #[must_use]
516 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
517 self.intent.load_spec()
518 }
519
520 #[must_use]
522 pub fn filter(mut self, predicate: Predicate) -> Self {
523 self.intent = self.intent.filter(predicate);
524 self
525 }
526
527 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
529 let Self { intent, _marker } = self;
530 let intent = intent.filter_expr(expr)?;
531
532 Ok(Self { intent, _marker })
533 }
534
535 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
537 let Self { intent, _marker } = self;
538 let intent = intent.sort_expr(expr)?;
539
540 Ok(Self { intent, _marker })
541 }
542
543 #[must_use]
545 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
546 self.intent = self.intent.order_by(field);
547 self
548 }
549
550 #[must_use]
552 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
553 self.intent = self.intent.order_by_desc(field);
554 self
555 }
556
557 #[must_use]
559 pub fn distinct(mut self) -> Self {
560 self.intent = self.intent.distinct();
561 self
562 }
563
564 pub(crate) fn by_id(self, id: E::Key) -> Self {
566 let Self { intent, _marker } = self;
567 Self {
568 intent: intent.by_id(id),
569 _marker,
570 }
571 }
572
573 pub(crate) fn by_ids<I>(self, ids: I) -> Self
575 where
576 I: IntoIterator<Item = E::Key>,
577 {
578 let Self { intent, _marker } = self;
579 Self {
580 intent: intent.by_ids(ids),
581 _marker,
582 }
583 }
584
585 #[must_use]
587 pub fn delete(mut self) -> Self {
588 self.intent = self.intent.delete();
589 self
590 }
591
592 #[must_use]
598 pub fn limit(mut self, limit: u32) -> Self {
599 self.intent = self.intent.limit(limit);
600 self
601 }
602
603 #[must_use]
607 pub fn offset(mut self, offset: u32) -> Self {
608 self.intent = self.intent.offset(offset);
609 self
610 }
611
612 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
614 let plan = self.planned()?;
615
616 Ok(plan.explain())
617 }
618
619 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
621 let plan = self.build_plan()?;
622
623 Ok(PlannedQuery::new(plan))
624 }
625
626 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
628 self.planned().map(ExecutablePlan::from)
629 }
630
631 fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
633 let plan_value = self.intent.build_plan_model()?;
634 let (logical, access) = plan_value.into_parts();
635 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
636 let plan = AccessPlannedQuery::from_parts(logical, access);
637
638 Ok(plan)
639 }
640}
641
642impl<E> Query<E>
643where
644 E: EntityKind + SingletonEntity,
645 E::Key: Default,
646{
647 pub(crate) fn only(self) -> Self {
649 let Self { intent, _marker } = self;
650
651 Self {
652 intent: intent.only(E::Key::default()),
653 _marker,
654 }
655 }
656}
657
658#[derive(Debug, ThisError)]
663pub enum QueryError {
664 #[error("{0}")]
665 Validate(#[from] ValidateError),
666
667 #[error("{0}")]
668 Plan(Box<PlanError>),
669
670 #[error("{0}")]
671 Intent(#[from] IntentError),
672
673 #[error("{0}")]
674 Response(#[from] ResponseError),
675
676 #[error("{0}")]
677 Execute(#[from] InternalError),
678}
679
680impl From<PlannerError> for QueryError {
681 fn from(err: PlannerError) -> Self {
682 match err {
683 PlannerError::Plan(err) => Self::from(*err),
684 PlannerError::Internal(err) => Self::Execute(*err),
685 }
686 }
687}
688
689impl From<PlanError> for QueryError {
690 fn from(err: PlanError) -> Self {
691 Self::Plan(Box::new(err))
692 }
693}
694
695#[derive(Clone, Copy, Debug, ThisError)]
700pub enum IntentError {
701 #[error("{0}")]
702 PlanShape(#[from] policy::PlanPolicyError),
703
704 #[error("by_ids() cannot be combined with predicates")]
705 ByIdsWithPredicate,
706
707 #[error("only() cannot be combined with predicates")]
708 OnlyWithPredicate,
709
710 #[error("multiple key access methods were used on the same query")]
711 KeyAccessConflict,
712
713 #[error("cursor pagination requires an explicit ordering")]
714 CursorRequiresOrder,
715
716 #[error("cursor pagination requires an explicit limit")]
717 CursorRequiresLimit,
718
719 #[error("cursor tokens can only be used with .page().execute()")]
720 CursorRequiresPagedExecution,
721}
722
723impl From<policy::CursorPagingPolicyError> for IntentError {
724 fn from(err: policy::CursorPagingPolicyError) -> Self {
725 match err {
726 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
727 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
728 }
729 }
730}
731
732fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
734 match order {
735 Some(mut spec) => {
736 spec.fields.push((field.to_string(), direction));
737 spec
738 }
739 None => OrderSpec {
740 fields: vec![(field.to_string(), direction)],
741 },
742 }
743}
744
745fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
750 let mut order = order?;
751 if order.fields.is_empty() {
752 return Some(order);
753 }
754
755 let pk_field = model.primary_key.name;
756 let mut pk_direction = None;
757 order.fields.retain(|(field, direction)| {
758 if field == pk_field {
759 if pk_direction.is_none() {
760 pk_direction = Some(*direction);
761 }
762 false
763 } else {
764 true
765 }
766 });
767
768 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
769 order.fields.push((pk_field.to_string(), pk_direction));
770
771 Some(order)
772}