1#![expect(clippy::used_underscore_binding)]
2
3#[cfg(test)]
9mod tests;
10
/// Alias for the planner's `DeleteSpec`, made available under this module's path.
pub type DeleteSpec = crate::db::query::plan::DeleteSpec;
/// Alias for the planner's `LoadSpec`, made available under this module's path.
pub type LoadSpec = crate::db::query::plan::LoadSpec;
/// Alias for the planner's `QueryMode`, made available under this module's path.
pub type QueryMode = crate::db::query::plan::QueryMode;
15
16use crate::{
17 db::{
18 access::{AccessPath, AccessPlan, AccessPlanError, canonicalize_key_values},
19 predicate::{
20 MissingRowPolicy, Predicate, SchemaInfo, ValidateError, normalize,
21 normalize_enum_literals, reject_unsupported_query_features,
22 },
23 query::{
24 explain::ExplainPlan,
25 expr::{FilterExpr, SortExpr, SortLowerError},
26 plan::{
27 AccessPlannedQuery, DeleteLimitSpec, FieldSlot, GroupAggregateKind,
28 GroupAggregateSpec, GroupPlanError, GroupSpec, GroupedExecutionConfig, LogicalPlan,
29 OrderDirection, OrderSpec, PageSpec, PlanError, PlannerError, ScalarPlan,
30 plan_access, validate::validate_query_semantics, validate_group_query_semantics,
31 },
32 policy,
33 },
34 response::ResponseError,
35 },
36 error::InternalError,
37 model::entity::EntityModel,
38 traits::{EntityKind, FieldValue, SingletonEntity},
39 value::Value,
40};
41use std::marker::PhantomData;
42use thiserror::Error as ThisError;
43
/// Explicit key access requested by a query intent.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum KeyAccess<K> {
    /// Access exactly one key.
    Single(K),
    /// Access a list of keys, stored in the order they were supplied.
    Many(Vec<K>),
}
54
/// Identifies which builder entry point requested key access.
///
/// `QueryModel::set_key_access` compares kinds to detect a query that mixes
/// different key access methods.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum KeyAccessKind {
    /// Set by `by_id`.
    Single,
    /// Set by `by_ids`.
    Many,
    /// Set by `only`.
    Only,
}
66
/// Key access request paired with the kind of builder call that produced it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct KeyAccessState<K> {
    /// Which builder entry point set this access.
    pub kind: KeyAccessKind,
    /// The requested key or keys.
    pub access: KeyAccess<K>,
}
77
78pub(crate) fn access_plan_from_keys_value<K>(access: &KeyAccess<K>) -> AccessPlan<Value>
80where
81 K: FieldValue,
82{
83 match access {
85 KeyAccess::Single(key) => AccessPlan::path(AccessPath::ByKey(key.to_value())),
86 KeyAccess::Many(keys) => {
87 let mut values: Vec<Value> = keys.iter().map(FieldValue::to_value).collect();
88 canonicalize_key_values(&mut values);
89 if let Some(first) = values.first()
90 && values.len() == 1
91 {
92 return AccessPlan::path(AccessPath::ByKey(first.clone()));
93 }
94
95 AccessPlan::path(AccessPath::ByKeys(values))
96 }
97 }
98}
99
100pub(crate) fn access_plan_to_entity_keys<E: EntityKind>(
102 model: &EntityModel,
103 access: AccessPlan<Value>,
104) -> Result<AccessPlan<E::Key>, PlanError> {
105 access.into_executable::<E>(model)
106}
107
108pub(crate) fn coerce_entity_key<E: EntityKind>(
110 model: &EntityModel,
111 key: &Value,
112) -> Result<E::Key, PlanError> {
113 E::Key::from_value(key).ok_or_else(|| {
114 PlanError::from(AccessPlanError::PrimaryKeyMismatch {
115 field: model.primary_key.name.to_string(),
116 key: key.clone(),
117 })
118 })
119}
120
121impl AccessPlan<Value> {
122 pub(crate) fn into_executable<E: EntityKind>(
124 self,
125 model: &EntityModel,
126 ) -> Result<AccessPlan<E::Key>, PlanError> {
127 match self {
128 Self::Path(path) => Ok(AccessPlan::path(path.into_executable::<E>(model)?)),
129 Self::Union(children) => {
130 let mut out = Vec::with_capacity(children.len());
131 for child in children {
132 out.push(child.into_executable::<E>(model)?);
133 }
134
135 Ok(AccessPlan::union(out))
136 }
137 Self::Intersection(children) => {
138 let mut out = Vec::with_capacity(children.len());
139 for child in children {
140 out.push(child.into_executable::<E>(model)?);
141 }
142
143 Ok(AccessPlan::intersection(out))
144 }
145 }
146 }
147}
148
149impl AccessPath<Value> {
150 pub(crate) fn into_executable<E: EntityKind>(
152 self,
153 model: &EntityModel,
154 ) -> Result<AccessPath<E::Key>, PlanError> {
155 match self {
156 Self::ByKey(key) => Ok(AccessPath::ByKey(coerce_entity_key::<E>(model, &key)?)),
157 Self::ByKeys(keys) => {
158 let mut out = Vec::with_capacity(keys.len());
159 for key in keys {
160 out.push(coerce_entity_key::<E>(model, &key)?);
161 }
162
163 Ok(AccessPath::ByKeys(out))
164 }
165 Self::KeyRange { start, end } => Ok(AccessPath::KeyRange {
166 start: coerce_entity_key::<E>(model, &start)?,
167 end: coerce_entity_key::<E>(model, &end)?,
168 }),
169 Self::IndexPrefix { index, values } => Ok(AccessPath::IndexPrefix { index, values }),
170 Self::IndexRange { spec } => Ok(AccessPath::IndexRange { spec }),
171 Self::FullScan => Ok(AccessPath::FullScan),
172 }
173 }
174}
175
/// Query intent accumulated by the builder methods before planning.
///
/// `K` is the key type used for explicit key access.
#[derive(Debug)]
pub(crate) struct QueryModel<'m, K> {
    /// Entity model the intent is built and validated against.
    model: &'m EntityModel,
    /// Load or delete mode together with its spec.
    mode: QueryMode,
    /// Accumulated filter; successive `filter` calls are combined with `Predicate::And`.
    predicate: Option<Predicate>,
    /// Most recent explicit key access request, if any.
    key_access: Option<KeyAccessState<K>>,
    /// Set once key access requests of different kinds have been mixed.
    key_access_conflict: bool,
    /// Grouping spec, created on the first grouping-related call.
    group: Option<crate::db::query::plan::GroupSpec>,
    /// Requested ordering, if any.
    order: Option<OrderSpec>,
    /// Whether `distinct` was requested.
    distinct: bool,
    /// Missing-row policy copied into the scalar plan.
    consistency: MissingRowPolicy,
}
195
196impl<'m, K: FieldValue> QueryModel<'m, K> {
197 #[must_use]
198 pub(crate) const fn new(model: &'m EntityModel, consistency: MissingRowPolicy) -> Self {
199 Self {
200 model,
201 mode: QueryMode::Load(LoadSpec::new()),
202 predicate: None,
203 key_access: None,
204 key_access_conflict: false,
205 group: None,
206 order: None,
207 distinct: false,
208 consistency,
209 }
210 }
211
212 #[must_use]
214 pub(crate) const fn mode(&self) -> QueryMode {
215 self.mode
216 }
217
218 #[must_use]
219 fn has_explicit_order(&self) -> bool {
220 policy::has_explicit_order(self.order.as_ref())
221 }
222
223 #[must_use]
224 const fn load_spec(&self) -> Option<LoadSpec> {
225 match self.mode {
226 QueryMode::Load(spec) => Some(spec),
227 QueryMode::Delete(_) => None,
228 }
229 }
230
231 #[must_use]
233 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
234 self.predicate = match self.predicate.take() {
235 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
236 None => Some(predicate),
237 };
238 self
239 }
240
241 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
243 let schema = SchemaInfo::from_entity_model(self.model)?;
244 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
245
246 Ok(self.filter(predicate))
247 }
248
249 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
251 let schema = SchemaInfo::from_entity_model(self.model)?;
252 let order = match expr.lower_with(&schema) {
253 Ok(order) => order,
254 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
255 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
256 };
257
258 policy::validate_order_shape(Some(&order))
259 .map_err(IntentError::from)
260 .map_err(QueryError::from)?;
261
262 Ok(self.order_spec(order))
263 }
264
265 #[must_use]
267 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
268 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
269 self
270 }
271
272 #[must_use]
274 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
275 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
276 self
277 }
278
279 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
281 self.order = Some(order);
282 self
283 }
284
285 #[must_use]
287 pub(crate) const fn distinct(mut self) -> Self {
288 self.distinct = true;
289 self
290 }
291
292 fn push_group_field(mut self, field: &str) -> Result<Self, QueryError> {
295 let Some(field_slot) = FieldSlot::resolve(self.model, field) else {
296 return Err(QueryError::from(PlanError::from(
297 GroupPlanError::UnknownGroupField {
298 field: field.to_string(),
299 },
300 )));
301 };
302 let group = self.group.get_or_insert(GroupSpec {
303 group_fields: Vec::new(),
304 aggregates: Vec::new(),
305 execution: GroupedExecutionConfig::unbounded(),
306 });
307 if !group
308 .group_fields
309 .iter()
310 .any(|existing| existing.index() == field_slot.index())
311 {
312 group.group_fields.push(field_slot);
313 }
314
315 Ok(self)
316 }
317
318 fn push_group_aggregate(
320 mut self,
321 kind: GroupAggregateKind,
322 target_field: Option<String>,
323 ) -> Self {
324 let group = self.group.get_or_insert(GroupSpec {
325 group_fields: Vec::new(),
326 aggregates: Vec::new(),
327 execution: GroupedExecutionConfig::unbounded(),
328 });
329 group
330 .aggregates
331 .push(GroupAggregateSpec { kind, target_field });
332
333 self
334 }
335
336 fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
338 let group = self.group.get_or_insert(GroupSpec {
339 group_fields: Vec::new(),
340 aggregates: Vec::new(),
341 execution: GroupedExecutionConfig::unbounded(),
342 });
343 group.execution = GroupedExecutionConfig::with_hard_limits(max_groups, max_group_bytes);
344
345 self
346 }
347
348 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
350 if let Some(existing) = &self.key_access
351 && existing.kind != kind
352 {
353 self.key_access_conflict = true;
354 }
355
356 self.key_access = Some(KeyAccessState { kind, access });
357
358 self
359 }
360
361 pub(crate) fn by_id(self, id: K) -> Self {
363 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
364 }
365
366 pub(crate) fn by_ids<I>(self, ids: I) -> Self
368 where
369 I: IntoIterator<Item = K>,
370 {
371 self.set_key_access(
372 KeyAccessKind::Many,
373 KeyAccess::Many(ids.into_iter().collect()),
374 )
375 }
376
377 pub(crate) fn only(self, id: K) -> Self {
379 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
380 }
381
382 #[must_use]
384 pub(crate) const fn delete(mut self) -> Self {
385 if self.mode.is_load() {
386 self.mode = QueryMode::Delete(DeleteSpec::new());
387 }
388 self
389 }
390
391 #[must_use]
395 pub(crate) const fn limit(mut self, limit: u32) -> Self {
396 match self.mode {
397 QueryMode::Load(mut spec) => {
398 spec.limit = Some(limit);
399 self.mode = QueryMode::Load(spec);
400 }
401 QueryMode::Delete(mut spec) => {
402 spec.limit = Some(limit);
403 self.mode = QueryMode::Delete(spec);
404 }
405 }
406 self
407 }
408
409 #[must_use]
411 pub(crate) const fn offset(mut self, offset: u32) -> Self {
412 if let QueryMode::Load(mut spec) = self.mode {
413 spec.offset = offset;
414 self.mode = QueryMode::Load(spec);
415 }
416 self
417 }
418
419 fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
421 let schema_info = SchemaInfo::from_entity_model(self.model)?;
423 self.validate_intent()?;
424
425 let normalized_predicate = self
427 .predicate
428 .as_ref()
429 .map(|predicate| {
430 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
431 let predicate = normalize_enum_literals(&schema_info, predicate)?;
432 Ok::<Predicate, ValidateError>(normalize(&predicate))
433 })
434 .transpose()?;
435 let access_plan_value = match &self.key_access {
436 Some(state) => access_plan_from_keys_value(&state.access),
437 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
438 };
439
440 let scalar = ScalarPlan {
442 mode: self.mode,
443 predicate: normalized_predicate,
444 order: canonicalize_order_spec(self.model, self.order.clone()),
447 distinct: self.distinct,
448 delete_limit: match self.mode {
449 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
450 QueryMode::Load(_) => None,
451 },
452 page: match self.mode {
453 QueryMode::Load(spec) => {
454 if spec.limit.is_some() || spec.offset > 0 {
455 Some(PageSpec {
456 limit: spec.limit,
457 offset: spec.offset,
458 })
459 } else {
460 None
461 }
462 }
463 QueryMode::Delete(_) => None,
464 },
465 consistency: self.consistency,
466 };
467 let mut plan =
468 AccessPlannedQuery::from_parts(LogicalPlan::Scalar(scalar), access_plan_value);
469 if let Some(group) = self.group.clone() {
470 plan = plan.into_grouped(group);
471 }
472
473 if plan.grouped_plan().is_some() {
474 validate_group_query_semantics(&schema_info, self.model, &plan)?;
475 } else {
476 validate_query_semantics(&schema_info, self.model, &plan)?;
477 }
478
479 Ok(plan)
480 }
481
482 fn validate_intent(&self) -> Result<(), IntentError> {
484 if self.key_access_conflict {
485 return Err(IntentError::KeyAccessConflict);
486 }
487
488 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
489 .map_err(IntentError::from)?;
490
491 if let Some(state) = &self.key_access {
492 match state.kind {
493 KeyAccessKind::Many if self.predicate.is_some() => {
494 return Err(IntentError::ByIdsWithPredicate);
495 }
496 KeyAccessKind::Only if self.predicate.is_some() => {
497 return Err(IntentError::OnlyWithPredicate);
498 }
499 _ => {
500 }
502 }
503 }
504
505 Ok(())
506 }
507}
508
/// Access-planned query for entity `E`, keyed by `E::Key`.
///
/// Within this file it is only used to produce an explain plan.
#[derive(Debug)]
pub struct PlannedQuery<E: EntityKind> {
    /// The planned query.
    plan: AccessPlannedQuery<E::Key>,
    /// Ties the value to entity type `E` without storing one.
    _marker: PhantomData<E>,
}
531
532impl<E: EntityKind> PlannedQuery<E> {
533 #[must_use]
534 pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
535 Self {
536 plan,
537 _marker: PhantomData,
538 }
539 }
540
541 #[must_use]
542 pub fn explain(&self) -> ExplainPlan {
543 self.plan.explain_with_model(E::MODEL)
544 }
545}
546
/// Cloneable access-planned query for entity `E`, keyed by `E::Key`.
///
/// Code inside `crate::db` can take the inner plan out with `into_inner`.
#[derive(Clone, Debug)]
pub struct CompiledQuery<E: EntityKind> {
    /// The planned query.
    plan: AccessPlannedQuery<E::Key>,
    /// Ties the value to entity type `E` without storing one.
    _marker: PhantomData<E>,
}
559
560impl<E: EntityKind> CompiledQuery<E> {
561 #[must_use]
562 pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
563 Self {
564 plan,
565 _marker: PhantomData,
566 }
567 }
568
569 #[must_use]
570 pub fn explain(&self) -> ExplainPlan {
571 self.plan.explain_with_model(E::MODEL)
572 }
573
574 #[must_use]
575 pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
576 self.plan
577 }
578}
579
/// Typed query builder for entity `E`.
///
/// Wraps a `QueryModel` bound to `E::Key` and forwards builder calls to it.
#[derive(Debug)]
pub struct Query<E: EntityKind> {
    /// Untyped intent that accumulates the builder state.
    intent: QueryModel<'static, E::Key>,
    /// Ties the value to entity type `E` without storing one.
    _marker: PhantomData<E>,
}
585
586impl<E: EntityKind> Query<E> {
587 #[must_use]
591 pub const fn new(consistency: MissingRowPolicy) -> Self {
592 Self {
593 intent: QueryModel::new(E::MODEL, consistency),
594 _marker: PhantomData,
595 }
596 }
597
598 #[must_use]
600 pub const fn mode(&self) -> QueryMode {
601 self.intent.mode()
602 }
603
604 #[must_use]
605 pub(crate) fn has_explicit_order(&self) -> bool {
606 self.intent.has_explicit_order()
607 }
608
609 #[must_use]
610 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
611 self.intent.load_spec()
612 }
613
614 #[must_use]
616 pub fn filter(mut self, predicate: Predicate) -> Self {
617 self.intent = self.intent.filter(predicate);
618 self
619 }
620
621 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
623 let Self { intent, _marker } = self;
624 let intent = intent.filter_expr(expr)?;
625
626 Ok(Self { intent, _marker })
627 }
628
629 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
631 let Self { intent, _marker } = self;
632 let intent = intent.sort_expr(expr)?;
633
634 Ok(Self { intent, _marker })
635 }
636
637 #[must_use]
639 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
640 self.intent = self.intent.order_by(field);
641 self
642 }
643
644 #[must_use]
646 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
647 self.intent = self.intent.order_by_desc(field);
648 self
649 }
650
651 #[must_use]
653 pub fn distinct(mut self) -> Self {
654 self.intent = self.intent.distinct();
655 self
656 }
657
658 pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
660 let Self { intent, _marker } = self;
661 let intent = intent.push_group_field(field.as_ref())?;
662
663 Ok(Self { intent, _marker })
664 }
665
666 #[must_use]
668 pub fn group_count(mut self) -> Self {
669 self.intent = self
670 .intent
671 .push_group_aggregate(GroupAggregateKind::Count, None);
672 self
673 }
674
675 #[must_use]
677 pub fn group_exists(mut self) -> Self {
678 self.intent = self
679 .intent
680 .push_group_aggregate(GroupAggregateKind::Exists, None);
681 self
682 }
683
684 #[must_use]
686 pub fn group_first(mut self) -> Self {
687 self.intent = self
688 .intent
689 .push_group_aggregate(GroupAggregateKind::First, None);
690 self
691 }
692
693 #[must_use]
695 pub fn group_last(mut self) -> Self {
696 self.intent = self
697 .intent
698 .push_group_aggregate(GroupAggregateKind::Last, None);
699 self
700 }
701
702 #[must_use]
704 pub fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
705 self.intent = self.intent.grouped_limits(max_groups, max_group_bytes);
706 self
707 }
708
709 pub(crate) fn by_id(self, id: E::Key) -> Self {
711 let Self { intent, _marker } = self;
712 Self {
713 intent: intent.by_id(id),
714 _marker,
715 }
716 }
717
718 pub(crate) fn by_ids<I>(self, ids: I) -> Self
720 where
721 I: IntoIterator<Item = E::Key>,
722 {
723 let Self { intent, _marker } = self;
724 Self {
725 intent: intent.by_ids(ids),
726 _marker,
727 }
728 }
729
730 #[must_use]
732 pub fn delete(mut self) -> Self {
733 self.intent = self.intent.delete();
734 self
735 }
736
737 #[must_use]
743 pub fn limit(mut self, limit: u32) -> Self {
744 self.intent = self.intent.limit(limit);
745 self
746 }
747
748 #[must_use]
752 pub fn offset(mut self, offset: u32) -> Self {
753 self.intent = self.intent.offset(offset);
754 self
755 }
756
757 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
759 let plan = self.planned()?;
760
761 Ok(plan.explain())
762 }
763
764 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
766 let plan = self.build_plan()?;
767
768 Ok(PlannedQuery::new(plan))
769 }
770
771 pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
775 let plan = self.build_plan()?;
776
777 Ok(CompiledQuery::new(plan))
778 }
779
780 fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
782 let plan_value = self.intent.build_plan_model()?;
783 let (logical, access) = plan_value.into_parts();
784 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
785 let plan = AccessPlannedQuery::from_parts(logical, access);
786
787 Ok(plan)
788 }
789}
790
791impl<E> Query<E>
792where
793 E: EntityKind + SingletonEntity,
794 E::Key: Default,
795{
796 pub(crate) fn only(self) -> Self {
798 let Self { intent, _marker } = self;
799
800 Self {
801 intent: intent.only(E::Key::default()),
802 _marker,
803 }
804 }
805}
806
/// Error type for query building and planning, covering validation, plan,
/// intent, response, and internal failures.
///
/// Every variant displays the message of the error it wraps.
#[derive(Debug, ThisError)]
pub enum QueryError {
    /// Predicate or schema validation failed.
    #[error("{0}")]
    Validate(#[from] ValidateError),

    /// Planning failed; the plan error is boxed.
    #[error("{0}")]
    Plan(Box<PlanError>),

    /// The query intent itself is invalid.
    #[error("{0}")]
    Intent(#[from] IntentError),

    /// Wraps a `ResponseError`.
    #[error("{0}")]
    Response(#[from] ResponseError),

    /// Wraps an `InternalError`; also produced from `PlannerError::Internal`.
    #[error("{0}")]
    Execute(#[from] InternalError),
}
828
829impl From<PlannerError> for QueryError {
830 fn from(err: PlannerError) -> Self {
831 match err {
832 PlannerError::Plan(err) => Self::from(*err),
833 PlannerError::Internal(err) => Self::Execute(*err),
834 }
835 }
836}
837
838impl From<PlanError> for QueryError {
839 fn from(err: PlanError) -> Self {
840 Self::Plan(Box::new(err))
841 }
842}
843
/// Errors describing an invalid query intent, detected before or during planning.
#[derive(Clone, Copy, Debug, ThisError)]
pub enum IntentError {
    /// A plan-shape policy check rejected the intent.
    #[error("{0}")]
    PlanShape(#[from] policy::PlanPolicyError),

    /// `by_ids` access was combined with a predicate.
    #[error("by_ids() cannot be combined with predicates")]
    ByIdsWithPredicate,

    /// `only` access was combined with a predicate.
    #[error("only() cannot be combined with predicates")]
    OnlyWithPredicate,

    /// Key access requests of different kinds were used on one query.
    #[error("multiple key access methods were used on the same query")]
    KeyAccessConflict,

    /// Converted from `CursorPagingPolicyError::CursorRequiresOrder`.
    #[error("cursor pagination requires an explicit ordering")]
    CursorRequiresOrder,

    /// Converted from `CursorPagingPolicyError::CursorRequiresLimit`.
    #[error("cursor pagination requires an explicit limit")]
    CursorRequiresLimit,

    /// A cursor token was used outside paged execution (not constructed in this file).
    #[error("cursor tokens can only be used with .page().execute()")]
    CursorRequiresPagedExecution,
}
871
872impl From<policy::CursorPagingPolicyError> for IntentError {
873 fn from(err: policy::CursorPagingPolicyError) -> Self {
874 match err {
875 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
876 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
877 }
878 }
879}
880
881fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
883 match order {
884 Some(mut spec) => {
885 spec.fields.push((field.to_string(), direction));
886 spec
887 }
888 None => OrderSpec {
889 fields: vec![(field.to_string(), direction)],
890 },
891 }
892}
893
894fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
899 let mut order = order?;
900 if order.fields.is_empty() {
901 return Some(order);
902 }
903
904 let pk_field = model.primary_key.name;
905 let mut pk_direction = None;
906 order.fields.retain(|(field, direction)| {
907 if field == pk_field {
908 if pk_direction.is_none() {
909 pk_direction = Some(*direction);
910 }
911 false
912 } else {
913 true
914 }
915 });
916
917 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
918 order.fields.push((pk_field.to_string(), pk_direction));
919
920 Some(order)
921}