1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5pub type DeleteSpec = crate::db::query::plan::DeleteSpec;
7pub type LoadSpec = crate::db::query::plan::LoadSpec;
8pub type QueryMode = crate::db::query::plan::QueryMode;
9
10use crate::{
11 db::{
12 access::{AccessPath, AccessPlan, AccessPlanError},
13 contracts::{Predicate, ReadConsistency, SchemaInfo, ValidateError},
14 policy,
15 query::{
16 explain::ExplainPlan,
17 expr::{FilterExpr, SortExpr, SortLowerError},
18 plan::{
19 AccessPlannedQuery, DeleteLimitSpec, FieldSlot, GroupAggregateKind,
20 GroupAggregateSpec, GroupPlanError, GroupSpec, GroupedExecutionConfig, LogicalPlan,
21 OrderDirection, OrderSpec, PageSpec, PlanError, PlannerError, ScalarPlan,
22 canonical, plan_access, validate::validate_query_semantics,
23 validate_group_query_semantics,
24 },
25 predicate::{
26 lower_to_execution_model, normalize, normalize_enum_literals,
27 validate::reject_unsupported_query_features,
28 },
29 },
30 response::ResponseError,
31 },
32 error::InternalError,
33 model::entity::EntityModel,
34 traits::{EntityKind, FieldValue, SingletonEntity},
35 value::Value,
36};
37use std::marker::PhantomData;
38use thiserror::Error as ThisError;
39
40#[derive(Clone, Debug, Eq, PartialEq)]
46pub(crate) enum KeyAccess<K> {
47 Single(K),
48 Many(Vec<K>),
49}
50
51#[derive(Clone, Copy, Debug, Eq, PartialEq)]
57pub(crate) enum KeyAccessKind {
58 Single,
59 Many,
60 Only,
61}
62
63#[derive(Clone, Debug, Eq, PartialEq)]
69pub(crate) struct KeyAccessState<K> {
70 pub kind: KeyAccessKind,
71 pub access: KeyAccess<K>,
72}
73
74pub(crate) fn access_plan_from_keys_value<K>(access: &KeyAccess<K>) -> AccessPlan<Value>
76where
77 K: FieldValue,
78{
79 match access {
80 KeyAccess::Single(key) => AccessPlan::path(AccessPath::ByKey(key.to_value())),
81 KeyAccess::Many(keys) => {
82 let mut values: Vec<Value> = keys.iter().map(FieldValue::to_value).collect();
83 canonical::canonicalize_key_values(&mut values);
84 if let Some(first) = values.first()
85 && values.len() == 1
86 {
87 return AccessPlan::path(AccessPath::ByKey(first.clone()));
88 }
89
90 AccessPlan::path(AccessPath::ByKeys(values))
91 }
92 }
93}
94
95pub(crate) fn access_plan_to_entity_keys<E: EntityKind>(
97 model: &EntityModel,
98 access: AccessPlan<Value>,
99) -> Result<AccessPlan<E::Key>, PlanError> {
100 access.into_executable::<E>(model)
101}
102
103pub(crate) fn coerce_entity_key<E: EntityKind>(
105 model: &EntityModel,
106 key: &Value,
107) -> Result<E::Key, PlanError> {
108 E::Key::from_value(key).ok_or_else(|| {
109 PlanError::from(AccessPlanError::PrimaryKeyMismatch {
110 field: model.primary_key.name.to_string(),
111 key: key.clone(),
112 })
113 })
114}
115
116impl AccessPlan<Value> {
117 pub(crate) fn into_executable<E: EntityKind>(
119 self,
120 model: &EntityModel,
121 ) -> Result<AccessPlan<E::Key>, PlanError> {
122 match self {
123 Self::Path(path) => Ok(AccessPlan::path(path.into_executable::<E>(model)?)),
124 Self::Union(children) => {
125 let mut out = Vec::with_capacity(children.len());
126 for child in children {
127 out.push(child.into_executable::<E>(model)?);
128 }
129
130 Ok(AccessPlan::union(out))
131 }
132 Self::Intersection(children) => {
133 let mut out = Vec::with_capacity(children.len());
134 for child in children {
135 out.push(child.into_executable::<E>(model)?);
136 }
137
138 Ok(AccessPlan::intersection(out))
139 }
140 }
141 }
142}
143
144impl AccessPath<Value> {
145 pub(crate) fn into_executable<E: EntityKind>(
147 self,
148 model: &EntityModel,
149 ) -> Result<AccessPath<E::Key>, PlanError> {
150 match self {
151 Self::ByKey(key) => Ok(AccessPath::ByKey(coerce_entity_key::<E>(model, &key)?)),
152 Self::ByKeys(keys) => {
153 let mut out = Vec::with_capacity(keys.len());
154 for key in keys {
155 out.push(coerce_entity_key::<E>(model, &key)?);
156 }
157
158 Ok(AccessPath::ByKeys(out))
159 }
160 Self::KeyRange { start, end } => Ok(AccessPath::KeyRange {
161 start: coerce_entity_key::<E>(model, &start)?,
162 end: coerce_entity_key::<E>(model, &end)?,
163 }),
164 Self::IndexPrefix { index, values } => Ok(AccessPath::IndexPrefix { index, values }),
165 Self::IndexRange { spec } => Ok(AccessPath::IndexRange { spec }),
166 Self::FullScan => Ok(AccessPath::FullScan),
167 }
168 }
169}
170
171#[derive(Debug)]
179pub(crate) struct QueryModel<'m, K> {
180 model: &'m EntityModel,
181 mode: QueryMode,
182 predicate: Option<Predicate>,
183 key_access: Option<KeyAccessState<K>>,
184 key_access_conflict: bool,
185 group: Option<crate::db::query::plan::GroupSpec>,
186 order: Option<OrderSpec>,
187 distinct: bool,
188 consistency: ReadConsistency,
189}
190
191impl<'m, K: FieldValue> QueryModel<'m, K> {
192 #[must_use]
193 pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
194 Self {
195 model,
196 mode: QueryMode::Load(LoadSpec::new()),
197 predicate: None,
198 key_access: None,
199 key_access_conflict: false,
200 group: None,
201 order: None,
202 distinct: false,
203 consistency,
204 }
205 }
206
207 #[must_use]
209 pub(crate) const fn mode(&self) -> QueryMode {
210 self.mode
211 }
212
213 #[must_use]
214 fn has_explicit_order(&self) -> bool {
215 policy::has_explicit_order(self.order.as_ref())
216 }
217
218 #[must_use]
219 const fn load_spec(&self) -> Option<LoadSpec> {
220 match self.mode {
221 QueryMode::Load(spec) => Some(spec),
222 QueryMode::Delete(_) => None,
223 }
224 }
225
226 #[must_use]
228 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
229 self.predicate = match self.predicate.take() {
230 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
231 None => Some(predicate),
232 };
233 self
234 }
235
236 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
238 let schema = SchemaInfo::from_entity_model(self.model)?;
239 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
240
241 Ok(self.filter(predicate))
242 }
243
244 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
246 let schema = SchemaInfo::from_entity_model(self.model)?;
247 let order = match expr.lower_with(&schema) {
248 Ok(order) => order,
249 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
250 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
251 };
252
253 policy::validate_order_shape(Some(&order))
254 .map_err(IntentError::from)
255 .map_err(QueryError::from)?;
256
257 Ok(self.order_spec(order))
258 }
259
260 #[must_use]
262 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
263 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
264 self
265 }
266
267 #[must_use]
269 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
270 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
271 self
272 }
273
274 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
276 self.order = Some(order);
277 self
278 }
279
280 #[must_use]
282 pub(crate) const fn distinct(mut self) -> Self {
283 self.distinct = true;
284 self
285 }
286
287 fn push_group_field(mut self, field: &str) -> Result<Self, QueryError> {
290 let Some(field_slot) = FieldSlot::resolve(self.model, field) else {
291 return Err(QueryError::from(PlanError::from(
292 GroupPlanError::UnknownGroupField {
293 field: field.to_string(),
294 },
295 )));
296 };
297 let group = self.group.get_or_insert(GroupSpec {
298 group_fields: Vec::new(),
299 aggregates: Vec::new(),
300 execution: GroupedExecutionConfig::unbounded(),
301 });
302 if !group
303 .group_fields
304 .iter()
305 .any(|existing| existing.index() == field_slot.index())
306 {
307 group.group_fields.push(field_slot);
308 }
309
310 Ok(self)
311 }
312
313 fn push_group_aggregate(
315 mut self,
316 kind: GroupAggregateKind,
317 target_field: Option<String>,
318 ) -> Self {
319 let group = self.group.get_or_insert(GroupSpec {
320 group_fields: Vec::new(),
321 aggregates: Vec::new(),
322 execution: GroupedExecutionConfig::unbounded(),
323 });
324 group
325 .aggregates
326 .push(GroupAggregateSpec { kind, target_field });
327
328 self
329 }
330
331 fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
333 let group = self.group.get_or_insert(GroupSpec {
334 group_fields: Vec::new(),
335 aggregates: Vec::new(),
336 execution: GroupedExecutionConfig::unbounded(),
337 });
338 group.execution = GroupedExecutionConfig::with_hard_limits(max_groups, max_group_bytes);
339
340 self
341 }
342
343 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
345 if let Some(existing) = &self.key_access
346 && existing.kind != kind
347 {
348 self.key_access_conflict = true;
349 }
350
351 self.key_access = Some(KeyAccessState { kind, access });
352
353 self
354 }
355
356 pub(crate) fn by_id(self, id: K) -> Self {
358 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
359 }
360
361 pub(crate) fn by_ids<I>(self, ids: I) -> Self
363 where
364 I: IntoIterator<Item = K>,
365 {
366 self.set_key_access(
367 KeyAccessKind::Many,
368 KeyAccess::Many(ids.into_iter().collect()),
369 )
370 }
371
372 pub(crate) fn only(self, id: K) -> Self {
374 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
375 }
376
377 #[must_use]
379 pub(crate) const fn delete(mut self) -> Self {
380 if self.mode.is_load() {
381 self.mode = QueryMode::Delete(DeleteSpec::new());
382 }
383 self
384 }
385
386 #[must_use]
390 pub(crate) const fn limit(mut self, limit: u32) -> Self {
391 match self.mode {
392 QueryMode::Load(mut spec) => {
393 spec.limit = Some(limit);
394 self.mode = QueryMode::Load(spec);
395 }
396 QueryMode::Delete(mut spec) => {
397 spec.limit = Some(limit);
398 self.mode = QueryMode::Delete(spec);
399 }
400 }
401 self
402 }
403
404 #[must_use]
406 pub(crate) const fn offset(mut self, offset: u32) -> Self {
407 if let QueryMode::Load(mut spec) = self.mode {
408 spec.offset = offset;
409 self.mode = QueryMode::Load(spec);
410 }
411 self
412 }
413
414 fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
416 let schema_info = SchemaInfo::from_entity_model(self.model)?;
418 self.validate_intent()?;
419
420 let normalized_predicate = self
422 .predicate
423 .as_ref()
424 .map(|predicate| {
425 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
426 let predicate = normalize_enum_literals(&schema_info, predicate)?;
427 Ok::<Predicate, ValidateError>(normalize(&predicate))
428 })
429 .transpose()?;
430 let access_plan_value = match &self.key_access {
431 Some(state) => access_plan_from_keys_value(&state.access),
432 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
433 };
434
435 let scalar = ScalarPlan {
437 mode: self.mode,
438 predicate: normalized_predicate.map(lower_to_execution_model),
439 order: canonicalize_order_spec(self.model, self.order.clone()),
442 distinct: self.distinct,
443 delete_limit: match self.mode {
444 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
445 QueryMode::Load(_) => None,
446 },
447 page: match self.mode {
448 QueryMode::Load(spec) => {
449 if spec.limit.is_some() || spec.offset > 0 {
450 Some(PageSpec {
451 limit: spec.limit,
452 offset: spec.offset,
453 })
454 } else {
455 None
456 }
457 }
458 QueryMode::Delete(_) => None,
459 },
460 consistency: self.consistency,
461 };
462 let mut plan =
463 AccessPlannedQuery::from_parts(LogicalPlan::Scalar(scalar), access_plan_value);
464 if let Some(group) = self.group.clone() {
465 plan = plan.into_grouped(group);
466 }
467
468 if plan.grouped_plan().is_some() {
469 validate_group_query_semantics(&schema_info, self.model, &plan)?;
470 } else {
471 validate_query_semantics(&schema_info, self.model, &plan)?;
472 }
473
474 Ok(plan)
475 }
476
477 fn validate_intent(&self) -> Result<(), IntentError> {
479 if self.key_access_conflict {
480 return Err(IntentError::KeyAccessConflict);
481 }
482
483 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
484 .map_err(IntentError::from)?;
485
486 if let Some(state) = &self.key_access {
487 match state.kind {
488 KeyAccessKind::Many if self.predicate.is_some() => {
489 return Err(IntentError::ByIdsWithPredicate);
490 }
491 KeyAccessKind::Only if self.predicate.is_some() => {
492 return Err(IntentError::OnlyWithPredicate);
493 }
494 _ => {
495 }
497 }
498 }
499
500 Ok(())
501 }
502}
503
504#[derive(Debug)]
522pub struct PlannedQuery<E: EntityKind> {
523 plan: AccessPlannedQuery<E::Key>,
524 _marker: PhantomData<E>,
525}
526
527impl<E: EntityKind> PlannedQuery<E> {
528 #[must_use]
529 pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
530 Self {
531 plan,
532 _marker: PhantomData,
533 }
534 }
535
536 #[must_use]
537 pub fn explain(&self) -> ExplainPlan {
538 self.plan.explain_with_model(E::MODEL)
539 }
540
541 #[must_use]
542 pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
543 self.plan
544 }
545}
546
547#[derive(Debug)]
548pub struct Query<E: EntityKind> {
549 intent: QueryModel<'static, E::Key>,
550 _marker: PhantomData<E>,
551}
552
553impl<E: EntityKind> Query<E> {
554 #[must_use]
558 pub const fn new(consistency: ReadConsistency) -> Self {
559 Self {
560 intent: QueryModel::new(E::MODEL, consistency),
561 _marker: PhantomData,
562 }
563 }
564
565 #[must_use]
567 pub const fn mode(&self) -> QueryMode {
568 self.intent.mode()
569 }
570
571 #[must_use]
572 pub(crate) fn has_explicit_order(&self) -> bool {
573 self.intent.has_explicit_order()
574 }
575
576 #[must_use]
577 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
578 self.intent.load_spec()
579 }
580
581 #[must_use]
583 pub fn filter(mut self, predicate: Predicate) -> Self {
584 self.intent = self.intent.filter(predicate);
585 self
586 }
587
588 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
590 let Self { intent, _marker } = self;
591 let intent = intent.filter_expr(expr)?;
592
593 Ok(Self { intent, _marker })
594 }
595
596 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
598 let Self { intent, _marker } = self;
599 let intent = intent.sort_expr(expr)?;
600
601 Ok(Self { intent, _marker })
602 }
603
604 #[must_use]
606 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
607 self.intent = self.intent.order_by(field);
608 self
609 }
610
611 #[must_use]
613 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
614 self.intent = self.intent.order_by_desc(field);
615 self
616 }
617
618 #[must_use]
620 pub fn distinct(mut self) -> Self {
621 self.intent = self.intent.distinct();
622 self
623 }
624
625 pub fn group_by(self, field: impl AsRef<str>) -> Result<Self, QueryError> {
627 let Self { intent, _marker } = self;
628 let intent = intent.push_group_field(field.as_ref())?;
629
630 Ok(Self { intent, _marker })
631 }
632
633 #[must_use]
635 pub fn group_count(mut self) -> Self {
636 self.intent = self
637 .intent
638 .push_group_aggregate(GroupAggregateKind::Count, None);
639 self
640 }
641
642 #[must_use]
644 pub fn group_exists(mut self) -> Self {
645 self.intent = self
646 .intent
647 .push_group_aggregate(GroupAggregateKind::Exists, None);
648 self
649 }
650
651 #[must_use]
653 pub fn group_first(mut self) -> Self {
654 self.intent = self
655 .intent
656 .push_group_aggregate(GroupAggregateKind::First, None);
657 self
658 }
659
660 #[must_use]
662 pub fn group_last(mut self) -> Self {
663 self.intent = self
664 .intent
665 .push_group_aggregate(GroupAggregateKind::Last, None);
666 self
667 }
668
669 #[must_use]
671 pub fn group_min_by(mut self, field: impl AsRef<str>) -> Self {
672 self.intent = self
673 .intent
674 .push_group_aggregate(GroupAggregateKind::Min, Some(field.as_ref().to_string()));
675 self
676 }
677
678 #[must_use]
680 pub fn group_max_by(mut self, field: impl AsRef<str>) -> Self {
681 self.intent = self
682 .intent
683 .push_group_aggregate(GroupAggregateKind::Max, Some(field.as_ref().to_string()));
684 self
685 }
686
687 #[must_use]
689 pub fn grouped_limits(mut self, max_groups: u64, max_group_bytes: u64) -> Self {
690 self.intent = self.intent.grouped_limits(max_groups, max_group_bytes);
691 self
692 }
693
694 pub(crate) fn by_id(self, id: E::Key) -> Self {
696 let Self { intent, _marker } = self;
697 Self {
698 intent: intent.by_id(id),
699 _marker,
700 }
701 }
702
703 pub(crate) fn by_ids<I>(self, ids: I) -> Self
705 where
706 I: IntoIterator<Item = E::Key>,
707 {
708 let Self { intent, _marker } = self;
709 Self {
710 intent: intent.by_ids(ids),
711 _marker,
712 }
713 }
714
715 #[must_use]
717 pub fn delete(mut self) -> Self {
718 self.intent = self.intent.delete();
719 self
720 }
721
722 #[must_use]
728 pub fn limit(mut self, limit: u32) -> Self {
729 self.intent = self.intent.limit(limit);
730 self
731 }
732
733 #[must_use]
737 pub fn offset(mut self, offset: u32) -> Self {
738 self.intent = self.intent.offset(offset);
739 self
740 }
741
742 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
744 let plan = self.planned()?;
745
746 Ok(plan.explain())
747 }
748
749 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
751 let plan = self.build_plan()?;
752
753 Ok(PlannedQuery::new(plan))
754 }
755
756 fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
758 let plan_value = self.intent.build_plan_model()?;
759 let (logical, access) = plan_value.into_parts();
760 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
761 let plan = AccessPlannedQuery::from_parts(logical, access);
762
763 Ok(plan)
764 }
765}
766
767impl<E> Query<E>
768where
769 E: EntityKind + SingletonEntity,
770 E::Key: Default,
771{
772 pub(crate) fn only(self) -> Self {
774 let Self { intent, _marker } = self;
775
776 Self {
777 intent: intent.only(E::Key::default()),
778 _marker,
779 }
780 }
781}
782
783#[derive(Debug, ThisError)]
788pub enum QueryError {
789 #[error("{0}")]
790 Validate(#[from] ValidateError),
791
792 #[error("{0}")]
793 Plan(Box<PlanError>),
794
795 #[error("{0}")]
796 Intent(#[from] IntentError),
797
798 #[error("{0}")]
799 Response(#[from] ResponseError),
800
801 #[error("{0}")]
802 Execute(#[from] InternalError),
803}
804
805impl From<PlannerError> for QueryError {
806 fn from(err: PlannerError) -> Self {
807 match err {
808 PlannerError::Plan(err) => Self::from(*err),
809 PlannerError::Internal(err) => Self::Execute(*err),
810 }
811 }
812}
813
814impl From<PlanError> for QueryError {
815 fn from(err: PlanError) -> Self {
816 Self::Plan(Box::new(err))
817 }
818}
819
820#[derive(Clone, Copy, Debug, ThisError)]
825pub enum IntentError {
826 #[error("{0}")]
827 PlanShape(#[from] policy::PlanPolicyError),
828
829 #[error("by_ids() cannot be combined with predicates")]
830 ByIdsWithPredicate,
831
832 #[error("only() cannot be combined with predicates")]
833 OnlyWithPredicate,
834
835 #[error("multiple key access methods were used on the same query")]
836 KeyAccessConflict,
837
838 #[error("cursor pagination requires an explicit ordering")]
839 CursorRequiresOrder,
840
841 #[error("cursor pagination requires an explicit limit")]
842 CursorRequiresLimit,
843
844 #[error("cursor tokens can only be used with .page().execute()")]
845 CursorRequiresPagedExecution,
846}
847
848impl From<policy::CursorPagingPolicyError> for IntentError {
849 fn from(err: policy::CursorPagingPolicyError) -> Self {
850 match err {
851 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
852 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
853 }
854 }
855}
856
857fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
859 match order {
860 Some(mut spec) => {
861 spec.fields.push((field.to_string(), direction));
862 spec
863 }
864 None => OrderSpec {
865 fields: vec![(field.to_string(), direction)],
866 },
867 }
868}
869
870fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
875 let mut order = order?;
876 if order.fields.is_empty() {
877 return Some(order);
878 }
879
880 let pk_field = model.primary_key.name;
881 let mut pk_direction = None;
882 order.fields.retain(|(field, direction)| {
883 if field == pk_field {
884 if pk_direction.is_none() {
885 pk_direction = Some(*direction);
886 }
887 false
888 } else {
889 true
890 }
891 });
892
893 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
894 order.fields.push((pk_field.to_string(), pk_direction));
895
896 Some(order)
897}