1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5pub type DeleteSpec = crate::db::query::plan::DeleteSpec;
7pub type LoadSpec = crate::db::query::plan::LoadSpec;
8pub type QueryMode = crate::db::query::plan::QueryMode;
9
10use crate::{
11 db::{
12 access::{AccessPath, AccessPlan, AccessPlanError},
13 policy,
14 predicate::{
15 MissingRowPolicy, Predicate, SchemaInfo, ValidateError, normalize,
16 normalize_enum_literals, reject_unsupported_query_features,
17 },
18 query::{
19 explain::ExplainPlan,
20 expr::{FilterExpr, SortExpr, SortLowerError},
21 plan::{
22 AccessPlannedQuery, DeleteLimitSpec, FieldSlot, GroupAggregateKind,
23 GroupAggregateSpec, GroupPlanError, GroupSpec, GroupedExecutionConfig, LogicalPlan,
24 OrderDirection, OrderSpec, PageSpec, PlanError, PlannerError, ScalarPlan,
25 canonical, plan_access, validate::validate_query_semantics,
26 validate_group_query_semantics,
27 },
28 },
29 response::ResponseError,
30 },
31 error::InternalError,
32 model::entity::EntityModel,
33 traits::{EntityKind, FieldValue, SingletonEntity},
34 value::Value,
35};
36use std::marker::PhantomData;
37use thiserror::Error as ThisError;
38
39#[derive(Clone, Debug, Eq, PartialEq)]
45pub(crate) enum KeyAccess<K> {
46 Single(K),
47 Many(Vec<K>),
48}
49
50#[derive(Clone, Copy, Debug, Eq, PartialEq)]
56pub(crate) enum KeyAccessKind {
57 Single,
58 Many,
59 Only,
60}
61
62#[derive(Clone, Debug, Eq, PartialEq)]
68pub(crate) struct KeyAccessState<K> {
69 pub kind: KeyAccessKind,
70 pub access: KeyAccess<K>,
71}
72
73pub(crate) fn access_plan_from_keys_value<K>(access: &KeyAccess<K>) -> AccessPlan<Value>
75where
76 K: FieldValue,
77{
78 match access {
79 KeyAccess::Single(key) => AccessPlan::path(AccessPath::ByKey(key.to_value())),
80 KeyAccess::Many(keys) => {
81 let mut values: Vec<Value> = keys.iter().map(FieldValue::to_value).collect();
82 canonical::canonicalize_key_values(&mut values);
83 if let Some(first) = values.first()
84 && values.len() == 1
85 {
86 return AccessPlan::path(AccessPath::ByKey(first.clone()));
87 }
88
89 AccessPlan::path(AccessPath::ByKeys(values))
90 }
91 }
92}
93
94pub(crate) fn access_plan_to_entity_keys<E: EntityKind>(
96 model: &EntityModel,
97 access: AccessPlan<Value>,
98) -> Result<AccessPlan<E::Key>, PlanError> {
99 access.into_executable::<E>(model)
100}
101
102pub(crate) fn coerce_entity_key<E: EntityKind>(
104 model: &EntityModel,
105 key: &Value,
106) -> Result<E::Key, PlanError> {
107 E::Key::from_value(key).ok_or_else(|| {
108 PlanError::from(AccessPlanError::PrimaryKeyMismatch {
109 field: model.primary_key.name.to_string(),
110 key: key.clone(),
111 })
112 })
113}
114
115impl AccessPlan<Value> {
116 pub(crate) fn into_executable<E: EntityKind>(
118 self,
119 model: &EntityModel,
120 ) -> Result<AccessPlan<E::Key>, PlanError> {
121 match self {
122 Self::Path(path) => Ok(AccessPlan::path(path.into_executable::<E>(model)?)),
123 Self::Union(children) => {
124 let mut out = Vec::with_capacity(children.len());
125 for child in children {
126 out.push(child.into_executable::<E>(model)?);
127 }
128
129 Ok(AccessPlan::union(out))
130 }
131 Self::Intersection(children) => {
132 let mut out = Vec::with_capacity(children.len());
133 for child in children {
134 out.push(child.into_executable::<E>(model)?);
135 }
136
137 Ok(AccessPlan::intersection(out))
138 }
139 }
140 }
141}
142
143impl AccessPath<Value> {
144 pub(crate) fn into_executable<E: EntityKind>(
146 self,
147 model: &EntityModel,
148 ) -> Result<AccessPath<E::Key>, PlanError> {
149 match self {
150 Self::ByKey(key) => Ok(AccessPath::ByKey(coerce_entity_key::<E>(model, &key)?)),
151 Self::ByKeys(keys) => {
152 let mut out = Vec::with_capacity(keys.len());
153 for key in keys {
154 out.push(coerce_entity_key::<E>(model, &key)?);
155 }
156
157 Ok(AccessPath::ByKeys(out))
158 }
159 Self::KeyRange { start, end } => Ok(AccessPath::KeyRange {
160 start: coerce_entity_key::<E>(model, &start)?,
161 end: coerce_entity_key::<E>(model, &end)?,
162 }),
163 Self::IndexPrefix { index, values } => Ok(AccessPath::IndexPrefix { index, values }),
164 Self::IndexRange { spec } => Ok(AccessPath::IndexRange { spec }),
165 Self::FullScan => Ok(AccessPath::FullScan),
166 }
167 }
168}
169
170#[derive(Debug)]
178pub(crate) struct QueryModel<'m, K> {
179 model: &'m EntityModel,
180 mode: QueryMode,
181 predicate: Option<Predicate>,
182 key_access: Option<KeyAccessState<K>>,
183 key_access_conflict: bool,
184 group: Option<crate::db::query::plan::GroupSpec>,
185 order: Option<OrderSpec>,
186 distinct: bool,
187 consistency: MissingRowPolicy,
188}
189
190impl<'m, K: FieldValue> QueryModel<'m, K> {
191 #[must_use]
192 pub(crate) const fn new(model: &'m EntityModel, consistency: MissingRowPolicy) -> Self {
193 Self {
194 model,
195 mode: QueryMode::Load(LoadSpec::new()),
196 predicate: None,
197 key_access: None,
198 key_access_conflict: false,
199 group: None,
200 order: None,
201 distinct: false,
202 consistency,
203 }
204 }
205
206 #[must_use]
208 pub(crate) const fn mode(&self) -> QueryMode {
209 self.mode
210 }
211
212 #[must_use]
213 fn has_explicit_order(&self) -> bool {
214 policy::has_explicit_order(self.order.as_ref())
215 }
216
217 #[must_use]
218 const fn load_spec(&self) -> Option<LoadSpec> {
219 match self.mode {
220 QueryMode::Load(spec) => Some(spec),
221 QueryMode::Delete(_) => None,
222 }
223 }
224
225 #[must_use]
227 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
228 self.predicate = match self.predicate.take() {
229 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
230 None => Some(predicate),
231 };
232 self
233 }
234
235 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
237 let schema = SchemaInfo::from_entity_model(self.model)?;
238 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
239
240 Ok(self.filter(predicate))
241 }
242
243 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
245 let schema = SchemaInfo::from_entity_model(self.model)?;
246 let order = match expr.lower_with(&schema) {
247 Ok(order) => order,
248 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
249 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
250 };
251
252 policy::validate_order_shape(Some(&order))
253 .map_err(IntentError::from)
254 .map_err(QueryError::from)?;
255
256 Ok(self.order_spec(order))
257 }
258
259 #[must_use]
261 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
262 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
263 self
264 }
265
266 #[must_use]
268 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
269 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
270 self
271 }
272
273 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
275 self.order = Some(order);
276 self
277 }
278
279 #[must_use]
281 pub(crate) const fn distinct(mut self) -> Self {
282 self.distinct = true;
283 self
284 }
285
286 fn push_group_field(mut self, field: &str) -> Result<Self, QueryError> {
289 let Some(field_slot) = FieldSlot::resolve(self.model, field) else {
290 return Err(QueryError::from(PlanError::from(
291 GroupPlanError::UnknownGroupField {
292 field: field.to_string(),
293 },
294 )));
295 };
296 let group = self.group.get_or_insert(GroupSpec {
297 group_fields: Vec::new(),
298 aggregates: Vec::new(),
299 execution: GroupedExecutionConfig::unbounded(),
300 });
301 if !group
302 .group_fields
303 .iter()
304 .any(|existing| existing.index() == field_slot.index())
305 {
306 group.group_fields.push(field_slot);
307 }
308
309 Ok(self)
310 }
311
312 fn push_group_aggregate(
314 mut self,
315 kind: GroupAggregateKind,
316 target_field: Option<String>,
317 ) -> Self {
318 let group = self.group.get_or_insert(GroupSpec {
319 group_fields: Vec::new(),
320 aggregates: Vec::new(),
321 execution: GroupedExecutionConfig::unbounded(),
322 });
323 group
324 .aggregates
325 .push(GroupAggregateSpec { kind, target_field });
326
327 self
328 }
329
330 fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
332 let group = self.group.get_or_insert(GroupSpec {
333 group_fields: Vec::new(),
334 aggregates: Vec::new(),
335 execution: GroupedExecutionConfig::unbounded(),
336 });
337 group.execution = GroupedExecutionConfig::with_hard_limits(max_groups, max_group_bytes);
338
339 self
340 }
341
342 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
344 if let Some(existing) = &self.key_access
345 && existing.kind != kind
346 {
347 self.key_access_conflict = true;
348 }
349
350 self.key_access = Some(KeyAccessState { kind, access });
351
352 self
353 }
354
355 pub(crate) fn by_id(self, id: K) -> Self {
357 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
358 }
359
360 pub(crate) fn by_ids<I>(self, ids: I) -> Self
362 where
363 I: IntoIterator<Item = K>,
364 {
365 self.set_key_access(
366 KeyAccessKind::Many,
367 KeyAccess::Many(ids.into_iter().collect()),
368 )
369 }
370
371 pub(crate) fn only(self, id: K) -> Self {
373 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
374 }
375
376 #[must_use]
378 pub(crate) const fn delete(mut self) -> Self {
379 if self.mode.is_load() {
380 self.mode = QueryMode::Delete(DeleteSpec::new());
381 }
382 self
383 }
384
385 #[must_use]
389 pub(crate) const fn limit(mut self, limit: u32) -> Self {
390 match self.mode {
391 QueryMode::Load(mut spec) => {
392 spec.limit = Some(limit);
393 self.mode = QueryMode::Load(spec);
394 }
395 QueryMode::Delete(mut spec) => {
396 spec.limit = Some(limit);
397 self.mode = QueryMode::Delete(spec);
398 }
399 }
400 self
401 }
402
403 #[must_use]
405 pub(crate) const fn offset(mut self, offset: u32) -> Self {
406 if let QueryMode::Load(mut spec) = self.mode {
407 spec.offset = offset;
408 self.mode = QueryMode::Load(spec);
409 }
410 self
411 }
412
413 fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
415 let schema_info = SchemaInfo::from_entity_model(self.model)?;
417 self.validate_intent()?;
418
419 let normalized_predicate = self
421 .predicate
422 .as_ref()
423 .map(|predicate| {
424 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
425 let predicate = normalize_enum_literals(&schema_info, predicate)?;
426 Ok::<Predicate, ValidateError>(normalize(&predicate))
427 })
428 .transpose()?;
429 let access_plan_value = match &self.key_access {
430 Some(state) => access_plan_from_keys_value(&state.access),
431 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
432 };
433
434 let scalar = ScalarPlan {
436 mode: self.mode,
437 predicate: normalized_predicate,
438 order: canonicalize_order_spec(self.model, self.order.clone()),
441 distinct: self.distinct,
442 delete_limit: match self.mode {
443 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
444 QueryMode::Load(_) => None,
445 },
446 page: match self.mode {
447 QueryMode::Load(spec) => {
448 if spec.limit.is_some() || spec.offset > 0 {
449 Some(PageSpec {
450 limit: spec.limit,
451 offset: spec.offset,
452 })
453 } else {
454 None
455 }
456 }
457 QueryMode::Delete(_) => None,
458 },
459 consistency: self.consistency,
460 };
461 let mut plan =
462 AccessPlannedQuery::from_parts(LogicalPlan::Scalar(scalar), access_plan_value);
463 if let Some(group) = self.group.clone() {
464 plan = plan.into_grouped(group);
465 }
466
467 if plan.grouped_plan().is_some() {
468 validate_group_query_semantics(&schema_info, self.model, &plan)?;
469 } else {
470 validate_query_semantics(&schema_info, self.model, &plan)?;
471 }
472
473 Ok(plan)
474 }
475
476 fn validate_intent(&self) -> Result<(), IntentError> {
478 if self.key_access_conflict {
479 return Err(IntentError::KeyAccessConflict);
480 }
481
482 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
483 .map_err(IntentError::from)?;
484
485 if let Some(state) = &self.key_access {
486 match state.kind {
487 KeyAccessKind::Many if self.predicate.is_some() => {
488 return Err(IntentError::ByIdsWithPredicate);
489 }
490 KeyAccessKind::Only if self.predicate.is_some() => {
491 return Err(IntentError::OnlyWithPredicate);
492 }
493 _ => {
494 }
496 }
497 }
498
499 Ok(())
500 }
501}
502
503#[derive(Debug)]
521pub struct PlannedQuery<E: EntityKind> {
522 plan: AccessPlannedQuery<E::Key>,
523 _marker: PhantomData<E>,
524}
525
526impl<E: EntityKind> PlannedQuery<E> {
527 #[must_use]
528 pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
529 Self {
530 plan,
531 _marker: PhantomData,
532 }
533 }
534
535 #[must_use]
536 pub fn explain(&self) -> ExplainPlan {
537 self.plan.explain_with_model(E::MODEL)
538 }
539
540 #[must_use]
541 pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
542 self.plan
543 }
544}
545
546#[derive(Debug)]
547pub struct Query<E: EntityKind> {
548 intent: QueryModel<'static, E::Key>,
549 _marker: PhantomData<E>,
550}
551
552impl<E: EntityKind> Query<E> {
553 #[must_use]
557 pub const fn new(consistency: MissingRowPolicy) -> Self {
558 Self {
559 intent: QueryModel::new(E::MODEL, consistency),
560 _marker: PhantomData,
561 }
562 }
563
564 #[must_use]
566 pub const fn mode(&self) -> QueryMode {
567 self.intent.mode()
568 }
569
570 #[must_use]
571 pub(crate) fn has_explicit_order(&self) -> bool {
572 self.intent.has_explicit_order()
573 }
574
575 #[must_use]
576 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
577 self.intent.load_spec()
578 }
579
580 #[must_use]
582 pub fn filter(mut self, predicate: Predicate) -> Self {
583 self.intent = self.intent.filter(predicate);
584 self
585 }
586
587 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
589 let Self { intent, _marker } = self;
590 let intent = intent.filter_expr(expr)?;
591
592 Ok(Self { intent, _marker })
593 }
594
595 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
597 let Self { intent, _marker } = self;
598 let intent = intent.sort_expr(expr)?;
599
600 Ok(Self { intent, _marker })
601 }
602
603 #[must_use]
605 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
606 self.intent = self.intent.order_by(field);
607 self
608 }
609
610 #[must_use]
612 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
613 self.intent = self.intent.order_by_desc(field);
614 self
615 }
616
617 #[must_use]
619 pub fn distinct(mut self) -> Self {
620 self.intent = self.intent.distinct();
621 self
622 }
623
624 pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
626 let Self { intent, _marker } = self;
627 let intent = intent.push_group_field(field.as_ref())?;
628
629 Ok(Self { intent, _marker })
630 }
631
632 #[must_use]
634 pub fn group_count(mut self) -> Self {
635 self.intent = self
636 .intent
637 .push_group_aggregate(GroupAggregateKind::Count, None);
638 self
639 }
640
641 #[must_use]
643 pub fn group_exists(mut self) -> Self {
644 self.intent = self
645 .intent
646 .push_group_aggregate(GroupAggregateKind::Exists, None);
647 self
648 }
649
650 #[must_use]
652 pub fn group_first(mut self) -> Self {
653 self.intent = self
654 .intent
655 .push_group_aggregate(GroupAggregateKind::First, None);
656 self
657 }
658
659 #[must_use]
661 pub fn group_last(mut self) -> Self {
662 self.intent = self
663 .intent
664 .push_group_aggregate(GroupAggregateKind::Last, None);
665 self
666 }
667
668 #[must_use]
670 pub fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
671 self.intent = self.intent.grouped_limits(max_groups, max_group_bytes);
672 self
673 }
674
675 pub(crate) fn by_id(self, id: E::Key) -> Self {
677 let Self { intent, _marker } = self;
678 Self {
679 intent: intent.by_id(id),
680 _marker,
681 }
682 }
683
684 pub(crate) fn by_ids<I>(self, ids: I) -> Self
686 where
687 I: IntoIterator<Item = E::Key>,
688 {
689 let Self { intent, _marker } = self;
690 Self {
691 intent: intent.by_ids(ids),
692 _marker,
693 }
694 }
695
696 #[must_use]
698 pub fn delete(mut self) -> Self {
699 self.intent = self.intent.delete();
700 self
701 }
702
703 #[must_use]
709 pub fn limit(mut self, limit: u32) -> Self {
710 self.intent = self.intent.limit(limit);
711 self
712 }
713
714 #[must_use]
718 pub fn offset(mut self, offset: u32) -> Self {
719 self.intent = self.intent.offset(offset);
720 self
721 }
722
723 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
725 let plan = self.planned()?;
726
727 Ok(plan.explain())
728 }
729
730 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
732 let plan = self.build_plan()?;
733
734 Ok(PlannedQuery::new(plan))
735 }
736
737 fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
739 let plan_value = self.intent.build_plan_model()?;
740 let (logical, access) = plan_value.into_parts();
741 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
742 let plan = AccessPlannedQuery::from_parts(logical, access);
743
744 Ok(plan)
745 }
746}
747
748impl<E> Query<E>
749where
750 E: EntityKind + SingletonEntity,
751 E::Key: Default,
752{
753 pub(crate) fn only(self) -> Self {
755 let Self { intent, _marker } = self;
756
757 Self {
758 intent: intent.only(E::Key::default()),
759 _marker,
760 }
761 }
762}
763
764#[derive(Debug, ThisError)]
769pub enum QueryError {
770 #[error("{0}")]
771 Validate(#[from] ValidateError),
772
773 #[error("{0}")]
774 Plan(Box<PlanError>),
775
776 #[error("{0}")]
777 Intent(#[from] IntentError),
778
779 #[error("{0}")]
780 Response(#[from] ResponseError),
781
782 #[error("{0}")]
783 Execute(#[from] InternalError),
784}
785
786impl From<PlannerError> for QueryError {
787 fn from(err: PlannerError) -> Self {
788 match err {
789 PlannerError::Plan(err) => Self::from(*err),
790 PlannerError::Internal(err) => Self::Execute(*err),
791 }
792 }
793}
794
795impl From<PlanError> for QueryError {
796 fn from(err: PlanError) -> Self {
797 Self::Plan(Box::new(err))
798 }
799}
800
801#[derive(Clone, Copy, Debug, ThisError)]
806pub enum IntentError {
807 #[error("{0}")]
808 PlanShape(#[from] policy::PlanPolicyError),
809
810 #[error("by_ids() cannot be combined with predicates")]
811 ByIdsWithPredicate,
812
813 #[error("only() cannot be combined with predicates")]
814 OnlyWithPredicate,
815
816 #[error("multiple key access methods were used on the same query")]
817 KeyAccessConflict,
818
819 #[error("cursor pagination requires an explicit ordering")]
820 CursorRequiresOrder,
821
822 #[error("cursor pagination requires an explicit limit")]
823 CursorRequiresLimit,
824
825 #[error("cursor tokens can only be used with .page().execute()")]
826 CursorRequiresPagedExecution,
827}
828
829impl From<policy::CursorPagingPolicyError> for IntentError {
830 fn from(err: policy::CursorPagingPolicyError) -> Self {
831 match err {
832 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
833 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
834 }
835 }
836}
837
838fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
840 match order {
841 Some(mut spec) => {
842 spec.fields.push((field.to_string(), direction));
843 spec
844 }
845 None => OrderSpec {
846 fields: vec![(field.to_string(), direction)],
847 },
848 }
849}
850
851fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
856 let mut order = order?;
857 if order.fields.is_empty() {
858 return Some(order);
859 }
860
861 let pk_field = model.primary_key.name;
862 let mut pk_direction = None;
863 order.fields.retain(|(field, direction)| {
864 if field == pk_field {
865 if pk_direction.is_none() {
866 pk_direction = Some(*direction);
867 }
868 false
869 } else {
870 true
871 }
872 });
873
874 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
875 order.fields.push((pk_field.to_string(), pk_direction));
876
877 Some(order)
878}