1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5pub type DeleteSpec = crate::db::query::plan::DeleteSpec;
7pub type LoadSpec = crate::db::query::plan::LoadSpec;
8pub type QueryMode = crate::db::query::plan::QueryMode;
9
10use crate::{
11 db::{
12 access::{AccessPath, AccessPlan, AccessPlanError, canonicalize_key_values},
13 predicate::{
14 MissingRowPolicy, Predicate, SchemaInfo, ValidateError, normalize,
15 normalize_enum_literals, reject_unsupported_query_features,
16 },
17 query::{
18 explain::ExplainPlan,
19 expr::{FilterExpr, SortExpr, SortLowerError},
20 plan::{
21 AccessPlannedQuery, DeleteLimitSpec, FieldSlot, GroupAggregateKind,
22 GroupAggregateSpec, GroupPlanError, GroupSpec, GroupedExecutionConfig, LogicalPlan,
23 OrderDirection, OrderSpec, PageSpec, PlanError, PlannerError, ScalarPlan,
24 plan_access, validate::validate_query_semantics, validate_group_query_semantics,
25 },
26 policy,
27 },
28 response::ResponseError,
29 },
30 error::InternalError,
31 model::entity::EntityModel,
32 traits::{EntityKind, FieldValue, SingletonEntity},
33 value::Value,
34};
35use std::marker::PhantomData;
36use thiserror::Error as ThisError;
37
38#[derive(Clone, Debug, Eq, PartialEq)]
44pub(crate) enum KeyAccess<K> {
45 Single(K),
46 Many(Vec<K>),
47}
48
49#[derive(Clone, Copy, Debug, Eq, PartialEq)]
55pub(crate) enum KeyAccessKind {
56 Single,
57 Many,
58 Only,
59}
60
61#[derive(Clone, Debug, Eq, PartialEq)]
67pub(crate) struct KeyAccessState<K> {
68 pub kind: KeyAccessKind,
69 pub access: KeyAccess<K>,
70}
71
72pub(crate) fn access_plan_from_keys_value<K>(access: &KeyAccess<K>) -> AccessPlan<Value>
74where
75 K: FieldValue,
76{
77 match access {
78 KeyAccess::Single(key) => AccessPlan::path(AccessPath::ByKey(key.to_value())),
79 KeyAccess::Many(keys) => {
80 let mut values: Vec<Value> = keys.iter().map(FieldValue::to_value).collect();
81 canonicalize_key_values(&mut values);
82 if let Some(first) = values.first()
83 && values.len() == 1
84 {
85 return AccessPlan::path(AccessPath::ByKey(first.clone()));
86 }
87
88 AccessPlan::path(AccessPath::ByKeys(values))
89 }
90 }
91}
92
93pub(crate) fn access_plan_to_entity_keys<E: EntityKind>(
95 model: &EntityModel,
96 access: AccessPlan<Value>,
97) -> Result<AccessPlan<E::Key>, PlanError> {
98 access.into_executable::<E>(model)
99}
100
101pub(crate) fn coerce_entity_key<E: EntityKind>(
103 model: &EntityModel,
104 key: &Value,
105) -> Result<E::Key, PlanError> {
106 E::Key::from_value(key).ok_or_else(|| {
107 PlanError::from(AccessPlanError::PrimaryKeyMismatch {
108 field: model.primary_key.name.to_string(),
109 key: key.clone(),
110 })
111 })
112}
113
114impl AccessPlan<Value> {
115 pub(crate) fn into_executable<E: EntityKind>(
117 self,
118 model: &EntityModel,
119 ) -> Result<AccessPlan<E::Key>, PlanError> {
120 match self {
121 Self::Path(path) => Ok(AccessPlan::path(path.into_executable::<E>(model)?)),
122 Self::Union(children) => {
123 let mut out = Vec::with_capacity(children.len());
124 for child in children {
125 out.push(child.into_executable::<E>(model)?);
126 }
127
128 Ok(AccessPlan::union(out))
129 }
130 Self::Intersection(children) => {
131 let mut out = Vec::with_capacity(children.len());
132 for child in children {
133 out.push(child.into_executable::<E>(model)?);
134 }
135
136 Ok(AccessPlan::intersection(out))
137 }
138 }
139 }
140}
141
142impl AccessPath<Value> {
143 pub(crate) fn into_executable<E: EntityKind>(
145 self,
146 model: &EntityModel,
147 ) -> Result<AccessPath<E::Key>, PlanError> {
148 match self {
149 Self::ByKey(key) => Ok(AccessPath::ByKey(coerce_entity_key::<E>(model, &key)?)),
150 Self::ByKeys(keys) => {
151 let mut out = Vec::with_capacity(keys.len());
152 for key in keys {
153 out.push(coerce_entity_key::<E>(model, &key)?);
154 }
155
156 Ok(AccessPath::ByKeys(out))
157 }
158 Self::KeyRange { start, end } => Ok(AccessPath::KeyRange {
159 start: coerce_entity_key::<E>(model, &start)?,
160 end: coerce_entity_key::<E>(model, &end)?,
161 }),
162 Self::IndexPrefix { index, values } => Ok(AccessPath::IndexPrefix { index, values }),
163 Self::IndexRange { spec } => Ok(AccessPath::IndexRange { spec }),
164 Self::FullScan => Ok(AccessPath::FullScan),
165 }
166 }
167}
168
169#[derive(Debug)]
177pub(crate) struct QueryModel<'m, K> {
178 model: &'m EntityModel,
179 mode: QueryMode,
180 predicate: Option<Predicate>,
181 key_access: Option<KeyAccessState<K>>,
182 key_access_conflict: bool,
183 group: Option<crate::db::query::plan::GroupSpec>,
184 order: Option<OrderSpec>,
185 distinct: bool,
186 consistency: MissingRowPolicy,
187}
188
189impl<'m, K: FieldValue> QueryModel<'m, K> {
190 #[must_use]
191 pub(crate) const fn new(model: &'m EntityModel, consistency: MissingRowPolicy) -> Self {
192 Self {
193 model,
194 mode: QueryMode::Load(LoadSpec::new()),
195 predicate: None,
196 key_access: None,
197 key_access_conflict: false,
198 group: None,
199 order: None,
200 distinct: false,
201 consistency,
202 }
203 }
204
205 #[must_use]
207 pub(crate) const fn mode(&self) -> QueryMode {
208 self.mode
209 }
210
211 #[must_use]
212 fn has_explicit_order(&self) -> bool {
213 policy::has_explicit_order(self.order.as_ref())
214 }
215
216 #[must_use]
217 const fn load_spec(&self) -> Option<LoadSpec> {
218 match self.mode {
219 QueryMode::Load(spec) => Some(spec),
220 QueryMode::Delete(_) => None,
221 }
222 }
223
224 #[must_use]
226 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
227 self.predicate = match self.predicate.take() {
228 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
229 None => Some(predicate),
230 };
231 self
232 }
233
234 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
236 let schema = SchemaInfo::from_entity_model(self.model)?;
237 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
238
239 Ok(self.filter(predicate))
240 }
241
242 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
244 let schema = SchemaInfo::from_entity_model(self.model)?;
245 let order = match expr.lower_with(&schema) {
246 Ok(order) => order,
247 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
248 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
249 };
250
251 policy::validate_order_shape(Some(&order))
252 .map_err(IntentError::from)
253 .map_err(QueryError::from)?;
254
255 Ok(self.order_spec(order))
256 }
257
258 #[must_use]
260 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
261 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
262 self
263 }
264
265 #[must_use]
267 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
268 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
269 self
270 }
271
272 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
274 self.order = Some(order);
275 self
276 }
277
278 #[must_use]
280 pub(crate) const fn distinct(mut self) -> Self {
281 self.distinct = true;
282 self
283 }
284
285 fn push_group_field(mut self, field: &str) -> Result<Self, QueryError> {
288 let Some(field_slot) = FieldSlot::resolve(self.model, field) else {
289 return Err(QueryError::from(PlanError::from(
290 GroupPlanError::UnknownGroupField {
291 field: field.to_string(),
292 },
293 )));
294 };
295 let group = self.group.get_or_insert(GroupSpec {
296 group_fields: Vec::new(),
297 aggregates: Vec::new(),
298 execution: GroupedExecutionConfig::unbounded(),
299 });
300 if !group
301 .group_fields
302 .iter()
303 .any(|existing| existing.index() == field_slot.index())
304 {
305 group.group_fields.push(field_slot);
306 }
307
308 Ok(self)
309 }
310
311 fn push_group_aggregate(
313 mut self,
314 kind: GroupAggregateKind,
315 target_field: Option<String>,
316 ) -> Self {
317 let group = self.group.get_or_insert(GroupSpec {
318 group_fields: Vec::new(),
319 aggregates: Vec::new(),
320 execution: GroupedExecutionConfig::unbounded(),
321 });
322 group
323 .aggregates
324 .push(GroupAggregateSpec { kind, target_field });
325
326 self
327 }
328
329 fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
331 let group = self.group.get_or_insert(GroupSpec {
332 group_fields: Vec::new(),
333 aggregates: Vec::new(),
334 execution: GroupedExecutionConfig::unbounded(),
335 });
336 group.execution = GroupedExecutionConfig::with_hard_limits(max_groups, max_group_bytes);
337
338 self
339 }
340
341 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
343 if let Some(existing) = &self.key_access
344 && existing.kind != kind
345 {
346 self.key_access_conflict = true;
347 }
348
349 self.key_access = Some(KeyAccessState { kind, access });
350
351 self
352 }
353
354 pub(crate) fn by_id(self, id: K) -> Self {
356 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
357 }
358
359 pub(crate) fn by_ids<I>(self, ids: I) -> Self
361 where
362 I: IntoIterator<Item = K>,
363 {
364 self.set_key_access(
365 KeyAccessKind::Many,
366 KeyAccess::Many(ids.into_iter().collect()),
367 )
368 }
369
370 pub(crate) fn only(self, id: K) -> Self {
372 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
373 }
374
375 #[must_use]
377 pub(crate) const fn delete(mut self) -> Self {
378 if self.mode.is_load() {
379 self.mode = QueryMode::Delete(DeleteSpec::new());
380 }
381 self
382 }
383
384 #[must_use]
388 pub(crate) const fn limit(mut self, limit: u32) -> Self {
389 match self.mode {
390 QueryMode::Load(mut spec) => {
391 spec.limit = Some(limit);
392 self.mode = QueryMode::Load(spec);
393 }
394 QueryMode::Delete(mut spec) => {
395 spec.limit = Some(limit);
396 self.mode = QueryMode::Delete(spec);
397 }
398 }
399 self
400 }
401
402 #[must_use]
404 pub(crate) const fn offset(mut self, offset: u32) -> Self {
405 if let QueryMode::Load(mut spec) = self.mode {
406 spec.offset = offset;
407 self.mode = QueryMode::Load(spec);
408 }
409 self
410 }
411
412 fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
414 let schema_info = SchemaInfo::from_entity_model(self.model)?;
416 self.validate_intent()?;
417
418 let normalized_predicate = self
420 .predicate
421 .as_ref()
422 .map(|predicate| {
423 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
424 let predicate = normalize_enum_literals(&schema_info, predicate)?;
425 Ok::<Predicate, ValidateError>(normalize(&predicate))
426 })
427 .transpose()?;
428 let access_plan_value = match &self.key_access {
429 Some(state) => access_plan_from_keys_value(&state.access),
430 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
431 };
432
433 let scalar = ScalarPlan {
435 mode: self.mode,
436 predicate: normalized_predicate,
437 order: canonicalize_order_spec(self.model, self.order.clone()),
440 distinct: self.distinct,
441 delete_limit: match self.mode {
442 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
443 QueryMode::Load(_) => None,
444 },
445 page: match self.mode {
446 QueryMode::Load(spec) => {
447 if spec.limit.is_some() || spec.offset > 0 {
448 Some(PageSpec {
449 limit: spec.limit,
450 offset: spec.offset,
451 })
452 } else {
453 None
454 }
455 }
456 QueryMode::Delete(_) => None,
457 },
458 consistency: self.consistency,
459 };
460 let mut plan =
461 AccessPlannedQuery::from_parts(LogicalPlan::Scalar(scalar), access_plan_value);
462 if let Some(group) = self.group.clone() {
463 plan = plan.into_grouped(group);
464 }
465
466 if plan.grouped_plan().is_some() {
467 validate_group_query_semantics(&schema_info, self.model, &plan)?;
468 } else {
469 validate_query_semantics(&schema_info, self.model, &plan)?;
470 }
471
472 Ok(plan)
473 }
474
475 fn validate_intent(&self) -> Result<(), IntentError> {
477 if self.key_access_conflict {
478 return Err(IntentError::KeyAccessConflict);
479 }
480
481 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
482 .map_err(IntentError::from)?;
483
484 if let Some(state) = &self.key_access {
485 match state.kind {
486 KeyAccessKind::Many if self.predicate.is_some() => {
487 return Err(IntentError::ByIdsWithPredicate);
488 }
489 KeyAccessKind::Only if self.predicate.is_some() => {
490 return Err(IntentError::OnlyWithPredicate);
491 }
492 _ => {
493 }
495 }
496 }
497
498 Ok(())
499 }
500}
501
502#[derive(Debug)]
520pub struct PlannedQuery<E: EntityKind> {
521 plan: AccessPlannedQuery<E::Key>,
522 _marker: PhantomData<E>,
523}
524
525impl<E: EntityKind> PlannedQuery<E> {
526 #[must_use]
527 pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
528 Self {
529 plan,
530 _marker: PhantomData,
531 }
532 }
533
534 #[must_use]
535 pub fn explain(&self) -> ExplainPlan {
536 self.plan.explain_with_model(E::MODEL)
537 }
538}
539
540#[derive(Clone, Debug)]
548pub struct CompiledQuery<E: EntityKind> {
549 plan: AccessPlannedQuery<E::Key>,
550 _marker: PhantomData<E>,
551}
552
553impl<E: EntityKind> CompiledQuery<E> {
554 #[must_use]
555 pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
556 Self {
557 plan,
558 _marker: PhantomData,
559 }
560 }
561
562 #[must_use]
563 pub fn explain(&self) -> ExplainPlan {
564 self.plan.explain_with_model(E::MODEL)
565 }
566
567 #[must_use]
568 pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
569 self.plan
570 }
571}
572
573#[derive(Debug)]
574pub struct Query<E: EntityKind> {
575 intent: QueryModel<'static, E::Key>,
576 _marker: PhantomData<E>,
577}
578
579impl<E: EntityKind> Query<E> {
580 #[must_use]
584 pub const fn new(consistency: MissingRowPolicy) -> Self {
585 Self {
586 intent: QueryModel::new(E::MODEL, consistency),
587 _marker: PhantomData,
588 }
589 }
590
591 #[must_use]
593 pub const fn mode(&self) -> QueryMode {
594 self.intent.mode()
595 }
596
597 #[must_use]
598 pub(crate) fn has_explicit_order(&self) -> bool {
599 self.intent.has_explicit_order()
600 }
601
602 #[must_use]
603 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
604 self.intent.load_spec()
605 }
606
607 #[must_use]
609 pub fn filter(mut self, predicate: Predicate) -> Self {
610 self.intent = self.intent.filter(predicate);
611 self
612 }
613
614 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
616 let Self { intent, _marker } = self;
617 let intent = intent.filter_expr(expr)?;
618
619 Ok(Self { intent, _marker })
620 }
621
622 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
624 let Self { intent, _marker } = self;
625 let intent = intent.sort_expr(expr)?;
626
627 Ok(Self { intent, _marker })
628 }
629
630 #[must_use]
632 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
633 self.intent = self.intent.order_by(field);
634 self
635 }
636
637 #[must_use]
639 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
640 self.intent = self.intent.order_by_desc(field);
641 self
642 }
643
644 #[must_use]
646 pub fn distinct(mut self) -> Self {
647 self.intent = self.intent.distinct();
648 self
649 }
650
651 pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
653 let Self { intent, _marker } = self;
654 let intent = intent.push_group_field(field.as_ref())?;
655
656 Ok(Self { intent, _marker })
657 }
658
659 #[must_use]
661 pub fn group_count(mut self) -> Self {
662 self.intent = self
663 .intent
664 .push_group_aggregate(GroupAggregateKind::Count, None);
665 self
666 }
667
668 #[must_use]
670 pub fn group_exists(mut self) -> Self {
671 self.intent = self
672 .intent
673 .push_group_aggregate(GroupAggregateKind::Exists, None);
674 self
675 }
676
677 #[must_use]
679 pub fn group_first(mut self) -> Self {
680 self.intent = self
681 .intent
682 .push_group_aggregate(GroupAggregateKind::First, None);
683 self
684 }
685
686 #[must_use]
688 pub fn group_last(mut self) -> Self {
689 self.intent = self
690 .intent
691 .push_group_aggregate(GroupAggregateKind::Last, None);
692 self
693 }
694
695 #[must_use]
697 pub fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
698 self.intent = self.intent.grouped_limits(max_groups, max_group_bytes);
699 self
700 }
701
702 pub(crate) fn by_id(self, id: E::Key) -> Self {
704 let Self { intent, _marker } = self;
705 Self {
706 intent: intent.by_id(id),
707 _marker,
708 }
709 }
710
711 pub(crate) fn by_ids<I>(self, ids: I) -> Self
713 where
714 I: IntoIterator<Item = E::Key>,
715 {
716 let Self { intent, _marker } = self;
717 Self {
718 intent: intent.by_ids(ids),
719 _marker,
720 }
721 }
722
723 #[must_use]
725 pub fn delete(mut self) -> Self {
726 self.intent = self.intent.delete();
727 self
728 }
729
730 #[must_use]
736 pub fn limit(mut self, limit: u32) -> Self {
737 self.intent = self.intent.limit(limit);
738 self
739 }
740
741 #[must_use]
745 pub fn offset(mut self, offset: u32) -> Self {
746 self.intent = self.intent.offset(offset);
747 self
748 }
749
750 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
752 let plan = self.planned()?;
753
754 Ok(plan.explain())
755 }
756
757 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
759 let plan = self.build_plan()?;
760
761 Ok(PlannedQuery::new(plan))
762 }
763
764 pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
768 let plan = self.build_plan()?;
769
770 Ok(CompiledQuery::new(plan))
771 }
772
773 fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
775 let plan_value = self.intent.build_plan_model()?;
776 let (logical, access) = plan_value.into_parts();
777 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
778 let plan = AccessPlannedQuery::from_parts(logical, access);
779
780 Ok(plan)
781 }
782}
783
784impl<E> Query<E>
785where
786 E: EntityKind + SingletonEntity,
787 E::Key: Default,
788{
789 pub(crate) fn only(self) -> Self {
791 let Self { intent, _marker } = self;
792
793 Self {
794 intent: intent.only(E::Key::default()),
795 _marker,
796 }
797 }
798}
799
800#[derive(Debug, ThisError)]
805pub enum QueryError {
806 #[error("{0}")]
807 Validate(#[from] ValidateError),
808
809 #[error("{0}")]
810 Plan(Box<PlanError>),
811
812 #[error("{0}")]
813 Intent(#[from] IntentError),
814
815 #[error("{0}")]
816 Response(#[from] ResponseError),
817
818 #[error("{0}")]
819 Execute(#[from] InternalError),
820}
821
822impl From<PlannerError> for QueryError {
823 fn from(err: PlannerError) -> Self {
824 match err {
825 PlannerError::Plan(err) => Self::from(*err),
826 PlannerError::Internal(err) => Self::Execute(*err),
827 }
828 }
829}
830
831impl From<PlanError> for QueryError {
832 fn from(err: PlanError) -> Self {
833 Self::Plan(Box::new(err))
834 }
835}
836
837#[derive(Clone, Copy, Debug, ThisError)]
842pub enum IntentError {
843 #[error("{0}")]
844 PlanShape(#[from] policy::PlanPolicyError),
845
846 #[error("by_ids() cannot be combined with predicates")]
847 ByIdsWithPredicate,
848
849 #[error("only() cannot be combined with predicates")]
850 OnlyWithPredicate,
851
852 #[error("multiple key access methods were used on the same query")]
853 KeyAccessConflict,
854
855 #[error("cursor pagination requires an explicit ordering")]
856 CursorRequiresOrder,
857
858 #[error("cursor pagination requires an explicit limit")]
859 CursorRequiresLimit,
860
861 #[error("cursor tokens can only be used with .page().execute()")]
862 CursorRequiresPagedExecution,
863}
864
865impl From<policy::CursorPagingPolicyError> for IntentError {
866 fn from(err: policy::CursorPagingPolicyError) -> Self {
867 match err {
868 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
869 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
870 }
871 }
872}
873
874fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
876 match order {
877 Some(mut spec) => {
878 spec.fields.push((field.to_string(), direction));
879 spec
880 }
881 None => OrderSpec {
882 fields: vec![(field.to_string(), direction)],
883 },
884 }
885}
886
887fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
892 let mut order = order?;
893 if order.fields.is_empty() {
894 return Some(order);
895 }
896
897 let pk_field = model.primary_key.name;
898 let mut pk_direction = None;
899 order.fields.retain(|(field, direction)| {
900 if field == pk_field {
901 if pk_direction.is_none() {
902 pk_direction = Some(*direction);
903 }
904 false
905 } else {
906 true
907 }
908 });
909
910 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
911 order.fields.push((pk_field.to_string(), pk_direction));
912
913 Some(order)
914}